You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2018/10/04 20:29:06 UTC

[10/14] nifi git commit: NIFI-5516: Implement Load-Balanced Connections Refactoring StandardFlowFileQueue to have an AbstractFlowFileQueue Refactored more into AbstractFlowFileQueue Added documentation, cleaned up code some Refactored FlowFileQueue so th

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
new file mode 100644
index 0000000..5bf75a4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Base class for {@link FlowFileQueue} implementations. It holds the settings that are kept per queue (FlowFile
+ * expiration, back pressure thresholds, load-balance strategy and compression) and implements the asynchronous
+ * "list queue" and "drop queue" requests, leaving access to the queued FlowFiles themselves to subclasses.
+ */
+public abstract class AbstractFlowFileQueue implements FlowFileQueue {
+    private static final Logger logger = LoggerFactory.getLogger(AbstractFlowFileQueue.class);
+    // Completed list/drop requests are only purged once more than this many requests are being tracked
+    private static final int MAX_RETAINED_REQUESTS = 10;
+    // How long a completed list/drop request is retained before it becomes eligible for purging
+    private static final long REQUEST_RETENTION_MILLIS = TimeUnit.MINUTES.toMillis(5L);
+
+    private final String identifier;
+    private final FlowFileRepository flowFileRepository;
+    private final ProvenanceEventRepository provRepository;
+    private final ResourceClaimManager resourceClaimManager;
+    private final ProcessScheduler scheduler;
+
+    private final AtomicReference<TimePeriod> expirationPeriod = new AtomicReference<>(new TimePeriod("0 mins", 0L));
+    private final AtomicReference<MaxQueueSize> maxQueueSize = new AtomicReference<>(new MaxQueueSize("1 GB", 1024 * 1024 * 1024, 10000));
+
+    private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>();
+
+    private LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.DO_NOT_LOAD_BALANCE;
+    private String partitioningAttribute = null;
+
+    private LoadBalanceCompression compression = LoadBalanceCompression.DO_NOT_COMPRESS;
+
+
+    public AbstractFlowFileQueue(final String identifier, final ProcessScheduler scheduler,
+            final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo, final ResourceClaimManager resourceClaimManager) {
+        this.identifier = identifier;
+        this.scheduler = scheduler;
+        this.flowFileRepository = flowFileRepo;
+        this.provRepository = provRepo;
+        this.resourceClaimManager = resourceClaimManager;
+    }
+
+    @Override
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    protected ProcessScheduler getScheduler() {
+        return scheduler;
+    }
+
+    @Override
+    public String getFlowFileExpiration() {
+        return expirationPeriod.get().getPeriod();
+    }
+
+    /**
+     * Returns the FlowFile expiration converted to the given unit. Because the return type is an {@code int}, a value
+     * that does not fit is capped at {@link Integer#MAX_VALUE} rather than being allowed to wrap around to a negative
+     * number (30 days expressed in milliseconds, for example, already exceeds the range of an int).
+     */
+    @Override
+    public int getFlowFileExpiration(final TimeUnit timeUnit) {
+        final long expiration = timeUnit.convert(expirationPeriod.get().getMillis(), TimeUnit.MILLISECONDS);
+        return (int) Math.min(expiration, Integer.MAX_VALUE);
+    }
+
+    @Override
+    public void setFlowFileExpiration(final String flowExpirationPeriod) {
+        final long millis = FormatUtils.getTimeDuration(flowExpirationPeriod, TimeUnit.MILLISECONDS);
+        if (millis < 0) {
+            throw new IllegalArgumentException("FlowFile Expiration Period must not be negative");
+        }
+
+        expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis));
+    }
+
+    @Override
+    public void setBackPressureObjectThreshold(final long threshold) {
+        // atomically swap in a new MaxQueueSize that differs from the current one only in its object count
+        maxQueueSize.updateAndGet(current -> new MaxQueueSize(current.getMaxSize(), current.getMaxBytes(), threshold));
+    }
+
+    @Override
+    public long getBackPressureObjectThreshold() {
+        return getMaxQueueSize().getMaxCount();
+    }
+
+    @Override
+    public void setBackPressureDataSizeThreshold(final String maxDataSize) {
+        final long maxBytes = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
+
+        // atomically swap in a new MaxQueueSize that differs from the current one only in its data size
+        maxQueueSize.updateAndGet(current -> new MaxQueueSize(maxDataSize, maxBytes, current.getMaxCount()));
+    }
+
+    @Override
+    public String getBackPressureDataSizeThreshold() {
+        return getMaxQueueSize().getMaxSize();
+    }
+
+    private MaxQueueSize getMaxQueueSize() {
+        return maxQueueSize.get();
+    }
+
+    @Override
+    public boolean isFull() {
+        final MaxQueueSize maxSize = getMaxQueueSize();
+
+        // Check if max size is set
+        if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) {
+            return false;
+        }
+
+        final QueueSize queueSize = size();
+        if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) {
+            return true;
+        }
+
+        if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) {
+            return true;
+        }
+
+        return false;
+    }
+
+
+    /**
+     * Starts an asynchronous listing of this queue on a daemon thread and returns the request that tracks its progress.
+     * The FlowFiles are sorted with the queue's prioritizers and summarized until {@code maxResults} summaries exist.
+     */
+    @Override
+    public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults) {
+        // purge any old requests from the map just to keep it clean. But if there are very few requests, which is usually the case, then don't bother
+        if (listRequestMap.size() > MAX_RETAINED_REQUESTS) {
+            final long now = System.currentTimeMillis();
+            listRequestMap.values().removeIf(request -> {
+                final boolean completed = request.getState() == ListFlowFileState.COMPLETE || request.getState() == ListFlowFileState.FAILURE;
+                return completed && now - request.getLastUpdated() > REQUEST_RETENTION_MILLIS;
+            });
+        }
+
+        // size() is evaluated here, when the listing is requested, not later on the listing thread
+        final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, maxResults, size());
+
+        // a lambda is used (rather than an anonymous Runnable) so that 'this' in the log message below is the queue
+        final Thread t = new Thread(() -> {
+            int position = 0;
+            final List<FlowFileSummary> summaries = new ArrayList<>();
+
+            // Create an ArrayList that contains all of the contents of the active queue.
+            // We do this so that we don't have to hold the lock any longer than absolutely necessary.
+            // We cannot simply pull the first 'maxResults' records from the queue, however, because the
+            // Iterator provided by PriorityQueue does not return records in order. So we would have to either
+            // use a writeLock and 'pop' the first 'maxResults' records off the queue or use a read lock and
+            // do a shallow copy of the queue. The shallow copy is generally quicker because it doesn't have to do
+            // the sorting to put the records back. So even though this has an expense of Java Heap to create the
+            // extra collection, we are making this trade-off to avoid locking the queue any longer than required.
+            final List<FlowFileRecord> allFlowFiles = getListableFlowFiles();
+            final QueuePrioritizer prioritizer = new QueuePrioritizer(getPriorities());
+
+            listRequest.setState(ListFlowFileState.CALCULATING_LIST);
+
+            // sort the FlowFileRecords so that we have the list in the same order as on the queue.
+            allFlowFiles.sort(prioritizer);
+
+            for (final FlowFileRecord flowFile : allFlowFiles) {
+                summaries.add(summarize(flowFile, ++position));
+                if (summaries.size() >= maxResults) {
+                    break;
+                }
+            }
+
+            logger.debug("{} Finished listing FlowFiles for active queue with a total of {} results", this, summaries.size());
+            listRequest.setFlowFileSummaries(summaries);
+            listRequest.setState(ListFlowFileState.COMPLETE);
+        }, "List FlowFiles for Connection " + getIdentifier());
+        t.setDaemon(true);
+        t.start();
+
+        listRequestMap.put(requestIdentifier, listRequest);
+        return listRequest;
+    }
+
+    @Override
+    public ListFlowFileStatus getListFlowFileStatus(final String requestIdentifier) {
+        return listRequestMap.get(requestIdentifier);
+    }
+
+    @Override
+    public ListFlowFileStatus cancelListFlowFileRequest(final String requestIdentifier) {
+        logger.info("Canceling ListFlowFile Request with ID {}", requestIdentifier);
+        final ListFlowFileRequest request = listRequestMap.remove(requestIdentifier);
+        if (request != null) {
+            request.cancel();
+        }
+
+        return request;
+    }
+
+    /**
+     * @return all FlowFiles that should be listed in response to a List Queue request
+     */
+    protected abstract List<FlowFileRecord> getListableFlowFiles();
+
+
+    @Override
+    public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) {
+        logger.info("Initiating drop of FlowFiles from {} on behalf of {} (request identifier={})", this, requestor, requestIdentifier);
+
+        // purge any old requests from the map just to keep it clean. But if there are very few requests, which is usually the case, then don't bother
+        if (dropRequestMap.size() > MAX_RETAINED_REQUESTS) {
+            final long now = System.currentTimeMillis();
+            dropRequestMap.values().removeIf(request -> {
+                final boolean completed = request.getState() == DropFlowFileState.COMPLETE || request.getState() == DropFlowFileState.FAILURE;
+                return completed && now - request.getLastUpdated() > REQUEST_RETENTION_MILLIS;
+            });
+        }
+
+        final DropFlowFileRequest dropRequest = new DropFlowFileRequest(requestIdentifier);
+        final QueueSize originalSize = size();
+        dropRequest.setCurrentSize(originalSize);
+        dropRequest.setOriginalSize(originalSize);
+        if (originalSize.getObjectCount() == 0) {
+            // nothing is queued, so the request is completed right away without starting a background thread
+            dropRequest.setDroppedSize(originalSize);
+            dropRequest.setState(DropFlowFileState.COMPLETE);
+            dropRequestMap.put(requestIdentifier, dropRequest);
+            return dropRequest;
+        }
+
+        final Thread t = new Thread(() -> dropFlowFiles(dropRequest, requestor), "Drop FlowFiles for Connection " + getIdentifier());
+        t.setDaemon(true);
+        t.start();
+
+        dropRequestMap.put(requestIdentifier, dropRequest);
+
+        return dropRequest;
+    }
+
+
+    @Override
+    public DropFlowFileRequest cancelDropFlowFileRequest(final String requestIdentifier) {
+        final DropFlowFileRequest request = dropRequestMap.remove(requestIdentifier);
+        if (request == null) {
+            return null;
+        }
+
+        request.cancel();
+        return request;
+    }
+
+    @Override
+    public DropFlowFileStatus getDropFlowFileStatus(final String requestIdentifier) {
+        return dropRequestMap.get(requestIdentifier);
+    }
+
+    /**
+     * Synchronously drops all FlowFiles in the queue
+     *
+     * @param dropRequest the request
+     * @param requestor the identity of the user/agent who made the request
+     */
+    protected abstract void dropFlowFiles(final DropFlowFileRequest dropRequest, final String requestor);
+
+    @Override
+    public void verifyCanList() throws IllegalStateException {
+    }
+
+
+    protected FlowFileSummary summarize(final FlowFile flowFile, final int position) {
+        // extract all of the information that we care about into new variables rather than just
+        // wrapping the FlowFile object with a FlowFileSummary object. We do this because we want to
+        // be able to hold many FlowFileSummary objects in memory and if we just wrap the FlowFile object,
+        // we will end up holding the entire FlowFile (including all Attributes) in the Java heap as well,
+        // which can be problematic if we expect them to be swapped out.
+        final String uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+        final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+        final long size = flowFile.getSize();
+        final Long lastQueuedTime = flowFile.getLastQueueDate();
+        final long lineageStart = flowFile.getLineageStartDate();
+        final boolean penalized = flowFile.isPenalized();
+
+        return new FlowFileSummary() {
+            @Override
+            public String getUuid() {
+                return uuid;
+            }
+
+            @Override
+            public String getFilename() {
+                return filename;
+            }
+
+            @Override
+            public int getPosition() {
+                return position;
+            }
+
+            @Override
+            public long getSize() {
+                return size;
+            }
+
+            @Override
+            public long getLastQueuedTime() {
+                return lastQueuedTime == null ? 0L : lastQueuedTime;
+            }
+
+            @Override
+            public long getLineageStartDate() {
+                return lineageStart;
+            }
+
+            @Override
+            public boolean isPenalized() {
+                return penalized;
+            }
+        };
+    }
+
+    protected QueueSize drop(final List<FlowFileRecord> flowFiles, final String requestor) throws IOException {
+        // Create a Provenance Event and a FlowFile Repository record for each FlowFile
+        final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(flowFiles.size());
+        final List<RepositoryRecord> flowFileRepoRecords = new ArrayList<>(flowFiles.size());
+        for (final FlowFileRecord flowFile : flowFiles) {
+            provenanceEvents.add(createDropProvenanceEvent(flowFile, requestor));
+            flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile));
+        }
+
+        long dropContentSize = 0L;
+        for (final FlowFileRecord flowFile : flowFiles) {
+            dropContentSize += flowFile.getSize();
+            final ContentClaim contentClaim = flowFile.getContentClaim();
+            if (contentClaim == null) {
+                continue;
+            }
+
+            final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+            if (resourceClaim == null) {
+                continue;
+            }
+
+            resourceClaimManager.decrementClaimantCount(resourceClaim);
+        }
+
+        provRepository.registerEvents(provenanceEvents);
+        flowFileRepository.updateRepository(flowFileRepoRecords);
+        return new QueueSize(flowFiles.size(), dropContentSize);
+    }
+
+    private ProvenanceEventRecord createDropProvenanceEvent(final FlowFileRecord flowFile, final String requestor) {
+        final ProvenanceEventBuilder builder = provRepository.eventBuilder();
+        builder.fromFlowFile(flowFile);
+        builder.setEventType(ProvenanceEventType.DROP);
+        builder.setLineageStartDate(flowFile.getLineageStartDate());
+        builder.setComponentId(getIdentifier());
+        builder.setComponentType("Connection");
+        builder.setAttributes(flowFile.getAttributes(), Collections.emptyMap());
+        builder.setDetails("FlowFile Queue emptied by " + requestor);
+        builder.setSourceQueueIdentifier(getIdentifier());
+
+        final ContentClaim contentClaim = flowFile.getContentClaim();
+        final ResourceClaim resourceClaim = contentClaim == null ? null : contentClaim.getResourceClaim();
+        // drop() tolerates a content claim that has no resource claim, so the same case must not fail here
+        if (resourceClaim != null) {
+            builder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(), flowFile.getSize());
+        }
+
+        return builder.build();
+    }
+
+    private RepositoryRecord createDeleteRepositoryRecord(final FlowFileRecord flowFile) {
+        return new DropFlowFileRepositoryRecord(this, flowFile);
+    }
+
+    @Override
+    public synchronized void setLoadBalanceStrategy(final LoadBalanceStrategy strategy, final String partitioningAttribute) {
+        if (strategy == LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE && !FlowFile.KeyValidator.isValid(partitioningAttribute)) {
+            throw new IllegalArgumentException("Cannot set Load Balance Strategy to " + strategy + " without providing a valid Partitioning Attribute");
+        }
+
+        this.loadBalanceStrategy = strategy;
+        this.partitioningAttribute = partitioningAttribute;
+    }
+
+    @Override
+    public synchronized String getPartitioningAttribute() {
+        return partitioningAttribute;
+    }
+
+    @Override
+    public synchronized LoadBalanceStrategy getLoadBalanceStrategy() {
+        return loadBalanceStrategy;
+    }
+
+    @Override
+    public synchronized void setLoadBalanceCompression(final LoadBalanceCompression compression) {
+        this.compression = compression;
+    }
+
+    @Override
+    public synchronized LoadBalanceCompression getLoadBalanceCompression() {
+        return compression;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java
new file mode 100644
index 0000000..9a220ae
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.events.EventReporter;
+
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * A {@link SwappablePriorityQueue} whose {@link #poll(Set, long, long)} can block for a bounded amount of time
+ * until a FlowFile is added via {@link #put(FlowFileRecord)} or {@link #putAll(Collection)}.
+ */
+public class BlockingSwappablePriorityQueue extends SwappablePriorityQueue {
+    // guards the wait/notify handshake between put/putAll and the blocking poll
+    private final Object monitor = new Object();
+
+    public BlockingSwappablePriorityQueue(final FlowFileSwapManager swapManager, final int swapThreshold, final EventReporter eventReporter, final FlowFileQueue flowFileQueue,
+        final DropFlowFileAction dropAction, final String partitionName) {
+
+        super(swapManager, swapThreshold, eventReporter, flowFileQueue, dropAction, partitionName);
+    }
+
+    @Override
+    public void put(final FlowFileRecord flowFile) {
+        super.put(flowFile);
+
+        synchronized (monitor) {
+            monitor.notify();
+        }
+    }
+
+    @Override
+    public void putAll(final Collection<FlowFileRecord> flowFiles) {
+        super.putAll(flowFiles);
+
+        synchronized (monitor) {
+            monitor.notifyAll();
+        }
+    }
+
+    /**
+     * Polls for a FlowFile, blocking for up to {@code waitMillis} milliseconds if none is immediately available.
+     *
+     * @param expiredRecords passed through to the non-blocking {@code poll} of the superclass
+     * @param expirationMillis passed through to the non-blocking {@code poll} of the superclass
+     * @param waitMillis the maximum time to wait, in milliseconds; if not positive, a single poll attempt is made
+     * @return the FlowFile that was polled, or {@code null} if none became available in time
+     * @throws InterruptedException if the thread is interrupted while waiting
+     */
+    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final long waitMillis) throws InterruptedException {
+        final long maxTimestamp = System.currentTimeMillis() + waitMillis;
+
+        synchronized (monitor) {
+            while (true) {
+                final FlowFileRecord flowFile = super.poll(expiredRecords, expirationMillis);
+                if (flowFile != null) {
+                    return flowFile;
+                }
+
+                // Wait only for the time that is left. Object.wait(0) would block until notified, and waiting
+                // for the full waitMillis again after an early wake-up would overshoot the requested deadline.
+                final long remainingMillis = maxTimestamp - System.currentTimeMillis();
+                if (remainingMillis <= 0) {
+                    return null;
+                }
+
+                monitor.wait(remainingMillis);
+            }
+        }
+    }
+
+    @Override
+    public void inheritQueueContents(final FlowFileQueueContents queueContents) {
+        // We have to override this method and synchronize on monitor before calling super.inheritQueueContents.
+        // If we don't do this, then our super class will obtain the write lock and call putAll, which will cause
+        // us to synchronize on monitor AFTER obtaining the write lock (WriteLock then monitor).
+        // If poll() is then called, we will synchronize on monitor, THEN attempt to obtain the write lock (monitor then WriteLock),
+        // which would cause a deadlock.
+        synchronized (monitor) {
+            super.inheritQueueContents(queueContents);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/ConnectionEventListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/ConnectionEventListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/ConnectionEventListener.java
new file mode 100644
index 0000000..a3ae6ee
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/ConnectionEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+public interface ConnectionEventListener { // callback hooks named after a connection's source and destination; NOTE(review): firing conditions are not defined here - confirm against implementations
+    void triggerSourceEvent(); // NOTE(review): presumably signals the component at the source end of the connection - confirm against callers
+
+    void triggerDestinationEvent(); // NOTE(review): presumably signals the component at the destination end of the connection - confirm against callers
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileAction.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileAction.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileAction.java
new file mode 100644
index 0000000..86cd169
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileAction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+public interface DropFlowFileAction { // callback that drops a batch of FlowFiles on behalf of the given requestor
+    QueueSize drop(List<FlowFileRecord> flowFiles, String requestor) throws IOException; // AbstractFlowFileQueue#drop has this shape and returns the count and total size of what it dropped; NOTE(review): presumably the contract here too - confirm
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java
new file mode 100644
index 0000000..f47b4eb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRepositoryRecord.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.controller.repository.RepositoryRecordType;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+
+/**
+ * {@link RepositoryRecord} for a FlowFile that is dropped from a queue: it reports the type
+ * {@link RepositoryRecordType#DELETE}, names the given queue as the original queue, and has no destination.
+ */
+public class DropFlowFileRepositoryRecord implements RepositoryRecord {
+    private final FlowFileQueue originalQueue;
+    private final FlowFileRecord droppedFlowFile;
+
+    public DropFlowFileRepositoryRecord(final FlowFileQueue queue, final FlowFileRecord flowFile) {
+        this.originalQueue = queue;
+        this.droppedFlowFile = flowFile;
+    }
+
+    @Override
+    public FlowFileQueue getDestination() {
+        // this record never has a destination queue
+        return null;
+    }
+
+    @Override
+    public FlowFileQueue getOriginalQueue() {
+        return originalQueue;
+    }
+
+    @Override
+    public RepositoryRecordType getType() {
+        return RepositoryRecordType.DELETE;
+    }
+
+    @Override
+    public ContentClaim getCurrentClaim() {
+        return droppedFlowFile.getContentClaim();
+    }
+
+    @Override
+    public ContentClaim getOriginalClaim() {
+        // same value as getCurrentClaim(): both are read from the FlowFile's content claim
+        return droppedFlowFile.getContentClaim();
+    }
+
+    @Override
+    public long getCurrentClaimOffset() {
+        return droppedFlowFile.getContentClaimOffset();
+    }
+
+    @Override
+    public FlowFileRecord getCurrent() {
+        return droppedFlowFile;
+    }
+
+    @Override
+    public boolean isAttributesChanged() {
+        return false;
+    }
+
+    @Override
+    public boolean isMarkedForAbort() {
+        return false;
+    }
+
+    @Override
+    public String getSwapLocation() {
+        return null;
+    }
+
+    @Override
+    public List<ContentClaim> getTransientClaims() {
+        return Collections.emptyList();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueContents.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueContents.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueContents.java
new file mode 100644
index 0000000..60ad64d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueContents.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+import java.util.List;
+
+public class FlowFileQueueContents {
+    private final List<FlowFileRecord> activeFlowFiles;
+    private final List<String> swapLocations;
+    private final QueueSize swapSize;
+    /** Bundles the given lists and swap size; the references are stored as-is, without copying. */
+    public FlowFileQueueContents(final List<FlowFileRecord> activeFlowFiles, final List<String> swapLocations, final QueueSize swapSize) {
+        this.swapSize = swapSize;
+        this.swapLocations = swapLocations;
+        this.activeFlowFiles = activeFlowFiles;
+    }
+    /** @return the swap size supplied at construction */
+    public QueueSize getSwapSize() {
+        return this.swapSize;
+    }
+    /** @return the swap-location list supplied at construction (same instance, not a copy) */
+    public List<String> getSwapLocations() {
+        return this.swapLocations;
+    }
+    /** @return the active FlowFile list supplied at construction (same instance, not a copy) */
+    public List<FlowFileRecord> getActiveFlowFiles() {
+        return this.activeFlowFiles;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueFactory.java
new file mode 100644
index 0000000..dc6667f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueFactory.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+public interface FlowFileQueueFactory { // builds a FlowFileQueue from a load-balance strategy, a partitioning attribute and an event listener
+    FlowFileQueue createFlowFileQueue(LoadBalanceStrategy loadBalanceStrategy, String partitioningAttribute, ConnectionEventListener connectionEventListener);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueSize.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueSize.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueSize.java
new file mode 100644
index 0000000..7ebc017
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/FlowFileQueueSize.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+public class FlowFileQueueSize {
+    private final int activeQueueCount;
+    private final long activeQueueBytes;
+    private final int swappedCount;
+    private final long swappedBytes;
+    private final int swapFileCount;
+    private final int unacknowledgedCount;
+    private final long unacknowledgedBytes;
+    /** Captures the counts and byte totals exactly as given; nothing is validated or derived here. */
+    public FlowFileQueueSize(final int activeQueueCount, final long activeQueueBytes, final int swappedCount, final long swappedBytes, final int swapFileCount,
+        final int unacknowledgedCount, final long unacknowledgedBytes) {
+        this.unacknowledgedBytes = unacknowledgedBytes;
+        this.unacknowledgedCount = unacknowledgedCount;
+        this.swapFileCount = swapFileCount;
+        this.swappedBytes = swappedBytes;
+        this.swappedCount = swappedCount;
+        this.activeQueueBytes = activeQueueBytes;
+        this.activeQueueCount = activeQueueCount;
+    }
+    // ---- active queue ----
+    public int getActiveCount() {
+        return this.activeQueueCount;
+    }
+    public long getActiveBytes() {
+        return this.activeQueueBytes;
+    }
+    // ---- swapped FlowFiles and the number of swap files ----
+    public int getSwappedCount() {
+        return this.swappedCount;
+    }
+    public long getSwappedBytes() {
+        return this.swappedBytes;
+    }
+    public int getSwapFileCount() {
+        return this.swapFileCount;
+    }
+    // ---- unacknowledged FlowFiles ----
+    public int getUnacknowledgedCount() {
+        return this.unacknowledgedCount;
+    }
+    public long getUnacknowledgedBytes() {
+        return this.unacknowledgedBytes;
+    }
+    /** @return {@code true} only when the active, swapped and unacknowledged counts are all zero */
+    public boolean isEmpty() {
+        return (activeQueueCount | swappedCount | unacknowledgedCount) == 0;
+    }
+    /** @return the combined size: active + swapped + unacknowledged, for both count and bytes */
+    public QueueSize toQueueSize() {
+        final int totalCount = activeQueueCount + swappedCount + unacknowledgedCount;
+        final long totalBytes = activeQueueBytes + swappedBytes + unacknowledgedBytes;
+        return new QueueSize(totalCount, totalBytes);
+    }
+    /** @return count and bytes of the active queue only */
+    public QueueSize activeQueueSize() {
+        return new QueueSize(this.activeQueueCount, this.activeQueueBytes);
+    }
+    /** @return count and bytes of the unacknowledged FlowFiles only */
+    public QueueSize unacknowledgedQueueSize() {
+        return new QueueSize(this.unacknowledgedCount, this.unacknowledgedBytes);
+    }
+    /** @return count and bytes of the swapped FlowFiles only */
+    public QueueSize swapQueueSize() {
+        return new QueueSize(this.swappedCount, this.swappedBytes);
+    }
+    /** Renders every counter of this snapshot as one human-readable line. */
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("FlowFile Queue Size[ ActiveQueue=[");
+        sb.append(activeQueueCount).append(", ").append(activeQueueBytes).append(" Bytes], Swap Queue=[");
+        sb.append(swappedCount).append(", ").append(swappedBytes).append(" Bytes], Swap Files=[");
+        sb.append(swapFileCount).append("], Unacknowledged=[").append(unacknowledgedCount).append(", ");
+        return sb.append(unacknowledgedBytes).append(" Bytes] ]").toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/MaxQueueSize.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/MaxQueueSize.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/MaxQueueSize.java
new file mode 100644
index 0000000..9492435
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/MaxQueueSize.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+public class MaxQueueSize {
+    private final long maxCount;
+    private final long maxBytes;
+    private final String maxSize;
+    /** Stores the three limits exactly as given; no validation or parsing happens here. */
+    public MaxQueueSize(final String maxSize, final long maxBytes, final long maxCount) {
+        this.maxCount = maxCount;
+        this.maxBytes = maxBytes;
+        this.maxSize = maxSize;
+    }
+    /** @return the count limit supplied at construction */
+    public long getMaxCount() {
+        return this.maxCount;
+    }
+    /** @return the byte limit supplied at construction */
+    public long getMaxBytes() {
+        return this.maxBytes;
+    }
+    /** @return the size string supplied at construction */
+    public String getMaxSize() {
+        return this.maxSize;
+    }
+    /** @return the count limit, followed by {@code " Objects/"} and the size string */
+    @Override
+    public String toString() {
+        return new StringBuilder().append(maxCount).append(" Objects/").append(maxSize).toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/NopConnectionEventListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/NopConnectionEventListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/NopConnectionEventListener.java
new file mode 100644
index 0000000..d641da4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/NopConnectionEventListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+public class NopConnectionEventListener implements ConnectionEventListener {
+    @Override
+    public void triggerDestinationEvent() {
+        // intentionally a no-op: this listener ignores the event
+    }
+    @Override
+    public void triggerSourceEvent() {
+        // intentionally a no-op: this listener ignores the event
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java
new file mode 100644
index 0000000..b78ccff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/QueuePrioritizer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+
+public class QueuePrioritizer implements Comparator<FlowFileRecord>, Serializable {
+    private static final long serialVersionUID = 1L;
+    private final transient List<FlowFilePrioritizer> prioritizers = new ArrayList<>(); // NOTE(review): transient, so not carried across Java serialization - confirm instances are never deserialized
+
+    /** @param priorities prioritizers to consult in list order; {@code null} means none */
+    public QueuePrioritizer(final List<FlowFilePrioritizer> priorities) {
+        if (priorities != null) {
+            prioritizers.addAll(priorities);
+        }
+    }
+
+    /**
+     * Orders by penalization first (non-penalized before penalized, earlier penalty expiration first),
+     * then by each configured prioritizer in turn, then by content claim (no claim first, then claim
+     * order, then claim offset) and finally by FlowFile id.
+     */
+    @Override
+    public int compare(final FlowFileRecord f1, final FlowFileRecord f2) {
+        final boolean firstPenalized = f1.isPenalized();
+        final boolean secondPenalized = f2.isPenalized();
+        if (firstPenalized != secondPenalized) {
+            return firstPenalized ? 1 : -1; // exactly one is penalized: it sorts after the other
+        }
+
+        if (firstPenalized) { // both are penalized here; equal expirations fall through
+            final long firstExpiration = f1.getPenaltyExpirationMillis();
+            final long secondExpiration = f2.getPenaltyExpirationMillis();
+            if (firstExpiration != secondExpiration) {
+                return firstExpiration < secondExpiration ? -1 : 1;
+            }
+        }
+
+        for (final FlowFilePrioritizer prioritizer : prioritizers) {
+            final int prioritizerResult = prioritizer.compare(f1, f2);
+            if (prioritizerResult != 0) {
+                return prioritizerResult;
+            }
+        }
+
+        final ContentClaim firstClaim = f1.getContentClaim();
+        final ContentClaim secondClaim = f2.getContentClaim();
+        if (firstClaim == null) {
+            if (secondClaim != null) {
+                return -1; // put the one without a claim first
+            }
+        } else if (secondClaim == null) {
+            return 1;
+        } else {
+            final int byClaim = firstClaim.compareTo(secondClaim);
+            if (byClaim != 0) {
+                return byClaim;
+            }
+
+            final int byOffset = Long.compare(f1.getContentClaimOffset(), f2.getContentClaimOffset());
+            if (byOffset != 0) {
+                return byOffset;
+            }
+        }
+
+        return Long.compare(f1.getId(), f2.getId());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
new file mode 100644
index 0000000..cab41e8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.queue;
+
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.util.concurrency.TimedLock;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A FlowFileQueue that keeps FlowFiles awaiting further processing in a single
+ * SwappablePriorityQueue. Must be thread safe. The load-balancing operations are no-ops;
+ * put/putAll and acknowledge also notify the ConnectionEventListener after delegating.
+ */
+public class StandardFlowFileQueue extends AbstractFlowFileQueue implements FlowFileQueue {
+
+    private final SwappablePriorityQueue queue;
+    private final ConnectionEventListener eventListener;
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); // fair mode; only its write lock is used in this class
+    private final FlowFileSwapManager swapManager;
+    private final TimedLock writeLock;
+
+    /** Creates the wrapped queue and the timed write lock, then applies the default back-pressure thresholds. */
+    public StandardFlowFileQueue(final String identifier, final ConnectionEventListener eventListener, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
+                                 final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter,
+                                 final int swapThreshold, final long defaultBackPressureObjectThreshold, final String defaultBackPressureDataSizeThreshold) {
+
+        super(identifier, scheduler, flowFileRepo, provRepo, resourceClaimManager);
+        this.swapManager = swapManager;
+        this.queue = new SwappablePriorityQueue(swapManager, swapThreshold, eventReporter, this, this::drop, null); // NOTE(review): 'this' escapes before construction completes; meaning of the trailing null is not visible here
+        this.eventListener = eventListener;
+
+        writeLock = new TimedLock(this.lock.writeLock(), getIdentifier() + " Write Lock", 100);
+
+        setBackPressureDataSizeThreshold(defaultBackPressureDataSizeThreshold);
+        setBackPressureObjectThreshold(defaultBackPressureObjectThreshold);
+    }
+
+    @Override
+    public void startLoadBalancing() { // no-op in this implementation
+    }
+
+    @Override
+    public void stopLoadBalancing() { // no-op in this implementation
+    }
+
+    @Override
+    public boolean isActivelyLoadBalancing() {
+        return false; // constant: start/stopLoadBalancing do nothing in this class
+    }
+
+    @Override
+    public void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
+        queue.setPriorities(newPriorities);
+    }
+
+    @Override
+    public List<FlowFilePrioritizer> getPriorities() {
+        return queue.getPriorities();
+    }
+
+    @Override
+    protected List<FlowFileRecord> getListableFlowFiles() {
+        return queue.getActiveFlowFiles(); // only the wrapped queue's active FlowFiles are offered for listing
+    }
+
+    @Override
+    public QueueDiagnostics getQueueDiagnostics() {
+        return new StandardQueueDiagnostics(queue.getQueueDiagnostics(), Collections.emptyList()); // local diagnostics only; the remote list is always empty
+    }
+
+    @Override
+    public void put(final FlowFileRecord file) {
+        queue.put(file);
+
+        eventListener.triggerDestinationEvent(); // fired after the FlowFile has been handed to the wrapped queue
+    }
+
+    @Override
+    public void putAll(final Collection<FlowFileRecord> files) {
+        queue.putAll(files);
+
+        eventListener.triggerDestinationEvent(); // fired once per call, not once per FlowFile
+    }
+
+
+    @Override
+    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
+        // Convert the configured FlowFile expiration to milliseconds and hand the poll off to the wrapped queue.
+        final long expirationMillis = getFlowFileExpiration(TimeUnit.MILLISECONDS);
+        return queue.poll(expiredRecords, expirationMillis);
+    }
+
+
+    @Override
+    public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords) {
+        return queue.poll(maxResults, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS));
+    }
+
+
+
+    @Override
+    public void acknowledge(final FlowFileRecord flowFile) {
+        queue.acknowledge(flowFile);
+
+        eventListener.triggerSourceEvent(); // fired after the wrapped queue has recorded the acknowledgement
+    }
+
+    @Override
+    public void acknowledge(final Collection<FlowFileRecord> flowFiles) {
+        queue.acknowledge(flowFiles);
+
+        eventListener.triggerSourceEvent(); // fired once per call, not once per FlowFile
+    }
+
+    @Override
+    public boolean isUnacknowledgedFlowFile() {
+        return queue.isUnacknowledgedFlowFile();
+    }
+
+    @Override
+    public QueueSize size() {
+        return queue.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return queue.getFlowFileQueueSize().isEmpty(); // FlowFileQueueSize.isEmpty() requires active, swapped and unacknowledged counts to all be zero
+    }
+
+    @Override
+    public boolean isActiveQueueEmpty() {
+        final FlowFileQueueSize queueSize = queue.getFlowFileQueueSize();
+        return queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0; // swapped FlowFiles count as non-empty; unacknowledged ones are not considered
+    }
+
+    @Override
+    public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
+        return queue.poll(filter, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS));
+    }
+
+    @Override
+    public void purgeSwapFiles() {
+        swapManager.purge(); // goes straight to the swap manager rather than through the wrapped queue
+    }
+
+    @Override
+    public SwapSummary recoverSwappedFlowFiles() {
+        return queue.recoverSwappedFlowFiles();
+    }
+
+    @Override
+    public String toString() {
+        return "FlowFileQueue[id=" + getIdentifier() + "]";
+    }
+
+
+    @Override
+    public FlowFileRecord getFlowFile(final String flowFileUuid) throws IOException {
+        return queue.getFlowFile(flowFileUuid);
+    }
+
+
+    @Override
+    protected void dropFlowFiles(final DropFlowFileRequest dropRequest, final String requestor) {
+        queue.dropFlowFiles(dropRequest, requestor);
+    }
+
+
+    /**
+     * Acquires the queue's timed write lock (intent: keep other threads from interacting with the queue)
+     */
+    public void lock() {
+        writeLock.lock();
+    }
+
+    /**
+     * Releases the write lock taken by lock()
+     */
+    public void unlock() {
+        writeLock.unlock("external unlock");
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardLocalQueuePartitionDiagnostics.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardLocalQueuePartitionDiagnostics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardLocalQueuePartitionDiagnostics.java
new file mode 100644
index 0000000..ff31e77
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardLocalQueuePartitionDiagnostics.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+public class StandardLocalQueuePartitionDiagnostics implements LocalQueuePartitionDiagnostics {
+    private final FlowFileQueueSize queueSize;
+    private final boolean anyPenalized;
+    private final boolean allPenalized;
+    /** Wraps a queue-size snapshot plus the two penalization flags; all values are reported back unchanged. */
+    public StandardLocalQueuePartitionDiagnostics(final FlowFileQueueSize queueSize, final boolean anyPenalized, final boolean allPenalized) {
+        this.queueSize = queueSize;
+        this.anyPenalized = anyPenalized;
+        this.allPenalized = allPenalized;
+    }
+    /** @return unacknowledged count paired with unacknowledged bytes (the byte total, not the count, is the second component) */
+    @Override
+    public QueueSize getUnacknowledgedQueueSize() {
+        return queueSize.unacknowledgedQueueSize();
+    }
+    /** @return count and bytes of the active queue, taken from the wrapped snapshot */
+    @Override
+    public QueueSize getActiveQueueSize() {
+        return queueSize.activeQueueSize();
+    }
+    /** @return count and bytes of the swapped FlowFiles, taken from the wrapped snapshot */
+    @Override
+    public QueueSize getSwapQueueSize() {
+        return queueSize.swapQueueSize();
+    }
+    /** @return the swap-file count recorded in the wrapped snapshot */
+    @Override
+    public int getSwapFileCount() {
+        return queueSize.getSwapFileCount();
+    }
+    /** @return the {@code anyPenalized} flag supplied at construction */
+    @Override
+    public boolean isAnyActiveFlowFilePenalized() {
+        return anyPenalized;
+    }
+    /** @return the {@code allPenalized} flag supplied at construction */
+    @Override
+    public boolean isAllActiveFlowFilesPenalized() {
+        return allPenalized;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardQueueDiagnostics.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardQueueDiagnostics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardQueueDiagnostics.java
new file mode 100644
index 0000000..be42e2e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardQueueDiagnostics.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import java.util.List;
+
+public class StandardQueueDiagnostics implements QueueDiagnostics {
+    final List<RemoteQueuePartitionDiagnostics> remoteQueuePartitionDiagnostics;
+    final LocalQueuePartitionDiagnostics localQueuePartitionDiagnostics;
+    /** Holds the given local and remote partition diagnostics; the list reference is stored as-is, without copying. */
+    public StandardQueueDiagnostics(final LocalQueuePartitionDiagnostics localQueuePartitionDiagnostics, final List<RemoteQueuePartitionDiagnostics> remoteQueuePartitionDiagnostics) {
+        this.remoteQueuePartitionDiagnostics = remoteQueuePartitionDiagnostics;
+        this.localQueuePartitionDiagnostics = localQueuePartitionDiagnostics;
+    }
+    /** @return the remote partition diagnostics list supplied at construction (same instance) */
+    @Override
+    public List<RemoteQueuePartitionDiagnostics> getRemoteQueuePartitionDiagnostics() {
+        return this.remoteQueuePartitionDiagnostics;
+    }
+    /** @return the local partition diagnostics supplied at construction */
+    @Override
+    public LocalQueuePartitionDiagnostics getLocalQueuePartitionDiagnostics() {
+        return this.localQueuePartitionDiagnostics;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardRemoteQueuePartitionDiagnostics.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardRemoteQueuePartitionDiagnostics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardRemoteQueuePartitionDiagnostics.java
new file mode 100644
index 0000000..6790055
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardRemoteQueuePartitionDiagnostics.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+public class StandardRemoteQueuePartitionDiagnostics implements RemoteQueuePartitionDiagnostics {
+    private final String nodeId;
+    private final FlowFileQueueSize queueSize;
+    /** Pairs a node identifier with a queue-size snapshot; both are reported back unchanged. */
+    public StandardRemoteQueuePartitionDiagnostics(final String nodeId, final FlowFileQueueSize queueSize) {
+        this.nodeId = nodeId;
+        this.queueSize = queueSize;
+    }
+    /** @return the node identifier supplied at construction */
+    @Override
+    public String getNodeIdentifier() {
+        return nodeId;
+    }
+    /** @return unacknowledged count paired with unacknowledged bytes (the byte total, not the count, is the second component) */
+    @Override
+    public QueueSize getUnacknowledgedQueueSize() {
+        return queueSize.unacknowledgedQueueSize();
+    }
+    /** @return count and bytes of the active queue, taken from the wrapped snapshot */
+    @Override
+    public QueueSize getActiveQueueSize() {
+        return queueSize.activeQueueSize();
+    }
+    /** @return count and bytes of the swapped FlowFiles, taken from the wrapped snapshot */
+    @Override
+    public QueueSize getSwapQueueSize() {
+        return queueSize.swapQueueSize();
+    }
+    /** @return the swap-file count recorded in the wrapped snapshot */
+    @Override
+    public int getSwapFileCount() {
+        return queueSize.getSwapFileCount();
+    }
+}