You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2018/10/04 20:29:06 UTC
[10/14] nifi git commit: NIFI-5516: Implement Load-Balanced
Connections Refactoring StandardFlowFileQueue to have an
AbstractFlowFileQueue Refactored more into AbstractFlowFileQueue Added
documentation, cleaned up code some Refactored FlowFileQueue so th
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
new file mode 100644
index 0000000..5bf75a4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractFlowFileQueue implements FlowFileQueue {
+ private static final Logger logger = LoggerFactory.getLogger(AbstractFlowFileQueue.class);
+ private final String identifier;
+ private final FlowFileRepository flowFileRepository;
+ private final ProvenanceEventRepository provRepository;
+ private final ResourceClaimManager resourceClaimManager;
+ private final ProcessScheduler scheduler;
+
+ private final AtomicReference<TimePeriod> expirationPeriod = new AtomicReference<>(new TimePeriod("0 mins", 0L)); // initial value: "0 mins" / 0 ms; replaced by setFlowFileExpiration
+ private final AtomicReference<MaxQueueSize> maxQueueSize = new AtomicReference<>(new MaxQueueSize("1 GB", 1024 * 1024 * 1024, 10000)); // initial back-pressure thresholds: "1 GB" (1024^3 bytes) and 10000 objects
+
+ private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>(); // list requests keyed by request identifier
+ private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>(); // drop requests keyed by request identifier
+
+ private LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.DO_NOT_LOAD_BALANCE; // read and written only by synchronized methods of this class
+ private String partitioningAttribute = null; // read and written only by synchronized methods of this class
+
+ private LoadBalanceCompression compression = LoadBalanceCompression.DO_NOT_COMPRESS; // read and written only by synchronized methods of this class
+
+
+ public AbstractFlowFileQueue(final String identifier, final ProcessScheduler scheduler,
+ final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo, final ResourceClaimManager resourceClaimManager) {
+ this.identifier = identifier; // returned by getIdentifier() and used as the component id of DROP provenance events
+ this.scheduler = scheduler; // exposed to subclasses through getScheduler()
+ this.flowFileRepository = flowFileRepo; // updated by drop() with the delete records
+ this.provRepository = provRepo; // receives the DROP provenance events created by drop()
+ this.resourceClaimManager = resourceClaimManager; // used by drop() to decrement claimant counts
+ }
+
+ @Override
+ public String getIdentifier() {
+ return identifier; // the identifier supplied to the constructor
+ }
+
+ protected ProcessScheduler getScheduler() {
+ return scheduler; // the ProcessScheduler supplied to the constructor, exposed to subclasses
+ }
+
+ @Override
+ public String getFlowFileExpiration() {
+ return expirationPeriod.get().getPeriod(); // textual period of the current TimePeriod (the initial one is built from "0 mins")
+ }
+
+    /**
+     * Returns the configured FlowFile expiration period converted to the given unit.
+     *
+     * @param timeUnit the unit in which to express the expiration period
+     * @return the expiration period in {@code timeUnit}, capped at {@link Integer#MAX_VALUE}. A plain
+     *         {@code (int)} cast would silently wrap for long periods (anything above roughly 24.8 days
+     *         when expressed in milliseconds) and could even yield a negative value.
+     */
+    @Override
+    public int getFlowFileExpiration(final TimeUnit timeUnit) {
+        final long converted = timeUnit.convert(expirationPeriod.get().getMillis(), TimeUnit.MILLISECONDS);
+        return (int) Math.min(converted, Integer.MAX_VALUE);
+    }
+
+ @Override
+ public void setFlowFileExpiration(final String flowExpirationPeriod) {
+ final long millis = FormatUtils.getTimeDuration(flowExpirationPeriod, TimeUnit.MILLISECONDS); // convert the textual period to milliseconds
+ if (millis < 0) { // only negative durations are rejected; zero passes this check despite the wording of the message
+ throw new IllegalArgumentException("FlowFile Expiration Period must be positive");
+ }
+
+ expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis)); // keep the original text together with its millisecond value
+ }
+
+    /**
+     * Replaces the object-count back-pressure threshold while keeping the currently configured
+     * data-size threshold.
+     *
+     * @param threshold the new maximum object count
+     */
+    @Override
+    public void setBackPressureObjectThreshold(final long threshold) {
+        // updateAndGet re-reads and retries until its compare-and-set succeeds, so a concurrent
+        // change of the data-size threshold is never overwritten with a stale value.
+        maxQueueSize.updateAndGet(current -> new MaxQueueSize(current.getMaxSize(), current.getMaxBytes(), threshold));
+    }
+
+ @Override
+ public long getBackPressureObjectThreshold() {
+ return getMaxQueueSize().getMaxCount(); // object-count part of the current MaxQueueSize snapshot
+ }
+
+    /**
+     * Replaces the data-size back-pressure threshold while keeping the currently configured
+     * object-count threshold.
+     *
+     * @param maxDataSize the new maximum data size in textual form; it is parsed to a byte count
+     *                    before the threshold is swapped in
+     */
+    @Override
+    public void setBackPressureDataSizeThreshold(final String maxDataSize) {
+        final long parsedBytes = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
+
+        // updateAndGet re-reads and retries until its compare-and-set succeeds, so a concurrent
+        // change of the object-count threshold is never overwritten with a stale value.
+        maxQueueSize.updateAndGet(current -> new MaxQueueSize(maxDataSize, parsedBytes, current.getMaxCount()));
+    }
+
+ @Override
+ public String getBackPressureDataSizeThreshold() {
+ return getMaxQueueSize().getMaxSize(); // textual data-size part of the current MaxQueueSize snapshot
+ }
+
+ private MaxQueueSize getMaxQueueSize() {
+ return maxQueueSize.get(); // current snapshot; the setters swap in a new MaxQueueSize instance rather than mutating this one
+ }
+
+    /**
+     * Reports whether the queue has reached either of its back-pressure thresholds.
+     *
+     * @return {@code true} if a positive object-count threshold or a positive byte threshold has been
+     *         reached or exceeded; {@code false} otherwise, including when neither threshold is set
+     */
+    @Override
+    public boolean isFull() {
+        final MaxQueueSize limits = getMaxQueueSize();
+        final boolean bytesLimited = limits.getMaxBytes() > 0;
+        final boolean countLimited = limits.getMaxCount() > 0;
+
+        // With neither threshold configured the queue can never be full, so skip computing its size.
+        if (!bytesLimited && !countLimited) {
+            return false;
+        }
+
+        final QueueSize current = size();
+        final boolean countReached = countLimited && current.getObjectCount() >= limits.getMaxCount();
+        return countReached || (bytesLimited && current.getByteCount() >= limits.getMaxBytes());
+    }
+
+
+    /**
+     * Starts an asynchronous listing of the FlowFiles in this queue.
+     * <p>
+     * The listing itself runs on a daemon thread; this method returns immediately with the request
+     * object, which is also registered under {@code requestIdentifier} so that it can be polled via
+     * {@link #getListFlowFileStatus(String)} or cancelled via {@link #cancelListFlowFileRequest(String)}.
+     *
+     * @param requestIdentifier the identifier under which the request is registered
+     * @param maxResults the maximum number of FlowFile summaries to produce
+     * @return the request tracking the listing
+     */
+    @Override
+    public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults) {
+        // purge any old requests from the map just to keep it clean. But if there are very few requests, which is usually the case, then don't bother
+        if (listRequestMap.size() > 10) {
+            final List<String> toDrop = new ArrayList<>();
+            for (final Map.Entry<String, ListFlowFileRequest> entry : listRequestMap.entrySet()) {
+                final ListFlowFileRequest request = entry.getValue();
+                final boolean completed = request.getState() == ListFlowFileState.COMPLETE || request.getState() == ListFlowFileState.FAILURE;
+
+                if (completed && System.currentTimeMillis() - request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
+                    toDrop.add(entry.getKey());
+                }
+            }
+
+            for (final String requestId : toDrop) {
+                listRequestMap.remove(requestId);
+            }
+        }
+
+        // The request is created with the size of the queue at the time of the call.
+        final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, maxResults, size());
+
+        final Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                int position = 0;
+                final List<FlowFileSummary> summaries = new ArrayList<>();
+
+                // Work on a copy of the listable FlowFiles rather than on the queue itself.
+                // We cannot simply pull the first 'maxResults' records from the queue because the
+                // Iterator provided by PriorityQueue does not return records in order. So we would have to either
+                // use a writeLock and 'pop' the first 'maxResults' records off the queue or use a read lock and
+                // do a shallow copy of the queue. The shallow copy is generally quicker because it doesn't have to do
+                // the sorting to put the records back. So even though this has the expense of Java Heap to create the
+                // extra collection, we are making this trade-off to avoid locking the queue any longer than required.
+                final List<FlowFileRecord> allFlowFiles = getListableFlowFiles();
+                final QueuePrioritizer prioritizer = new QueuePrioritizer(getPriorities());
+
+                listRequest.setState(ListFlowFileState.CALCULATING_LIST);
+
+                // sort the FlowFileRecords so that we have the list in the same order as on the queue.
+                allFlowFiles.sort(prioritizer);
+
+                for (final FlowFileRecord flowFile : allFlowFiles) {
+                    // Check the limit before adding so that a limit of zero yields no summaries at all.
+                    if (summaries.size() >= maxResults) {
+                        break;
+                    }
+                    summaries.add(summarize(flowFile, ++position));
+                }
+
+                // Log the enclosing queue (not this Runnable) and the number of summaries actually produced.
+                logger.debug("{} Finished listing FlowFiles for active queue with a total of {} results", AbstractFlowFileQueue.this, summaries.size());
+                listRequest.setFlowFileSummaries(summaries);
+                listRequest.setState(ListFlowFileState.COMPLETE);
+            }
+        }, "List FlowFiles for Connection " + getIdentifier());
+        t.setDaemon(true);
+        t.start();
+
+        listRequestMap.put(requestIdentifier, listRequest);
+        return listRequest;
+    }
+
+ @Override
+ public ListFlowFileStatus getListFlowFileStatus(final String requestIdentifier) {
+ return listRequestMap.get(requestIdentifier); // null when no request is currently registered under this identifier
+ }
+
+    /**
+     * Unregisters the list request with the given identifier and cancels it.
+     *
+     * @param requestIdentifier the identifier of the request to cancel
+     * @return the cancelled request, or {@code null} if no request was registered under the identifier
+     */
+    @Override
+    public ListFlowFileStatus cancelListFlowFileRequest(final String requestIdentifier) {
+        logger.info("Canceling ListFlowFile Request with ID {}", requestIdentifier);
+
+        final ListFlowFileRequest removed = listRequestMap.remove(requestIdentifier);
+        if (removed == null) {
+            return null;
+        }
+
+        removed.cancel();
+        return removed;
+    }
+
+ /**
+ * @return all FlowFiles that should be listed in response to a List Queue request; listFlowFiles sorts the returned list in place, so it must be modifiable
+ */
+ protected abstract List<FlowFileRecord> getListableFlowFiles();
+
+
+    /**
+     * Starts dropping all FlowFiles in this queue on behalf of the given requestor.
+     * <p>
+     * If the queue is empty at the time of the call, the request is completed immediately. Otherwise
+     * the drop is performed on a daemon thread. In both cases the request is registered under
+     * {@code requestIdentifier} and returned.
+     *
+     * @param requestIdentifier the identifier under which the request is registered
+     * @param requestor the identity on whose behalf the FlowFiles are dropped
+     * @return the request tracking the drop
+     */
+    @Override
+    public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) {
+        logger.info("Initiating drop of FlowFiles from {} on behalf of {} (request identifier={})", this, requestor, requestIdentifier);
+
+        // Housekeeping: forget finished requests that have not been touched for five minutes. With only
+        // a handful of requests registered, which is the usual case, the scan is not worth the effort.
+        if (dropRequestMap.size() > 10) {
+            final List<String> expiredIds = new ArrayList<>();
+            for (final Map.Entry<String, DropFlowFileRequest> candidate : dropRequestMap.entrySet()) {
+                final DropFlowFileRequest existing = candidate.getValue();
+                final boolean finished = existing.getState() == DropFlowFileState.COMPLETE || existing.getState() == DropFlowFileState.FAILURE;
+                if (!finished) {
+                    continue;
+                }
+
+                final long idleMillis = System.currentTimeMillis() - existing.getLastUpdated();
+                if (idleMillis > TimeUnit.MINUTES.toMillis(5L)) {
+                    expiredIds.add(candidate.getKey());
+                }
+            }
+
+            for (final String expiredId : expiredIds) {
+                dropRequestMap.remove(expiredId);
+            }
+        }
+
+        final DropFlowFileRequest dropRequest = new DropFlowFileRequest(requestIdentifier);
+        final QueueSize sizeAtRequest = size();
+        dropRequest.setCurrentSize(sizeAtRequest);
+        dropRequest.setOriginalSize(sizeAtRequest);
+
+        if (sizeAtRequest.getObjectCount() == 0) {
+            // Nothing to drop: finish the request right here instead of spawning a thread.
+            dropRequest.setDroppedSize(sizeAtRequest);
+            dropRequest.setState(DropFlowFileState.COMPLETE);
+        } else {
+            final Thread worker = new Thread(() -> dropFlowFiles(dropRequest, requestor), "Drop FlowFiles for Connection " + getIdentifier());
+            worker.setDaemon(true);
+            worker.start();
+        }
+
+        dropRequestMap.put(requestIdentifier, dropRequest);
+        return dropRequest;
+    }
+
+
+    /**
+     * Unregisters the drop request with the given identifier and cancels it.
+     *
+     * @param requestIdentifier the identifier of the request to cancel
+     * @return the cancelled request, or {@code null} if no request was registered under the identifier
+     */
+    @Override
+    public DropFlowFileRequest cancelDropFlowFileRequest(final String requestIdentifier) {
+        final DropFlowFileRequest removed = dropRequestMap.remove(requestIdentifier);
+        if (removed != null) {
+            removed.cancel();
+        }
+
+        return removed;
+    }
+
+ @Override
+ public DropFlowFileStatus getDropFlowFileStatus(final String requestIdentifier) {
+ return dropRequestMap.get(requestIdentifier); // null when no request is currently registered under this identifier
+ }
+
+ /**
+ * Synchronously drops all FlowFiles in the queue. Invoked from the background thread started by dropFlowFiles(String, String).
+ *
+ * @param dropRequest the request tracking the drop
+ * @param requestor the identity of the user/agent who made the request
+ */
+ protected abstract void dropFlowFiles(final DropFlowFileRequest dropRequest, final String requestor);
+
+ @Override
+ public void verifyCanList() throws IllegalStateException { // empty body: this base class never rejects a listing
+ }
+
+
+    /**
+     * Builds a lightweight summary of the given FlowFile.
+     * <p>
+     * The values of interest are copied out of the FlowFile up front so that the returned summary does
+     * not keep a reference to the FlowFile (and therefore to all of its attributes). That matters
+     * because many summaries may be held on the heap at once.
+     *
+     * @param flowFile the FlowFile to summarize
+     * @param position the position to report for the FlowFile
+     * @return a summary backed only by the copied values
+     */
+    protected FlowFileSummary summarize(final FlowFile flowFile, final int position) {
+        final String flowFileUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+        final String flowFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+        final long contentSize = flowFile.getSize();
+        final Long queuedAt = flowFile.getLastQueueDate();
+        final long lineageStartDate = flowFile.getLineageStartDate();
+        final boolean currentlyPenalized = flowFile.isPenalized();
+
+        // A FlowFile without a last-queued date is reported with a timestamp of zero.
+        final long queuedAtMillis = queuedAt == null ? 0L : queuedAt;
+
+        return new FlowFileSummary() {
+            @Override
+            public String getUuid() {
+                return flowFileUuid;
+            }
+
+            @Override
+            public String getFilename() {
+                return flowFileName;
+            }
+
+            @Override
+            public int getPosition() {
+                return position;
+            }
+
+            @Override
+            public long getSize() {
+                return contentSize;
+            }
+
+            @Override
+            public long getLastQueuedTime() {
+                return queuedAtMillis;
+            }
+
+            @Override
+            public long getLineageStartDate() {
+                return lineageStartDate;
+            }
+
+            @Override
+            public boolean isPenalized() {
+                return currentlyPenalized;
+            }
+        };
+    }
+
+ protected QueueSize drop(final List<FlowFileRecord> flowFiles, final String requestor) throws IOException {
+ // Create a Provenance Event and a FlowFile Repository record for each FlowFile
+ final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(flowFiles.size());
+ final List<RepositoryRecord> flowFileRepoRecords = new ArrayList<>(flowFiles.size());
+ for (final FlowFileRecord flowFile : flowFiles) {
+ provenanceEvents.add(createDropProvenanceEvent(flowFile, requestor));
+ flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile));
+ }
+
+ long dropContentSize = 0L; // total size of all dropped FlowFiles, whether or not they carry a content claim
+ for (final FlowFileRecord flowFile : flowFiles) { // second pass: runs only after every event and record above was created
+ dropContentSize += flowFile.getSize();
+ final ContentClaim contentClaim = flowFile.getContentClaim();
+ if (contentClaim == null) { // no content claim, so there is no claimant count to decrement
+ continue;
+ }
+
+ final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+ if (resourceClaim == null) {
+ continue;
+ }
+
+ resourceClaimManager.decrementClaimantCount(resourceClaim);
+ }
+
+ provRepository.registerEvents(provenanceEvents); // provenance events are registered before the FlowFile repository is updated
+ flowFileRepository.updateRepository(flowFileRepoRecords);
+ return new QueueSize(flowFiles.size(), dropContentSize); // number of FlowFiles dropped and their combined size
+ }
+
+    /**
+     * Creates the DROP provenance event recorded when the given FlowFile is removed from this queue.
+     *
+     * @param flowFile the FlowFile being dropped
+     * @param requestor the identity on whose behalf the queue is emptied; included in the event details
+     * @return the provenance event describing the drop
+     */
+    private ProvenanceEventRecord createDropProvenanceEvent(final FlowFileRecord flowFile, final String requestor) {
+        final ProvenanceEventBuilder builder = provRepository.eventBuilder();
+        builder.fromFlowFile(flowFile);
+        builder.setEventType(ProvenanceEventType.DROP);
+        builder.setLineageStartDate(flowFile.getLineageStartDate());
+        builder.setComponentId(getIdentifier());
+        builder.setComponentType("Connection");
+        builder.setAttributes(flowFile.getAttributes(), Collections.emptyMap());
+        builder.setDetails("FlowFile Queue emptied by " + requestor);
+        builder.setSourceQueueIdentifier(getIdentifier());
+
+        // drop() tolerates a ContentClaim without a ResourceClaim, so guard against it here as well
+        // instead of failing with a NullPointerException while building the event.
+        final ContentClaim contentClaim = flowFile.getContentClaim();
+        final ResourceClaim resourceClaim = contentClaim == null ? null : contentClaim.getResourceClaim();
+        if (resourceClaim != null) {
+            builder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(), flowFile.getSize());
+        }
+
+        return builder.build();
+    }
+
+ private RepositoryRecord createDeleteRepositoryRecord(final FlowFileRecord flowFile) {
+ return new DropFlowFileRepositoryRecord(this, flowFile); // this queue is passed as the record's queue argument
+ }
+
+ @Override
+ public synchronized void setLoadBalanceStrategy(final LoadBalanceStrategy strategy, final String partitioningAttribute) {
+ if (strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE && !FlowFile.KeyValidator.isValid(partitioningAttribute)) { // the attribute is validated only for PARTITION_BY_ATTRIBUTE
+ throw new IllegalArgumentException("Cannot set Load Balance Strategy to " + strategy + " without providing a valid Partitioning Attribute");
+ }
+
+ this.loadBalanceStrategy = strategy;
+ this.partitioningAttribute = partitioningAttribute; // stored as given for every strategy, including those that are not validated above
+ }
+
+ @Override
+ public synchronized String getPartitioningAttribute() {
+ return partitioningAttribute; // null until setLoadBalanceStrategy stores a value
+ }
+
+ @Override
+ public synchronized LoadBalanceStrategy getLoadBalanceStrategy() {
+ return loadBalanceStrategy; // DO_NOT_LOAD_BALANCE until setLoadBalanceStrategy is called
+ }
+
+ @Override
+ public synchronized void setLoadBalanceCompression(final LoadBalanceCompression compression) {
+ this.compression = compression; // stored as given; no validation is performed here
+ }
+
+ @Override
+ public synchronized LoadBalanceCompression getLoadBalanceCompression() {
+ return compression; // DO_NOT_COMPRESS until setLoadBalanceCompression is called
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java
new file mode 100644
index 0000000..9a220ae
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.events.EventReporter;
+
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * A {@link SwappablePriorityQueue} that additionally offers a {@code poll} which blocks for a bounded
+ * amount of time until a FlowFile becomes available. Waiting threads are woken up whenever FlowFiles
+ * are added through {@link #put(FlowFileRecord)} or {@link #putAll(Collection)}.
+ */
+public class BlockingSwappablePriorityQueue extends SwappablePriorityQueue {
+    // Monitor on which blocked pollers wait and on which producers signal newly added FlowFiles.
+    private final Object monitor = new Object();
+
+    public BlockingSwappablePriorityQueue(final FlowFileSwapManager swapManager, final int swapThreshold, final EventReporter eventReporter, final FlowFileQueue flowFileQueue,
+        final DropFlowFileAction dropAction, final String partitionName) {
+
+        super(swapManager, swapThreshold, eventReporter, flowFileQueue, dropAction, partitionName);
+    }
+
+    /**
+     * Adds the FlowFile to the queue and wakes up one thread blocked in the timed {@code poll}.
+     *
+     * @param flowFile the FlowFile to add
+     */
+    @Override
+    public void put(final FlowFileRecord flowFile) {
+        super.put(flowFile);
+
+        synchronized (monitor) {
+            monitor.notify();
+        }
+    }
+
+    /**
+     * Adds the FlowFiles to the queue and wakes up all threads blocked in the timed {@code poll}.
+     *
+     * @param flowFiles the FlowFiles to add
+     */
+    @Override
+    public void putAll(final Collection<FlowFileRecord> flowFiles) {
+        super.putAll(flowFiles);
+
+        synchronized (monitor) {
+            monitor.notifyAll();
+        }
+    }
+
+    /**
+     * Polls the queue, blocking for at most {@code waitMillis} milliseconds until a FlowFile is available.
+     * <p>
+     * The queue is always polled at least once. Each wait is limited to the time remaining until the
+     * deadline, so the total blocking time does not grow with the number of wake-ups, and a
+     * non-positive {@code waitMillis} results in a single non-blocking poll rather than an unbounded
+     * {@code wait(0)}. After every wake-up the queue is polled again before the deadline is evaluated,
+     * so a FlowFile that arrives right at the deadline is still returned.
+     *
+     * @param expiredRecords passed through to the underlying poll
+     * @param expirationMillis passed through to the underlying poll
+     * @param waitMillis the maximum time to block, in milliseconds
+     * @return the polled FlowFile, or {@code null} if none became available in time
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final long waitMillis) throws InterruptedException {
+        final long now = System.currentTimeMillis();
+        // Saturate rather than overflow when the caller passes a very large wait time.
+        final long deadline = waitMillis > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + waitMillis;
+
+        synchronized (monitor) {
+            while (true) {
+                final FlowFileRecord flowFile = super.poll(expiredRecords, expirationMillis);
+                if (flowFile != null) {
+                    return flowFile;
+                }
+
+                final long remainingMillis = deadline - System.currentTimeMillis();
+                if (remainingMillis <= 0) {
+                    return null;
+                }
+
+                monitor.wait(remainingMillis);
+            }
+        }
+    }
+
+    @Override
+    public void inheritQueueContents(final FlowFileQueueContents queueContents) {
+        // We have to override this method and synchronize on monitor before calling super.inheritQueueContents.
+        // If we don't do this, then our super class will obtain the write lock and call putAll, which will cause
+        // us to synchronize on monitor AFTER obtaining the write lock (WriteLock then monitor).
+        // If poll() is then called, we will synchronize on monitor, THEN attempt to obtain the write lock (monitor then WriteLock),
+        // which would cause a deadlock.
+        synchronized (monitor) {
+            super.inheritQueueContents(queueContents);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/ConnectionEventListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/ConnectionEventListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/ConnectionEventListener.java
new file mode 100644
index 0000000..a3ae6ee
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/ConnectionEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+public interface ConnectionEventListener { // callback pair handed to FlowFileQueueFactory.createFlowFileQueue
+ void triggerSourceEvent(); // NOTE(review): presumably signals the connection's source component; semantics are not visible here, confirm against implementations
+
+ void triggerDestinationEvent(); // NOTE(review): presumably signals the connection's destination component; confirm against implementations
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileAction.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileAction.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileAction.java
new file mode 100644
index 0000000..86cd169
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileAction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+public interface DropFlowFileAction { // strategy for dropping a batch of FlowFiles; passed to the BlockingSwappablePriorityQueue constructor
+ QueueSize drop(List<FlowFileRecord> flowFiles, String requestor) throws IOException; // same signature as AbstractFlowFileQueue.drop, which returns the count and combined size of the dropped FlowFiles
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java
new file mode 100644
index 0000000..f47b4eb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.controller.repository.RepositoryRecordType;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+
+public class DropFlowFileRepositoryRecord implements RepositoryRecord { // DELETE-type repository record for a FlowFile dropped from a queue
+ private final FlowFileQueue queue;
+ private final FlowFileRecord flowFile;
+
+ public DropFlowFileRepositoryRecord(final FlowFileQueue queue, final FlowFileRecord flowFile) {
+ this.queue = queue; // reported by getOriginalQueue()
+ this.flowFile = flowFile; // reported by getCurrent() and the source of the claim information
+ }
+
+ @Override
+ public FlowFileQueue getDestination() {
+ return null; // this record never reports a destination queue
+ }
+
+ @Override
+ public FlowFileQueue getOriginalQueue() {
+ return queue;
+ }
+
+ @Override
+ public RepositoryRecordType getType() {
+ return RepositoryRecordType.DELETE; // always a delete record
+ }
+
+ @Override
+ public ContentClaim getCurrentClaim() {
+ return flowFile.getContentClaim(); // current and original claim are the same: the FlowFile's own claim
+ }
+
+ @Override
+ public ContentClaim getOriginalClaim() {
+ return flowFile.getContentClaim();
+ }
+
+ @Override
+ public long getCurrentClaimOffset() {
+ return flowFile.getContentClaimOffset();
+ }
+
+ @Override
+ public FlowFileRecord getCurrent() {
+ return flowFile;
+ }
+
+ @Override
+ public boolean isAttributesChanged() {
+ return false; // always false for this record type
+ }
+
+ @Override
+ public boolean isMarkedForAbort() {
+ return false; // always false for this record type
+ }
+
+ @Override
+ public String getSwapLocation() {
+ return null; // this record never reports a swap location
+ }
+
+ @Override
+ public List<ContentClaim> getTransientClaims() {
+ return Collections.emptyList(); // this record never reports transient claims
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueContents.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueContents.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueContents.java
new file mode 100644
index 0000000..60ad64d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueContents.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+import java.util.List;
+
+public class FlowFileQueueContents { // value holder bundling active FlowFiles, swap locations and swap size
+ private final List<String> swapLocations;
+ private final List<FlowFileRecord> activeFlowFiles;
+ private final QueueSize swapSize;
+
+ public FlowFileQueueContents(final List<FlowFileRecord> activeFlowFiles, final List<String> swapLocations, final QueueSize swapSize) {
+ this.activeFlowFiles = activeFlowFiles; // stored by reference, no defensive copy
+ this.swapLocations = swapLocations; // stored by reference, no defensive copy
+ this.swapSize = swapSize;
+ }
+
+ public List<FlowFileRecord> getActiveFlowFiles() {
+ return activeFlowFiles; // the very list passed to the constructor
+ }
+
+ public List<String> getSwapLocations() {
+ return swapLocations; // the very list passed to the constructor
+ }
+
+ public QueueSize getSwapSize() {
+ return swapSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueFactory.java
new file mode 100644
index 0000000..dc6667f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueFactory.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+/**
+ * Factory for {@link FlowFileQueue} instances.
+ */
+public interface FlowFileQueueFactory {
+    /**
+     * Creates a FlowFileQueue from the supplied settings. How each argument is used is
+     * up to the implementation.
+     *
+     * @param loadBalanceStrategy the load-balance strategy for the new queue
+     * @param partitioningAttribute presumably the attribute name used when partitioning; confirm against implementations
+     * @param connectionEventListener the listener handed to the new queue
+     * @return the FlowFileQueue produced by the implementation
+     */
+    FlowFileQueue createFlowFileQueue(
+        LoadBalanceStrategy loadBalanceStrategy,
+        String partitioningAttribute,
+        ConnectionEventListener connectionEventListener);
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueSize.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueSize.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueSize.java
new file mode 100644
index 0000000..7ebc017
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueSize.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+/**
+ * Immutable set of counters describing a queue: count and bytes for the active,
+ * swapped and unacknowledged portions, plus the number of swap files.
+ */
+public class FlowFileQueueSize {
+    private final int activeQueueCount;
+    private final long activeQueueBytes;
+    private final int swappedCount;
+    private final long swappedBytes;
+    private final int swapFileCount;
+    private final int unacknowledgedCount;
+    private final long unacknowledgedBytes;
+
+    /**
+     * Stores each argument unchanged; every value is later reported by the matching getter.
+     */
+    public FlowFileQueueSize(final int activeQueueCount, final long activeQueueBytes, final int swappedCount, final long swappedBytes, final int swapFileCount,
+        final int unacknowledgedCount, final long unacknowledgedBytes) {
+        this.activeQueueCount = activeQueueCount;
+        this.activeQueueBytes = activeQueueBytes;
+        this.swappedCount = swappedCount;
+        this.swappedBytes = swappedBytes;
+        this.swapFileCount = swapFileCount;
+        this.unacknowledgedCount = unacknowledgedCount;
+        this.unacknowledgedBytes = unacknowledgedBytes;
+    }
+
+    /** @return the active count supplied at construction */
+    public int getActiveCount() {
+        return activeQueueCount;
+    }
+
+    /** @return the active byte total supplied at construction */
+    public long getActiveBytes() {
+        return activeQueueBytes;
+    }
+
+    /** @return the swapped count supplied at construction */
+    public int getSwappedCount() {
+        return swappedCount;
+    }
+
+    /** @return the swapped byte total supplied at construction */
+    public long getSwappedBytes() {
+        return swappedBytes;
+    }
+
+    /** @return the swap file count supplied at construction */
+    public int getSwapFileCount() {
+        return swapFileCount;
+    }
+
+    /** @return the unacknowledged count supplied at construction */
+    public int getUnacknowledgedCount() {
+        return unacknowledgedCount;
+    }
+
+    /** @return the unacknowledged byte total supplied at construction */
+    public long getUnacknowledgedBytes() {
+        return unacknowledgedBytes;
+    }
+
+    /**
+     * @return {@code true} only when the active, swapped and unacknowledged counts are all zero
+     */
+    public boolean isEmpty() {
+        final boolean anythingCounted = activeQueueCount != 0 || swappedCount != 0 || unacknowledgedCount != 0;
+        return !anythingCounted;
+    }
+
+    /**
+     * @return a QueueSize whose count and bytes are the sums over the active, swapped and unacknowledged portions
+     */
+    public QueueSize toQueueSize() {
+        final int totalCount = activeQueueCount + swappedCount + unacknowledgedCount;
+        final long totalBytes = activeQueueBytes + swappedBytes + unacknowledgedBytes;
+        return new QueueSize(totalCount, totalBytes);
+    }
+
+    /** @return a QueueSize built from the active count and active bytes */
+    public QueueSize activeQueueSize() {
+        return new QueueSize(activeQueueCount, activeQueueBytes);
+    }
+
+    /** @return a QueueSize built from the swapped count and swapped bytes */
+    public QueueSize swapQueueSize() {
+        return new QueueSize(swappedCount, swappedBytes);
+    }
+
+    /** @return a QueueSize built from the unacknowledged count and unacknowledged bytes */
+    public QueueSize unacknowledgedQueueSize() {
+        return new QueueSize(unacknowledgedCount, unacknowledgedBytes);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("FlowFile Queue Size[ ActiveQueue=[");
+        text.append(activeQueueCount).append(", ").append(activeQueueBytes);
+        text.append(" Bytes], Swap Queue=[");
+        text.append(swappedCount).append(", ").append(swappedBytes);
+        text.append(" Bytes], Swap Files=[").append(swapFileCount);
+        text.append("], Unacknowledged=[").append(unacknowledgedCount).append(", ").append(unacknowledgedBytes);
+        text.append(" Bytes] ]");
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/MaxQueueSize.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/MaxQueueSize.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/MaxQueueSize.java
new file mode 100644
index 0000000..9492435
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/MaxQueueSize.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+/**
+ * Immutable triple describing a maximum queue size: a textual size, a byte limit
+ * and an object-count limit. All three values are stored exactly as supplied.
+ */
+public class MaxQueueSize {
+    private final long maxCount;
+    private final long maxBytes;
+    private final String maxSize;
+
+    /**
+     * @param maxSize the textual size, returned by {@link #getMaxSize()} and used in {@link #toString()}
+     * @param maxBytes the value returned by {@link #getMaxBytes()}
+     * @param maxCount the value returned by {@link #getMaxCount()}
+     */
+    public MaxQueueSize(final String maxSize, final long maxBytes, final long maxCount) {
+        this.maxCount = maxCount;
+        this.maxBytes = maxBytes;
+        this.maxSize = maxSize;
+    }
+
+    /** @return the object-count limit supplied at construction */
+    public long getMaxCount() {
+        return this.maxCount;
+    }
+
+    /** @return the byte limit supplied at construction */
+    public long getMaxBytes() {
+        return this.maxBytes;
+    }
+
+    /** @return the textual size supplied at construction */
+    public String getMaxSize() {
+        return this.maxSize;
+    }
+
+    /**
+     * @return the count limit followed by {@code " Objects/"} and the textual size
+     */
+    @Override
+    public String toString() {
+        return new StringBuilder()
+            .append(maxCount)
+            .append(" Objects/")
+            .append(maxSize)
+            .toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/NopConnectionEventListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/NopConnectionEventListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/NopConnectionEventListener.java
new file mode 100644
index 0000000..d641da4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/NopConnectionEventListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+/**
+ * {@link ConnectionEventListener} whose callbacks do nothing.
+ */
+public class NopConnectionEventListener implements ConnectionEventListener {
+
+    @Override
+    public void triggerDestinationEvent() {
+        // intentionally empty
+    }
+
+    @Override
+    public void triggerSourceEvent() {
+        // intentionally empty
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java
new file mode 100644
index 0000000..b78ccff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+
+public class QueuePrioritizer implements Comparator<FlowFileRecord>, Serializable {
+ private static final long serialVersionUID = 1L;
+ private final transient List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
+
+ public QueuePrioritizer(final List<FlowFilePrioritizer> priorities) {
+ if (null != priorities) {
+ prioritizers.addAll(priorities);
+ }
+ }
+
+ @Override
+ public int compare(final FlowFileRecord f1, final FlowFileRecord f2) {
+ int returnVal = 0;
+ final boolean f1Penalized = f1.isPenalized();
+ final boolean f2Penalized = f2.isPenalized();
+
+ if (f1Penalized && !f2Penalized) {
+ return 1;
+ } else if (!f1Penalized && f2Penalized) {
+ return -1;
+ }
+
+ if (f1Penalized && f2Penalized) {
+ if (f1.getPenaltyExpirationMillis() < f2.getPenaltyExpirationMillis()) {
+ return -1;
+ } else if (f1.getPenaltyExpirationMillis() > f2.getPenaltyExpirationMillis()) {
+ return 1;
+ }
+ }
+
+ if (!prioritizers.isEmpty()) {
+ for (final FlowFilePrioritizer prioritizer : prioritizers) {
+ returnVal = prioritizer.compare(f1, f2);
+ if (returnVal != 0) {
+ return returnVal;
+ }
+ }
+ }
+
+ final ContentClaim claim1 = f1.getContentClaim();
+ final ContentClaim claim2 = f2.getContentClaim();
+
+ // put the one without a claim first
+ if (claim1 == null && claim2 != null) {
+ return -1;
+ } else if (claim1 != null && claim2 == null) {
+ return 1;
+ } else if (claim1 != null && claim2 != null) {
+ final int claimComparison = claim1.compareTo(claim2);
+ if (claimComparison != 0) {
+ return claimComparison;
+ }
+
+ final int claimOffsetComparison = Long.compare(f1.getContentClaimOffset(), f2.getContentClaimOffset());
+ if (claimOffsetComparison != 0) {
+ return claimOffsetComparison;
+ }
+ }
+
+ return Long.compare(f1.getId(), f2.getId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
new file mode 100644
index 0000000..cab41e8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.queue;
+
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.util.concurrency.TimedLock;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * {@link FlowFileQueue} implementation that hands every queue operation to a single
+ * {@link SwappablePriorityQueue}. The load-balancing hooks are no-ops and
+ * {@link #isActivelyLoadBalancing()} always returns {@code false}.
+ */
+public class StandardFlowFileQueue extends AbstractFlowFileQueue implements FlowFileQueue {
+
+    private final SwappablePriorityQueue queue;
+    private final ConnectionEventListener eventListener;
+    private final FlowFileSwapManager swapManager;
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+    private final TimedLock writeLock;
+
+
+    /**
+     * Builds the queue. The identifier, scheduler, repositories and claim manager go to the
+     * superclass; the swap manager, swap threshold and event reporter go to the backing
+     * {@link SwappablePriorityQueue}; the two default thresholds are applied through the
+     * back-pressure setters.
+     */
+    public StandardFlowFileQueue(final String identifier, final ConnectionEventListener eventListener, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
+                                 final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter,
+                                 final int swapThreshold, final long defaultBackPressureObjectThreshold, final String defaultBackPressureDataSizeThreshold) {
+
+        super(identifier, scheduler, flowFileRepo, provRepo, resourceClaimManager);
+        this.swapManager = swapManager;
+        this.queue = new SwappablePriorityQueue(swapManager, swapThreshold, eventReporter, this, this::drop, null);
+        this.eventListener = eventListener;
+
+        final String lockName = getIdentifier() + " Write Lock";
+        this.writeLock = new TimedLock(lock.writeLock(), lockName, 100);
+
+        setBackPressureDataSizeThreshold(defaultBackPressureDataSizeThreshold);
+        setBackPressureObjectThreshold(defaultBackPressureObjectThreshold);
+    }
+
+    /** No-op: this queue does not load balance. */
+    @Override
+    public void startLoadBalancing() {
+    }
+
+    /** No-op: this queue does not load balance. */
+    @Override
+    public void stopLoadBalancing() {
+    }
+
+    /** @return always {@code false} */
+    @Override
+    public boolean isActivelyLoadBalancing() {
+        return false;
+    }
+
+    /** Forwards the new prioritizers to the backing queue. */
+    @Override
+    public void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
+        queue.setPriorities(newPriorities);
+    }
+
+    /** @return the prioritizers reported by the backing queue */
+    @Override
+    public List<FlowFilePrioritizer> getPriorities() {
+        return queue.getPriorities();
+    }
+
+    /** @return the backing queue's active FlowFiles */
+    @Override
+    protected List<FlowFileRecord> getListableFlowFiles() {
+        return queue.getActiveFlowFiles();
+    }
+
+    /**
+     * @return diagnostics made of the backing queue's diagnostics and an empty list of remote partitions
+     */
+    @Override
+    public QueueDiagnostics getQueueDiagnostics() {
+        final List<RemoteQueuePartitionDiagnostics> noRemotePartitions = Collections.emptyList();
+        return new StandardQueueDiagnostics(queue.getQueueDiagnostics(), noRemotePartitions);
+    }
+
+    /** Adds the FlowFile to the backing queue, then fires a destination event. */
+    @Override
+    public void put(final FlowFileRecord file) {
+        queue.put(file);
+        eventListener.triggerDestinationEvent();
+    }
+
+    /** Adds the FlowFiles to the backing queue, then fires a single destination event. */
+    @Override
+    public void putAll(final Collection<FlowFileRecord> files) {
+        queue.putAll(files);
+        eventListener.triggerDestinationEvent();
+    }
+
+
+    /** Polls one FlowFile from the backing queue using the configured expiration in milliseconds. */
+    @Override
+    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
+        return queue.poll(expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS));
+    }
+
+
+    /** Polls up to {@code maxResults} FlowFiles from the backing queue using the configured expiration in milliseconds. */
+    @Override
+    public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords) {
+        final long expirationMillis = getFlowFileExpiration(TimeUnit.MILLISECONDS);
+        return queue.poll(maxResults, expiredRecords, expirationMillis);
+    }
+
+    /** Polls the FlowFiles selected by {@code filter} from the backing queue using the configured expiration in milliseconds. */
+    @Override
+    public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
+        final long expirationMillis = getFlowFileExpiration(TimeUnit.MILLISECONDS);
+        return queue.poll(filter, expiredRecords, expirationMillis);
+    }
+
+
+    /** Acknowledges the FlowFile on the backing queue, then fires a source event. */
+    @Override
+    public void acknowledge(final FlowFileRecord flowFile) {
+        queue.acknowledge(flowFile);
+        eventListener.triggerSourceEvent();
+    }
+
+    /** Acknowledges the FlowFiles on the backing queue, then fires a single source event. */
+    @Override
+    public void acknowledge(final Collection<FlowFileRecord> flowFiles) {
+        queue.acknowledge(flowFiles);
+        eventListener.triggerSourceEvent();
+    }
+
+    /** @return the backing queue's answer */
+    @Override
+    public boolean isUnacknowledgedFlowFile() {
+        return queue.isUnacknowledgedFlowFile();
+    }
+
+    /** @return the size reported by the backing queue */
+    @Override
+    public QueueSize size() {
+        return queue.size();
+    }
+
+    /** @return whether the backing queue's {@link FlowFileQueueSize} reports empty */
+    @Override
+    public boolean isEmpty() {
+        final FlowFileQueueSize currentSize = queue.getFlowFileQueueSize();
+        return currentSize.isEmpty();
+    }
+
+    /**
+     * @return {@code true} when both the active count and the swapped count are zero;
+     *         unacknowledged FlowFiles are not considered
+     */
+    @Override
+    public boolean isActiveQueueEmpty() {
+        final FlowFileQueueSize currentSize = queue.getFlowFileQueueSize();
+        final boolean nothingActive = currentSize.getActiveCount() == 0;
+        final boolean nothingSwapped = currentSize.getSwappedCount() == 0;
+        return nothingActive && nothingSwapped;
+    }
+
+    /** Asks the swap manager to purge. */
+    @Override
+    public void purgeSwapFiles() {
+        swapManager.purge();
+    }
+
+    /** @return the summary produced by the backing queue's recovery */
+    @Override
+    public SwapSummary recoverSwappedFlowFiles() {
+        return queue.recoverSwappedFlowFiles();
+    }
+
+    @Override
+    public String toString() {
+        return "FlowFileQueue[id=" + getIdentifier() + "]";
+    }
+
+
+    /** Looks the FlowFile up in the backing queue by UUID. */
+    @Override
+    public FlowFileRecord getFlowFile(final String flowFileUuid) throws IOException {
+        return queue.getFlowFile(flowFileUuid);
+    }
+
+
+    /** Forwards the drop request to the backing queue. */
+    @Override
+    protected void dropFlowFiles(final DropFlowFileRequest dropRequest, final String requestor) {
+        queue.dropFlowFiles(dropRequest, requestor);
+    }
+
+
+    /**
+     * Acquires this queue's write lock.
+     *
+     * NOTE(review): none of the operations in this class acquire this lock, so holding it
+     * does not by itself keep other threads from calling put/poll. Confirm the intended use.
+     */
+    public void lock() {
+        writeLock.lock();
+    }
+
+    /**
+     * Releases the write lock taken by {@link #lock()}.
+     */
+    public void unlock() {
+        writeLock.unlock("external unlock");
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardLocalQueuePartitionDiagnostics.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardLocalQueuePartitionDiagnostics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardLocalQueuePartitionDiagnostics.java
new file mode 100644
index 0000000..ff31e77
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardLocalQueuePartitionDiagnostics.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+/**
+ * {@link LocalQueuePartitionDiagnostics} backed by a {@link FlowFileQueueSize} and two
+ * penalization flags, all supplied at construction.
+ */
+public class StandardLocalQueuePartitionDiagnostics implements LocalQueuePartitionDiagnostics {
+    private final FlowFileQueueSize queueSize;
+    private final boolean anyPenalized;
+    private final boolean allPenalized;
+
+    /**
+     * @param queueSize the counters from which every size in this object is derived
+     * @param anyPenalized the value returned by {@link #isAnyActiveFlowFilePenalized()}
+     * @param allPenalized the value returned by {@link #isAllActiveFlowFilesPenalized()}
+     */
+    public StandardLocalQueuePartitionDiagnostics(final FlowFileQueueSize queueSize, final boolean anyPenalized, final boolean allPenalized) {
+        this.queueSize = queueSize;
+        this.anyPenalized = anyPenalized;
+        this.allPenalized = allPenalized;
+    }
+
+    /**
+     * @return a QueueSize holding the unacknowledged count and the unacknowledged byte total
+     */
+    @Override
+    public QueueSize getUnacknowledgedQueueSize() {
+        // The second argument is the byte total; the count was previously passed here by mistake.
+        return new QueueSize(queueSize.getUnacknowledgedCount(), queueSize.getUnacknowledgedBytes());
+    }
+
+    /**
+     * @return a QueueSize holding the active count and the active byte total
+     */
+    @Override
+    public QueueSize getActiveQueueSize() {
+        return new QueueSize(queueSize.getActiveCount(), queueSize.getActiveBytes());
+    }
+
+    /**
+     * @return a QueueSize holding the swapped count and the swapped byte total
+     */
+    @Override
+    public QueueSize getSwapQueueSize() {
+        return new QueueSize(queueSize.getSwappedCount(), queueSize.getSwappedBytes());
+    }
+
+    /**
+     * @return the swap file count from the underlying queue size
+     */
+    @Override
+    public int getSwapFileCount() {
+        return queueSize.getSwapFileCount();
+    }
+
+    /**
+     * @return the "any penalized" flag supplied at construction
+     */
+    @Override
+    public boolean isAnyActiveFlowFilePenalized() {
+        return anyPenalized;
+    }
+
+    /**
+     * @return the "all penalized" flag supplied at construction
+     */
+    @Override
+    public boolean isAllActiveFlowFilesPenalized() {
+        return allPenalized;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardQueueDiagnostics.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardQueueDiagnostics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardQueueDiagnostics.java
new file mode 100644
index 0000000..be42e2e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardQueueDiagnostics.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import java.util.List;
+
+/**
+ * {@link QueueDiagnostics} that returns the local and remote partition diagnostics it
+ * was constructed with. The remote list is stored and returned by reference.
+ */
+public class StandardQueueDiagnostics implements QueueDiagnostics {
+    final LocalQueuePartitionDiagnostics localQueuePartitionDiagnostics;
+    final List<RemoteQueuePartitionDiagnostics> remoteQueuePartitionDiagnostics;
+
+    /**
+     * @param localQueuePartitionDiagnostics the value returned by {@link #getLocalQueuePartitionDiagnostics()}
+     * @param remoteQueuePartitionDiagnostics the list returned by {@link #getRemoteQueuePartitionDiagnostics()}
+     */
+    public StandardQueueDiagnostics(final LocalQueuePartitionDiagnostics localQueuePartitionDiagnostics, final List<RemoteQueuePartitionDiagnostics> remoteQueuePartitionDiagnostics) {
+        this.remoteQueuePartitionDiagnostics = remoteQueuePartitionDiagnostics;
+        this.localQueuePartitionDiagnostics = localQueuePartitionDiagnostics;
+    }
+
+    /** @return the remote partition diagnostics supplied at construction (same list instance) */
+    @Override
+    public List<RemoteQueuePartitionDiagnostics> getRemoteQueuePartitionDiagnostics() {
+        return this.remoteQueuePartitionDiagnostics;
+    }
+
+    /** @return the local partition diagnostics supplied at construction */
+    @Override
+    public LocalQueuePartitionDiagnostics getLocalQueuePartitionDiagnostics() {
+        return this.localQueuePartitionDiagnostics;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardRemoteQueuePartitionDiagnostics.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardRemoteQueuePartitionDiagnostics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardRemoteQueuePartitionDiagnostics.java
new file mode 100644
index 0000000..6790055
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardRemoteQueuePartitionDiagnostics.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+/**
+ * {@link RemoteQueuePartitionDiagnostics} backed by a node identifier and a
+ * {@link FlowFileQueueSize}, both supplied at construction.
+ */
+public class StandardRemoteQueuePartitionDiagnostics implements RemoteQueuePartitionDiagnostics {
+    private final String nodeId;
+    private final FlowFileQueueSize queueSize;
+
+    /**
+     * @param nodeId the value returned by {@link #getNodeIdentifier()}
+     * @param queueSize the counters from which every size in this object is derived
+     */
+    public StandardRemoteQueuePartitionDiagnostics(final String nodeId, final FlowFileQueueSize queueSize) {
+        this.nodeId = nodeId;
+        this.queueSize = queueSize;
+    }
+
+    /**
+     * @return the node identifier supplied at construction
+     */
+    @Override
+    public String getNodeIdentifier() {
+        return nodeId;
+    }
+
+    /**
+     * @return a QueueSize holding the unacknowledged count and the unacknowledged byte total
+     */
+    @Override
+    public QueueSize getUnacknowledgedQueueSize() {
+        // The second argument is the byte total; the count was previously passed here by mistake.
+        return new QueueSize(queueSize.getUnacknowledgedCount(), queueSize.getUnacknowledgedBytes());
+    }
+
+    /**
+     * @return a QueueSize holding the active count and the active byte total
+     */
+    @Override
+    public QueueSize getActiveQueueSize() {
+        return new QueueSize(queueSize.getActiveCount(), queueSize.getActiveBytes());
+    }
+
+    /**
+     * @return a QueueSize holding the swapped count and the swapped byte total
+     */
+    @Override
+    public QueueSize getSwapQueueSize() {
+        return new QueueSize(queueSize.getSwappedCount(), queueSize.getSwappedBytes());
+    }
+
+    /**
+     * @return the swap file count from the underlying queue size
+     */
+    @Override
+    public int getSwapFileCount() {
+        return queueSize.getSwapFileCount();
+    }
+}