You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2018/10/04 20:29:05 UTC

[09/14] nifi git commit: NIFI-5516: Implement Load-Balanced Connections Refactoring StandardFlowFileQueue to have an AbstractFlowFileQueue Refactored more into AbstractFlowFileQueue Added documentation, cleaned up code some Refactored FlowFileQueue so th

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
new file mode 100644
index 0000000..66b594d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
@@ -0,0 +1,990 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.IncompleteSwapFileException;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.swap.StandardSwapSummary;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.concurrency.TimedLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+
+/**
+ * A FlowFile queue that holds records in an in-heap "active" priority queue and, once the
+ * configured swap threshold is exceeded, accumulates overflow records in a "swap queue" which is
+ * periodically written to disk via the {@link FlowFileSwapManager}. Mutable queue state is
+ * guarded by the fair read/write lock created in the constructor.
+ */
+public class SwappablePriorityQueue {
+    private static final Logger logger = LoggerFactory.getLogger(SwappablePriorityQueue.class);
+    // Number of records written per swap file; also the batch size for migrating swap -> active.
+    private static final int SWAP_RECORD_POLL_SIZE = 10_000;
+    // Upper bound on how many expired records a single poll iteration will collect.
+    private static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 10_000;
+
+    private final int swapThreshold;
+    private final FlowFileSwapManager swapManager;
+    private final EventReporter eventReporter;
+    private final FlowFileQueue flowFileQueue;
+    private final DropFlowFileAction dropAction;
+    private final List<FlowFilePrioritizer> priorities = new ArrayList<>();
+    private final String swapPartitionName;
+
+    // Locations of swap files on disk, oldest first. Guarded by lock.
+    private final List<String> swapLocations = new ArrayList<>();
+    // Aggregate counts/bytes for active, swapped, and unacknowledged records; updated via CAS.
+    private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0, 0L));
+    private final TimedLock readLock;
+    private final TimedLock writeLock;
+
+    // We keep an "active queue" and a "swap queue" that both are able to hold records in heap. When
+    // FlowFiles are added to this FlowFileQueue, we first check if we are in "swap mode" and if so
+    // we add to the 'swap queue' instead of the 'active queue'. The code would be much simpler if we
+    // eliminated the 'swap queue' and instead just used the active queue and swapped out the 10,000
+    // lowest priority FlowFiles from that. However, doing that would cause problems with the ordering
+    // of FlowFiles. If we swap out some FlowFiles, and then allow a new FlowFile to be written to the
+    // active queue, then we would end up processing the newer FlowFile before the swapped FlowFile. By
+    // keeping these separate, we are able to guarantee that FlowFiles are swapped in in the same order
+    // that they are swapped out.
+    // Guarded by lock.
+    private PriorityQueue<FlowFileRecord> activeQueue;
+    private ArrayList<FlowFileRecord> swapQueue;
+    private boolean swapMode = false;
+
+    /**
+     * Creates a swappable priority queue.
+     *
+     * @param swapManager manager used to write/read swap files
+     * @param swapThreshold max number of records kept in the active (in-heap) queue
+     * @param eventReporter reporter for bulletin events; may be null (checked before use)
+     * @param flowFileQueue the owning FlowFile queue (used for its identifier and swap calls)
+     * @param dropAction action invoked to drop FlowFiles on a drop request
+     * @param swapPartitionName partition name passed to the swap manager when swapping out
+     */
+    public SwappablePriorityQueue(final FlowFileSwapManager swapManager, final int swapThreshold, final EventReporter eventReporter, final FlowFileQueue flowFileQueue,
+        final DropFlowFileAction dropAction, final String swapPartitionName) {
+        this.swapManager = swapManager;
+        this.swapThreshold = swapThreshold;
+
+        // Start with no prioritizers; setPriorities() rebuilds the queue when they change.
+        this.activeQueue = new PriorityQueue<>(20, new QueuePrioritizer(Collections.emptyList()));
+        this.swapQueue = new ArrayList<>();
+        this.eventReporter = eventReporter;
+        this.flowFileQueue = flowFileQueue;
+        this.dropAction = dropAction;
+        this.swapPartitionName = swapPartitionName;
+
+        // Fair lock so that writers/readers are serviced in arrival order; TimedLock logs
+        // long hold times against the queue's identifier.
+        final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+        readLock = new TimedLock(lock.readLock(), flowFileQueue.getIdentifier() + " Read Lock", 100);
+        writeLock = new TimedLock(lock.writeLock(), flowFileQueue.getIdentifier() + " Write Lock", 100);
+    }
+
+    // Convenience accessor for the owning queue's identifier (used in log/event messages).
+    private String getQueueIdentifier() {
+        return flowFileQueue.getIdentifier();
+    }
+
+    /**
+     * Returns the prioritizers currently applied to this queue.
+     *
+     * @return an unmodifiable snapshot of the current prioritizers
+     */
+    public List<FlowFilePrioritizer> getPriorities() {
+        readLock.lock();
+        try {
+            // Return an unmodifiable COPY rather than a view of the internal list: a view would
+            // escape the read lock and could change (or throw ConcurrentModificationException
+            // during iteration) while setPriorities() concurrently mutates the backing list.
+            // The redundant 'synchronized' modifier is dropped: access is already guarded by the
+            // read/write lock pair, consistently with setPriorities(), which takes only the write lock.
+            return Collections.unmodifiableList(new ArrayList<>(priorities));
+        } finally {
+            readLock.unlock("getPriorities");
+        }
+    }
+
+    /**
+     * Replaces the queue's prioritizers and rebuilds the active queue so existing records are
+     * re-ordered under the new comparison.
+     *
+     * @param newPriorities the prioritizers to apply from now on
+     */
+    public void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
+        writeLock.lock();
+        try {
+            priorities.clear();
+            priorities.addAll(newPriorities);
+
+            // PriorityQueue's comparator is fixed at construction, so build a new queue with the
+            // new ordering and move all active records into it.
+            // NOTE(review): the swap queue is intentionally not re-sorted here; swapped records
+            // retain their original order until migrated back — confirm this is the intent.
+            final PriorityQueue<FlowFileRecord> newQueue = new PriorityQueue<>(Math.max(20, activeQueue.size()), new QueuePrioritizer(newPriorities));
+            newQueue.addAll(activeQueue);
+            activeQueue = newQueue;
+        } finally {
+            writeLock.unlock("setPriorities");
+        }
+    }
+
+
+    /**
+     * Builds a diagnostics snapshot for this queue partition: current sizes plus whether any /
+     * all of the active FlowFiles are currently penalized.
+     */
+    public LocalQueuePartitionDiagnostics getQueueDiagnostics() {
+        readLock.lock();
+        try {
+            final boolean anyPenalized = !activeQueue.isEmpty() && activeQueue.peek().isPenalized();
+            // BUG FIX: this previously used anyMatch, which made 'allPenalized' always equal to
+            // 'anyPenalized'. All FlowFiles are penalized only when every record matches.
+            final boolean allPenalized = anyPenalized && activeQueue.stream().allMatch(FlowFileRecord::isPenalized);
+
+            return new StandardLocalQueuePartitionDiagnostics(getFlowFileQueueSize(), anyPenalized, allPenalized);
+        } finally {
+            readLock.unlock("getQueueDiagnostics");
+        }
+    }
+
+    /**
+     * Returns a snapshot copy of the FlowFiles currently in the active (in-heap) queue.
+     * Swapped and unacknowledged records are not included.
+     */
+    public List<FlowFileRecord> getActiveFlowFiles() {
+        readLock.lock();
+        try {
+            // Defensive copy so callers never see concurrent modifications of the live queue.
+            return new ArrayList<>(activeQueue);
+        } finally {
+            readLock.unlock("getActiveFlowFiles");
+        }
+    }
+
+    // True when at least one polled FlowFile has not yet been acknowledged.
+    public boolean isUnacknowledgedFlowFile() {
+        return getFlowFileQueueSize().getUnacknowledgedCount() > 0;
+    }
+
+    /**
+     * If the swap queue has accumulated at least one full batch (SWAP_RECORD_POLL_SIZE records),
+     * swaps the lowest-priority FlowFiles out to disk and rebalances the remaining records
+     * between the active queue and the swap queue, updating the queue-size accounting.
+     *
+     * This method MUST be called with the write lock held
+     */
+    private void writeSwapFilesIfNecessary() {
+        if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
+            return;
+        }
+
+        migrateSwapToActive();
+
+        // Only whole batches are written; any remainder stays on the swap queue.
+        final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE;
+
+        int originalSwapQueueCount = swapQueue.size();
+        long originalSwapQueueBytes = 0L;
+        for (final FlowFileRecord flowFile : swapQueue) {
+            originalSwapQueueBytes += flowFile.getSize();
+        }
+
+        // Create a new Priority queue with the prioritizers that are set, but reverse the
+        // prioritizers because we want to pull the lowest-priority FlowFiles to swap out
+        final PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<>(activeQueue.size() + swapQueue.size(), Collections.reverseOrder(new QueuePrioritizer(getPriorities())));
+        tempQueue.addAll(activeQueue);
+        tempQueue.addAll(swapQueue);
+
+        long bytesSwappedOut = 0L;
+        int flowFilesSwappedOut = 0;
+        final List<String> swapLocations = new ArrayList<>(numSwapFiles);
+        for (int i = 0; i < numSwapFiles; i++) {
+            // Create a new swap file for the next SWAP_RECORD_POLL_SIZE records
+            final List<FlowFileRecord> toSwap = new ArrayList<>(SWAP_RECORD_POLL_SIZE);
+            for (int j = 0; j < SWAP_RECORD_POLL_SIZE; j++) {
+                final FlowFileRecord flowFile = tempQueue.poll();
+                toSwap.add(flowFile);
+                bytesSwappedOut += flowFile.getSize();
+                flowFilesSwappedOut++;
+            }
+
+            try {
+                Collections.reverse(toSwap); // currently ordered in reverse priority order based on the ordering of the temp queue.
+                final String swapLocation = swapManager.swapOut(toSwap, flowFileQueue, swapPartitionName);
+                swapLocations.add(swapLocation);
+            } catch (final IOException ioe) {
+                tempQueue.addAll(toSwap); // if we failed, we must add the FlowFiles back to the queue.
+
+                final int objectCount = getFlowFileCount();
+                logger.error("FlowFile Queue with identifier {} has {} FlowFiles queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting "
+                    + "the Java heap space but failed to write information to disk due to {}", getQueueIdentifier(), objectCount, ioe.toString());
+                logger.error("", ioe);
+                if (eventReporter != null) {
+                    eventReporter.reportEvent(Severity.ERROR, "Failed to Overflow to Disk", "Flowfile Queue with identifier " + getQueueIdentifier() + " has " + objectCount +
+                        " queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting the Java heap space but failed to write information to disk. "
+                        + "See logs for more information.");
+                }
+
+                // Stop attempting further swap files; the remaining records are rebalanced below.
+                break;
+            }
+        }
+
+        // Pull any records off of the temp queue that won't fit back on the active queue, and add those to the
+        // swap queue. Then add the records back to the active queue.
+        swapQueue.clear();
+        long updatedSwapQueueBytes = 0L;
+        while (tempQueue.size() > swapThreshold) {
+            final FlowFileRecord record = tempQueue.poll();
+            swapQueue.add(record);
+            updatedSwapQueueBytes += record.getSize();
+        }
+
+        Collections.reverse(swapQueue); // currently ordered in reverse priority order based on the ordering of the temp queue
+
+        // replace the contents of the active queue, since we've merged it with the swap queue.
+        activeQueue.clear();
+        FlowFileRecord toRequeue;
+        long activeQueueBytes = 0L;
+        while ((toRequeue = tempQueue.poll()) != null) {
+            activeQueue.offer(toRequeue);
+            activeQueueBytes += toRequeue.getSize();
+        }
+
+        // CAS loop: recompute the new aggregate size from the pre-update snapshot until the
+        // AtomicReference update succeeds (other counters may change concurrently).
+        boolean updated = false;
+        while (!updated) {
+            final FlowFileQueueSize originalSize = getFlowFileQueueSize();
+
+            final int addedSwapRecords = swapQueue.size() - originalSwapQueueCount;
+            final long addedSwapBytes = updatedSwapQueueBytes - originalSwapQueueBytes;
+
+            final FlowFileQueueSize newSize = new FlowFileQueueSize(activeQueue.size(), activeQueueBytes,
+                originalSize.getSwappedCount() + addedSwapRecords + flowFilesSwappedOut,
+                originalSize.getSwappedBytes() + addedSwapBytes + bytesSwappedOut,
+                originalSize.getSwapFileCount() + numSwapFiles,
+                originalSize.getUnacknowledgedCount(), originalSize.getUnacknowledgedBytes());
+            updated = updateSize(originalSize, newSize);
+
+            if (updated) {
+                logIfNegative(originalSize, newSize, "swap");
+            }
+        }
+
+        this.swapLocations.addAll(swapLocations);
+    }
+
+    // Total record count across active, swapped, and unacknowledged FlowFiles.
+    private int getFlowFileCount() {
+        final FlowFileQueueSize size = getFlowFileQueueSize();
+        return size.getActiveCount() + size.getSwappedCount() + size.getUnacknowledgedCount();
+    }
+
+    /**
+     * If there are FlowFiles waiting on the swap queue, move them to the active
+     * queue until we meet our threshold. This prevents us from having to swap
+     * them to disk & then back out.
+     *
+     * This method MUST be called with the writeLock held.
+     */
+    private void migrateSwapToActive() {
+        // Migrate as many FlowFiles as we can from the Swap Queue to the Active Queue, so that we don't
+        // have to swap them out & then swap them back in.
+        // If we don't do this, we could get into a situation where we have potentially thousands of FlowFiles
+        // sitting on the Swap Queue but not getting processed because there aren't enough to be swapped out.
+        // In particular, this can happen if the queue is typically filled with surges.
+        // For example, if the queue has 25,000 FlowFiles come in, it may process 20,000 of them and leave
+        // 5,000 sitting on the Swap Queue. If it then takes an hour for an additional 5,000 FlowFiles to come in,
+        // those FlowFiles sitting on the Swap Queue will sit there for an hour, waiting to be swapped out and
+        // swapped back in again.
+        // Calling this method when records are polled prevents this condition by migrating FlowFiles from the
+        // Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out
+        // to disk, because we want them to be swapped back in in the same order that they were swapped out.
+
+        // Nothing to do if the active queue cannot accept a full swap-in batch.
+        final int activeQueueSize = activeQueue.size();
+        if (activeQueueSize > 0 && activeQueueSize > swapThreshold - SWAP_RECORD_POLL_SIZE) {
+            return;
+        }
+
+        // If there are swap files waiting to be swapped in, swap those in first. We do this in order to ensure that those that
+        // were swapped out first are then swapped back in first. If we instead just immediately migrated the FlowFiles from the
+        // swap queue to the active queue, and we never run out of FlowFiles in the active queue (because destination cannot
+        // keep up with queue), we will end up always processing the new FlowFiles first instead of the FlowFiles that arrived
+        // first.
+        if (!swapLocations.isEmpty()) {
+            swapIn();
+            return;
+        }
+
+        // this is the most common condition (nothing is swapped out), so do the check first and avoid the expense
+        // of other checks for 99.999% of the cases.
+        final FlowFileQueueSize size = getFlowFileQueueSize();
+        if (size.getSwappedCount() == 0 && swapQueue.isEmpty()) {
+            return;
+        }
+
+        if (size.getSwappedCount() > swapQueue.size()) {
+            // we already have FlowFiles swapped out, so we won't migrate the queue; we will wait for
+            // the files to be swapped back in first
+            return;
+        }
+
+        // Move records from the swap queue into the active queue until the threshold is reached.
+        int recordsMigrated = 0;
+        long bytesMigrated = 0L;
+        final Iterator<FlowFileRecord> swapItr = swapQueue.iterator();
+        while (activeQueue.size() < swapThreshold && swapItr.hasNext()) {
+            final FlowFileRecord toMigrate = swapItr.next();
+            activeQueue.add(toMigrate);
+            bytesMigrated += toMigrate.getSize();
+            recordsMigrated++;
+            swapItr.remove();
+        }
+
+        if (recordsMigrated > 0) {
+            incrementActiveQueueSize(recordsMigrated, bytesMigrated);
+            incrementSwapQueueSize(-recordsMigrated, -bytesMigrated, 0);
+        }
+
+        // NOTE(review): 'size' is the snapshot taken BEFORE the migration above, so this exits
+        // swap mode only when nothing was swapped at the start of the call — confirm whether a
+        // post-migration re-read was intended here.
+        if (size.getSwappedCount() == 0) {
+            swapMode = false;
+        }
+    }
+
+    /**
+     * Swaps in the oldest swap file (head of swapLocations), adding its FlowFiles to the active
+     * queue and adjusting the size accounting. Invoked from migrateSwapToActive, so the write
+     * lock is held. Recoverable I/O failures leave the swap location in place for retry.
+     */
+    private void swapIn() {
+        final String swapLocation = swapLocations.get(0);
+        boolean partialContents = false;
+        SwapContents swapContents;
+        try {
+            swapContents = swapManager.swapIn(swapLocation, flowFileQueue);
+            swapLocations.remove(0);
+        } catch (final IncompleteSwapFileException isfe) {
+            logger.error("Failed to swap in all FlowFiles from Swap File {}; Swap File ended prematurely. The records that were present will still be swapped in", swapLocation);
+            logger.error("", isfe);
+            // Use whatever records were recovered; sizes are reconciled below.
+            swapContents = isfe.getPartialContents();
+            partialContents = true;
+            swapLocations.remove(0);
+        } catch (final FileNotFoundException fnfe) {
+            logger.error("Failed to swap in FlowFiles from Swap File {} because the Swap File can no longer be found", swapLocation);
+            if (eventReporter != null) {
+                eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + swapLocation + " because the Swap File can no longer be found");
+            }
+
+            // File is gone for good; drop the location so we don't retry forever.
+            swapLocations.remove(0);
+            return;
+        } catch (final IOException ioe) {
+            logger.error("Failed to swap in FlowFiles from Swap File {}; Swap File appears to be corrupt!", swapLocation);
+            logger.error("", ioe);
+            if (eventReporter != null) {
+                eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " +
+                    swapLocation + "; Swap File appears to be corrupt! Some FlowFiles in the queue may not be accessible. See logs for more information.");
+            }
+
+            // We do not remove the Swap File from swapLocations because the IOException may be recoverable later. For instance, the file may be on a network
+            // drive and we may have connectivity problems, etc.
+            return;
+        } catch (final Throwable t) {
+            logger.error("Failed to swap in FlowFiles from Swap File {}", swapLocation, t);
+
+            // We do not remove the Swap File from swapLocations because this is an unexpected failure that may be retry-able. For example, if there were
+            // an OOME, etc. then we don't want to he queue to still reflect that the data is around but never swap it in. By leaving the Swap File
+            // in swapLocations, we will continue to retry.
+            throw t;
+        }
+
+        // The summary reflects what the swap file CLAIMED to contain; subtract it from the
+        // swapped-out accounting (including one swap file).
+        final QueueSize swapSize = swapContents.getSummary().getQueueSize();
+        final long contentSize = swapSize.getByteCount();
+        final int flowFileCount = swapSize.getObjectCount();
+        incrementSwapQueueSize(-flowFileCount, -contentSize, -1);
+
+        if (partialContents) {
+            // if we have partial results, we need to calculate the content size of the flowfiles
+            // actually swapped back in.
+            long contentSizeSwappedIn = 0L;
+            for (final FlowFileRecord swappedIn : swapContents.getFlowFiles()) {
+                contentSizeSwappedIn += swappedIn.getSize();
+            }
+
+            incrementActiveQueueSize(swapContents.getFlowFiles().size(), contentSizeSwappedIn);
+            logger.debug("Swapped in partial contents containing {} FlowFiles ({} bytes) from {}", swapContents.getFlowFiles().size(), contentSizeSwappedIn, swapLocation);
+        } else {
+            // we swapped in the whole swap file. We can just use the info that we got from the summary.
+            incrementActiveQueueSize(flowFileCount, contentSize);
+            logger.debug("Successfully swapped in Swap File {}", swapLocation);
+        }
+
+        activeQueue.addAll(swapContents.getFlowFiles());
+    }
+
+    // Total queue size (count + bytes) across active, swapped, and unacknowledged records.
+    public QueueSize size() {
+        return getFlowFileQueueSize().toQueueSize();
+    }
+
+    // True when there are no records at all: active, swapped, or unacknowledged.
+    public boolean isEmpty() {
+        return getFlowFileQueueSize().isEmpty();
+    }
+
+    // True when nothing is available to poll: neither active nor swapped records exist.
+    // NOTE(review): despite the name, this also requires the swapped count to be 0, since
+    // swapped records could be migrated into the active queue on the next poll.
+    public boolean isActiveQueueEmpty() {
+        final FlowFileQueueSize queueSize = getFlowFileQueueSize();
+        return queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0;
+    }
+
+    /**
+     * Acknowledges a previously polled FlowFile, removing it from the unacknowledged accounting.
+     */
+    public void acknowledge(final FlowFileRecord flowFile) {
+        logger.debug("{} Acknowledging {}", this, flowFile);
+        incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
+    }
+
+    /**
+     * Acknowledges a batch of previously polled FlowFiles in a single size adjustment.
+     */
+    public void acknowledge(final Collection<FlowFileRecord> flowFiles) {
+        logger.debug("{} Acknowledging {}", this, flowFiles);
+        final long totalSize = flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum();
+        incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
+    }
+
+
+    /**
+     * Enqueues a single FlowFile. If the queue is in swap mode, or adding would push the active
+     * queue past the swap threshold, the record goes to the swap queue (possibly triggering a
+     * swap-out to disk); otherwise it goes straight to the active queue.
+     */
+    public void put(final FlowFileRecord flowFile) {
+        writeLock.lock();
+        try {
+            if (swapMode || activeQueue.size() >= swapThreshold) {
+                swapQueue.add(flowFile);
+                incrementSwapQueueSize(1, flowFile.getSize(), 0);
+                // Once we start overflowing, stay in swap mode so ordering is preserved.
+                swapMode = true;
+                writeSwapFilesIfNecessary();
+            } else {
+                incrementActiveQueueSize(1, flowFile.getSize());
+                activeQueue.add(flowFile);
+            }
+
+            logger.debug("{} put to {}", flowFile, this);
+        } finally {
+            writeLock.unlock("put(FlowFileRecord)");
+        }
+    }
+
+    /**
+     * Enqueues a batch of FlowFiles. The whole batch goes either to the swap queue (when in swap
+     * mode, or when the batch would push the active queue past the swap threshold) or to the
+     * active queue — it is never split, to preserve relative ordering.
+     */
+    public void putAll(final Collection<FlowFileRecord> flowFiles) {
+        // Byte total computed outside the lock to keep the critical section short.
+        final int numFiles = flowFiles.size();
+        long bytes = 0L;
+        for (final FlowFile flowFile : flowFiles) {
+            bytes += flowFile.getSize();
+        }
+
+        writeLock.lock();
+        try {
+            if (swapMode || activeQueue.size() >= swapThreshold - numFiles) {
+                swapQueue.addAll(flowFiles);
+                incrementSwapQueueSize(numFiles, bytes, 0);
+                swapMode = true;
+                writeSwapFilesIfNecessary();
+            } else {
+                incrementActiveQueueSize(numFiles, bytes);
+                activeQueue.addAll(flowFiles);
+            }
+
+            logger.debug("{} put to {}", flowFiles, this);
+        } finally {
+            writeLock.unlock("putAll");
+        }
+    }
+
+    /**
+     * Polls a single FlowFile from the queue, skipping (and collecting) expired records.
+     * The returned FlowFile is counted as unacknowledged until acknowledge() is called.
+     *
+     * @param expiredRecords out-parameter collecting records that expired during this poll
+     * @param expirationMillis max age in millis; non-positive disables expiration
+     * @return the next FlowFile, or null if none is available (empty, all penalized, or expired)
+     */
+    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+        FlowFileRecord flowFile;
+
+        // First check if we have any records Pre-Fetched.
+        writeLock.lock();
+        try {
+            flowFile = doPoll(expiredRecords, expirationMillis);
+
+            if (flowFile != null) {
+                logger.debug("{} poll() returning {}", this, flowFile);
+                incrementUnacknowledgedQueueSize(1, flowFile.getSize());
+            }
+
+            return flowFile;
+        } finally {
+            writeLock.unlock("poll(Set)");
+        }
+    }
+
+
+    /**
+     * Core single-record poll. Must be called with the write lock held. Migrates swapped records
+     * into the active queue first, then polls, discarding expired records (up to
+     * MAX_EXPIRED_RECORDS_PER_ITERATION) and stopping on a penalized head (the active queue is
+     * priority-ordered, so a penalized head implies nothing pollable behind it).
+     */
+    private FlowFileRecord doPoll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+        FlowFileRecord flowFile;
+        boolean isExpired;
+
+        migrateSwapToActive();
+
+        long expiredBytes = 0L;
+        do {
+            // poll() returns null on an empty queue; isExpired(null, ...) is false, ending the loop.
+            flowFile = this.activeQueue.poll();
+
+            isExpired = isExpired(flowFile, expirationMillis);
+            if (isExpired) {
+                expiredRecords.add(flowFile);
+                expiredBytes += flowFile.getSize();
+                flowFile = null;
+
+                if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
+                    break;
+                }
+            } else if (flowFile != null && flowFile.isPenalized()) {
+                // Put the penalized record back; nothing is available until the penalty lapses.
+                this.activeQueue.add(flowFile);
+                flowFile = null;
+                break;
+            }
+
+            if (flowFile != null) {
+                incrementActiveQueueSize(-1, -flowFile.getSize());
+            }
+        } while (isExpired);
+
+        if (!expiredRecords.isEmpty()) {
+            incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes);
+        }
+
+        return flowFile;
+    }
+
+    /**
+     * Polls up to maxResults FlowFiles from the queue, skipping (and collecting) expired records.
+     * Returned FlowFiles are counted as unacknowledged until acknowledged.
+     *
+     * @param maxResults max number of FlowFiles to return
+     * @param expiredRecords out-parameter collecting records that expired during this poll
+     * @param expirationMillis max age in millis; non-positive disables expiration
+     * @return the polled FlowFiles, possibly empty, never null
+     */
+    public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+        // BUG FIX: this previously presized with Math.min(1, maxResults), which always yielded a
+        // capacity of at most 1 (and threw IllegalArgumentException for negative maxResults).
+        // Presize to the expected result count, capped to avoid huge speculative allocations.
+        final List<FlowFileRecord> records = new ArrayList<>(Math.max(0, Math.min(maxResults, SWAP_RECORD_POLL_SIZE)));
+
+        // First check if we have any records Pre-Fetched.
+        writeLock.lock();
+        try {
+            doPoll(records, maxResults, expiredRecords, expirationMillis);
+        } finally {
+            writeLock.unlock("poll(int, Set)");
+        }
+
+        if (!records.isEmpty()) {
+            logger.debug("{} poll() returning {}", this, records);
+        }
+
+        return records;
+    }
+
+    /**
+     * Polls FlowFiles selected by the given filter. Records the filter rejects are kept in the
+     * queue; expired records are collected into expiredRecords; a penalized head stops the scan
+     * (the queue is priority-ordered, so everything behind it is penalized too). Selected
+     * FlowFiles are counted as unacknowledged until acknowledged.
+     */
+    public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+        long bytesPulled = 0L;
+        int flowFilesPulled = 0;
+
+        writeLock.lock();
+        try {
+            migrateSwapToActive();
+
+            final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
+            final List<FlowFileRecord> unselected = new ArrayList<>();
+
+            while (true) {
+                FlowFileRecord flowFile = this.activeQueue.poll();
+                if (flowFile == null) {
+                    break;
+                }
+
+                final boolean isExpired = isExpired(flowFile, expirationMillis);
+                if (isExpired) {
+                    // Expired records leave the active queue permanently; count them as pulled
+                    // so the active-size decrement below accounts for them.
+                    expiredRecords.add(flowFile);
+                    bytesPulled += flowFile.getSize();
+                    flowFilesPulled++;
+
+                    if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
+                        break;
+                    } else {
+                        continue;
+                    }
+                } else if (flowFile.isPenalized()) {
+                    this.activeQueue.add(flowFile);
+                    break; // just stop searching because the rest are all penalized.
+                }
+
+                final FlowFileFilterResult result = filter.filter(flowFile);
+                if (result.isAccept()) {
+                    bytesPulled += flowFile.getSize();
+                    flowFilesPulled++;
+
+                    incrementUnacknowledgedQueueSize(1, flowFile.getSize());
+                    selectedFlowFiles.add(flowFile);
+                } else {
+                    // Rejected records are restored to the queue after the scan completes.
+                    unselected.add(flowFile);
+                }
+
+                if (!result.isContinue()) {
+                    break;
+                }
+            }
+
+            this.activeQueue.addAll(unselected);
+            incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
+
+            if (!selectedFlowFiles.isEmpty()) {
+                logger.debug("{} poll() returning {}", this, selectedFlowFiles);
+            }
+
+            return selectedFlowFiles;
+        } finally {
+            writeLock.unlock("poll(Filter, Set)");
+        }
+    }
+
+    /**
+     * Core multi-record poll. Must be called with the write lock held. Drains up to maxResults
+     * records into 'records', collects expired records, and adjusts the active and
+     * unacknowledged size accounting.
+     * NOTE(review): the accounting below attributes the entire contents of expiredRecords to
+     * this call — callers are expected to pass a freshly-created set; confirm at call sites.
+     */
+    private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+        migrateSwapToActive();
+
+        // bytesDrained covers both returned and expired records.
+        final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords, expirationMillis);
+
+        long expiredBytes = 0L;
+        for (final FlowFileRecord record : expiredRecords) {
+            expiredBytes += record.getSize();
+        }
+
+        incrementActiveQueueSize(-(expiredRecords.size() + records.size()), -bytesDrained);
+        incrementUnacknowledgedQueueSize(records.size(), bytesDrained - expiredBytes);
+    }
+
+
+    // True if the FlowFile's age exceeds expirationMillis. Null FlowFile or non-positive
+    // expiration yields false (no expiration date).
+    protected boolean isExpired(final FlowFile flowFile, final long expirationMillis) {
+        return isLaterThan(getExpirationDate(flowFile, expirationMillis));
+    }
+
+    // True if the given expiration timestamp is non-null and already in the past.
+    private boolean isLaterThan(final Long maxAge) {
+        return maxAge != null && maxAge < System.currentTimeMillis();
+    }
+
+    /**
+     * Computes the absolute expiration timestamp (entry date + expiration period) for the given
+     * FlowFile, or null when the FlowFile is null or expiration is disabled (non-positive period).
+     */
+    private Long getExpirationDate(final FlowFile flowFile, final long expirationMillis) {
+        if (flowFile == null || expirationMillis <= 0) {
+            return null;
+        }
+        return flowFile.getEntryDate() + expirationMillis;
+    }
+
+
+    /**
+     * Moves up to maxResults non-expired, non-penalized records from sourceQueue to destination.
+     * Expired records go to expiredRecords instead (capped at MAX_EXPIRED_RECORDS_PER_ITERATION);
+     * a penalized record is pushed back and stops the drain.
+     *
+     * @return total bytes removed from sourceQueue (returned + expired records; a pushed-back
+     *         penalized record is not counted)
+     */
+    private long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination, int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+        long drainedSize = 0L;
+        FlowFileRecord pulled;
+
+        while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) {
+            if (isExpired(pulled, expirationMillis)) {
+                expiredRecords.add(pulled);
+                if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
+                    break;
+                }
+            } else {
+                if (pulled.isPenalized()) {
+                    // Penalized head: restore it and stop (priority order implies the rest are too).
+                    sourceQueue.add(pulled);
+                    break;
+                }
+                destination.add(pulled);
+            }
+            drainedSize += pulled.getSize();
+        }
+        return drainedSize;
+    }
+
+
+    /**
+     * Finds the active FlowFile with the given UUID via linear scan (O(n)).
+     * Swapped-out and unacknowledged records are not searched.
+     *
+     * @param flowFileUuid the UUID to look for; null returns null
+     * @return the matching FlowFile, or null if not found in the active queue
+     */
+    public FlowFileRecord getFlowFile(final String flowFileUuid) {
+        if (flowFileUuid == null) {
+            return null;
+        }
+
+        readLock.lock();
+        try {
+            // read through all of the FlowFiles in the queue, looking for the FlowFile with the given ID
+            for (final FlowFileRecord flowFile : activeQueue) {
+                if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) {
+                    return flowFile;
+                }
+            }
+        } finally {
+            readLock.unlock("getFlowFile");
+        }
+
+        return null;
+    }
+
+
    /**
     * Drops (deletes) FlowFiles from this queue on behalf of the given requestor, updating the supplied
     * DropFlowFileRequest with progress, counts, and final state as the operation proceeds.
     * FlowFiles are dropped in three phases: the in-memory active queue, the in-memory swap queue, and
     * finally each swap file on disk (each of which must be swapped back in before it can be dropped).
     * The entire operation holds the write lock; cancellation is honored between phases by checking the
     * request's state.
     *
     * @param dropRequest the request to carry out; its state and size fields are mutated to report progress
     * @param requestor the identity of the requestor, handed to the DropAction for each batch dropped
     */
    public void dropFlowFiles(final DropFlowFileRequest dropRequest, final String requestor) {
        final String requestIdentifier = dropRequest.getRequestIdentifier();

        writeLock.lock();
        try {
            dropRequest.setState(DropFlowFileState.DROPPING_FLOWFILES);
            logger.debug("For DropFlowFileRequest {}, original size is {}", requestIdentifier, size());

            try {
                // Phase 1: drop everything currently in the in-memory active queue.
                final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue);

                QueueSize droppedSize;
                try {
                    if (dropRequest.getState() == DropFlowFileState.CANCELED) {
                        logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
                        return;
                    }

                    droppedSize = dropAction.drop(activeQueueRecords, requestor);
                    logger.debug("For DropFlowFileRequest {}, Dropped {} from active queue", requestIdentifier, droppedSize);
                } catch (final IOException ioe) {
                    logger.error("Failed to drop the FlowFiles from queue {} due to {}", getQueueIdentifier(), ioe.toString());
                    logger.error("", ioe);

                    dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString());
                    return;
                }

                // Only clear the queue and adjust counts AFTER the drop action succeeded.
                activeQueue.clear();
                incrementActiveQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount());
                dropRequest.setCurrentSize(size());
                dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));

                // Phase 2: drop everything currently in the in-memory swap queue.
                final QueueSize swapSize = getFlowFileQueueSize().swapQueueSize();
                logger.debug("For DropFlowFileRequest {}, Swap Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}",
                    requestIdentifier, swapQueue.size(), swapSize.getObjectCount(), swapSize.getByteCount());
                if (dropRequest.getState() == DropFlowFileState.CANCELED) {
                    logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
                    return;
                }

                try {
                    droppedSize = dropAction.drop(swapQueue, requestor);
                } catch (final IOException ioe) {
                    logger.error("Failed to drop the FlowFiles from queue {} due to {}", getQueueIdentifier(), ioe.toString());
                    logger.error("", ioe);

                    dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString());
                    return;
                }

                swapQueue.clear();
                dropRequest.setCurrentSize(size());
                dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
                // The in-memory swap queue is now empty, so the queue leaves "swap mode".
                swapMode = false;
                incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount(), 0);
                logger.debug("For DropFlowFileRequest {}, dropped {} from Swap Queue", requestIdentifier, droppedSize);

                // Phase 3: swap each swap file back in and drop its FlowFiles. Each location is removed
                // from swapLocations only after its contents have been dropped successfully.
                final int swapFileCount = swapLocations.size();
                final Iterator<String> swapLocationItr = swapLocations.iterator();
                while (swapLocationItr.hasNext()) {
                    final String swapLocation = swapLocationItr.next();

                    SwapContents swapContents = null;
                    try {
                        if (dropRequest.getState() == DropFlowFileState.CANCELED) {
                            logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
                            return;
                        }

                        swapContents = swapManager.swapIn(swapLocation, flowFileQueue);
                        droppedSize = dropAction.drop(swapContents.getFlowFiles(), requestor);
                    } catch (final IncompleteSwapFileException isfe) {
                        // A corrupt swap file still yields partial contents; drop what we can and continue.
                        swapContents = isfe.getPartialContents();
                        final String warnMsg = "Failed to swap in FlowFiles from Swap File " + swapLocation + " because the file was corrupt. "
                            + "Some FlowFiles may not be dropped from the queue until NiFi is restarted.";

                        logger.warn(warnMsg);
                        if (eventReporter != null) {
                            eventReporter.reportEvent(Severity.WARNING, "Drop FlowFiles", warnMsg);
                        }
                    } catch (final IOException ioe) {
                        logger.error("Failed to swap in FlowFiles from Swap File {} in order to drop the FlowFiles for Connection {} due to {}",
                            swapLocation, getQueueIdentifier(), ioe.toString());
                        logger.error("", ioe);
                        if (eventReporter != null) {
                            eventReporter.reportEvent(Severity.ERROR, "Drop FlowFiles", "Failed to swap in FlowFiles from Swap File " + swapLocation
                                + ". The FlowFiles contained in this Swap File will not be dropped from the queue");
                        }

                        dropRequest.setState(DropFlowFileState.FAILURE, "Failed to swap in FlowFiles from Swap File " + swapLocation + " due to " + ioe.toString());
                        if (swapContents != null) {
                            activeQueue.addAll(swapContents.getFlowFiles()); // ensure that we don't lose the FlowFiles from our queue.
                        }

                        return;
                    }

                    dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
                    incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount(), -1);

                    dropRequest.setCurrentSize(size());
                    swapLocationItr.remove();
                    logger.debug("For DropFlowFileRequest {}, dropped {} for Swap File {}", requestIdentifier, droppedSize, swapLocation);
                }

                logger.debug("Dropped FlowFiles from {} Swap Files", swapFileCount);
                logger.info("Successfully dropped {} FlowFiles ({} bytes) from Connection with ID {} on behalf of {}",
                    dropRequest.getDroppedSize().getObjectCount(), dropRequest.getDroppedSize().getByteCount(), getQueueIdentifier(), requestor);
                dropRequest.setState(DropFlowFileState.COMPLETE);
            } catch (final Exception e) {
                // Catch-all so an unexpected failure is reported on the request rather than being lost.
                logger.error("Failed to drop FlowFiles from Connection with ID {} due to {}", getQueueIdentifier(), e.toString());
                logger.error("", e);
                dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.toString());
            }
        } finally {
            writeLock.unlock("Drop FlowFiles");
        }
    }
+
+
+
+    public SwapSummary recoverSwappedFlowFiles() {
+        int swapFlowFileCount = 0;
+        long swapByteCount = 0L;
+        Long maxId = null;
+        List<ResourceClaim> resourceClaims = new ArrayList<>();
+        final long startNanos = System.nanoTime();
+
+        writeLock.lock();
+        try {
+            final List<String> swapLocations;
+            try {
+                swapLocations = swapManager.recoverSwapLocations(flowFileQueue, swapPartitionName);
+            } catch (final IOException ioe) {
+                logger.error("Failed to determine whether or not any Swap Files exist for FlowFile Queue {}", getQueueIdentifier());
+                logger.error("", ioe);
+                if (eventReporter != null) {
+                    eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine whether or not any Swap Files exist for FlowFile Queue " +
+                        getQueueIdentifier() + "; see logs for more detials");
+                }
+                return null;
+            }
+
+            logger.debug("Recovered {} Swap Files for {}: {}", swapLocations.size(), flowFileQueue, swapLocations);
+            for (final String swapLocation : swapLocations) {
+                try {
+                    final SwapSummary summary = swapManager.getSwapSummary(swapLocation);
+                    final QueueSize queueSize = summary.getQueueSize();
+                    final Long maxSwapRecordId = summary.getMaxFlowFileId();
+                    if (maxSwapRecordId != null) {
+                        if (maxId == null || maxSwapRecordId > maxId) {
+                            maxId = maxSwapRecordId;
+                        }
+                    }
+
+                    swapFlowFileCount += queueSize.getObjectCount();
+                    swapByteCount += queueSize.getByteCount();
+                    resourceClaims.addAll(summary.getResourceClaims());
+                } catch (final IOException ioe) {
+                    logger.error("Failed to recover FlowFiles from Swap File {}; the file appears to be corrupt", swapLocation, ioe.toString());
+                    logger.error("", ioe);
+                    if (eventReporter != null) {
+                        eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to recover FlowFiles from Swap File " + swapLocation +
+                            "; the file appears to be corrupt. See logs for more details");
+                    }
+                }
+            }
+
+            incrementSwapQueueSize(swapFlowFileCount, swapByteCount, swapLocations.size());
+            this.swapLocations.addAll(swapLocations);
+        } finally {
+            writeLock.unlock("Recover Swap Files");
+        }
+
+        if (!swapLocations.isEmpty()) {
+            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+            logger.info("Recovered {} swap files for {} in {} millis", swapLocations.size(), this, millis);
+        }
+
+        return new StandardSwapSummary(new QueueSize(swapFlowFileCount, swapByteCount), maxId, resourceClaims);
+    }
+
+
+
+    protected void incrementActiveQueueSize(final int count, final long bytes) {
+        boolean updated = false;
+        while (!updated) {
+            final FlowFileQueueSize original = size.get();
+            final FlowFileQueueSize newSize = new FlowFileQueueSize(
+                original.getActiveCount() + count, original.getActiveBytes() + bytes,
+                original.getSwappedCount(), original.getSwappedBytes(), original.getSwapFileCount(),
+                original.getUnacknowledgedCount(), original.getUnacknowledgedBytes());
+
+            updated = size.compareAndSet(original, newSize);
+
+            if (updated) {
+                logIfNegative(original, newSize, "active");
+            }
+        }
+    }
+
+    private void incrementSwapQueueSize(final int count, final long bytes, final int fileCount) {
+        boolean updated = false;
+        while (!updated) {
+            final FlowFileQueueSize original = getFlowFileQueueSize();
+            final FlowFileQueueSize newSize = new FlowFileQueueSize(original.getActiveCount(), original.getActiveBytes(),
+                original.getSwappedCount() + count, original.getSwappedBytes() + bytes, original.getSwapFileCount() + fileCount,
+                original.getUnacknowledgedCount(), original.getUnacknowledgedBytes());
+
+            updated = updateSize(original, newSize);
+            if (updated) {
+                logIfNegative(original, newSize, "swap");
+            }
+        }
+    }
+
+    private void incrementUnacknowledgedQueueSize(final int count, final long bytes) {
+        boolean updated = false;
+        while (!updated) {
+            final FlowFileQueueSize original = size.get();
+            final FlowFileQueueSize newSize = new FlowFileQueueSize(original.getActiveCount(), original.getActiveBytes(),
+                original.getSwappedCount(), original.getSwappedBytes(), original.getSwapFileCount(),
+                original.getUnacknowledgedCount() + count, original.getUnacknowledgedBytes() + bytes);
+            updated = size.compareAndSet(original, newSize);
+
+            if (updated) {
+                logIfNegative(original, newSize, "Unacknowledged");
+            }
+        }
+    }
+
+    private void logIfNegative(final FlowFileQueueSize original, final FlowFileQueueSize newSize, final String counterName) {
+        if (newSize.getActiveBytes() < 0 || newSize.getActiveCount() < 0
+            || newSize.getSwappedBytes() < 0 || newSize.getSwappedCount() < 0
+            || newSize.getUnacknowledgedBytes() < 0 || newSize.getUnacknowledgedCount() < 0) {
+
+            logger.error("Updated Size of Queue " + counterName + " from " + original + " to " + newSize, new RuntimeException("Cannot create negative queue size"));
+        }
+    }
+
+
    /**
     * Atomically replaces the queue size via compare-and-set.
     *
     * @param expected the size the caller last observed
     * @param updated the size to install
     * @return {@code true} if the size was still {@code expected} and was replaced; {@code false} if a
     *         concurrent update intervened and the caller should re-read and retry
     */
    protected boolean updateSize(final FlowFileQueueSize expected, final FlowFileQueueSize updated) {
        return size.compareAndSet(expected, updated);
    }
+
    /**
     * Returns the current snapshot of this queue's size (active, swapped, and unacknowledged counts/bytes).
     * The returned value is a point-in-time snapshot; concurrent updates may supersede it immediately.
     */
    public FlowFileQueueSize getFlowFileQueueSize() {
        return size.get();
    }
+
+    public void inheritQueueContents(final FlowFileQueueContents queueContents) {
+        writeLock.lock();
+        try {
+            putAll(queueContents.getActiveFlowFiles());
+            swapLocations.addAll(queueContents.getSwapLocations());
+            incrementSwapQueueSize(queueContents.getSwapSize().getObjectCount(), queueContents.getSwapSize().getByteCount(), queueContents.getSwapLocations().size());
+        } finally {
+            writeLock.unlock("inheritQueueContents");
+        }
+    }
+
    /**
     * Removes ALL content from this queue (active queue, in-memory swap queue, and swap file
     * references) and packages it up so it can be transferred to another partition, e.g., during
     * rebalancing. Unacknowledged counts are left with this queue (see the size update below).
     *
     * @param newPartitionName the name of the partition that will take ownership of the swap files
     * @return the FlowFiles and (renamed) swap locations that were removed from this queue
     */
    public FlowFileQueueContents packageForRebalance(final String newPartitionName) {
        writeLock.lock();
        try {
            // Everything held in memory (active + swap queue) is handed off directly as records.
            final List<FlowFileRecord> activeRecords = new ArrayList<>(this.activeQueue);
            activeRecords.addAll(this.swapQueue);

            // Swap files stay on disk; ask the Swap Manager to rename each so its location reflects
            // the new owning partition. On failure the location is dropped from the transferred list
            // (only logged); the file itself is not deleted here.
            final List<String> updatedSwapLocations = new ArrayList<>(swapLocations.size());
            for (final String swapLocation : swapLocations) {
                try {
                    final String updatedSwapLocation = swapManager.changePartitionName(swapLocation, newPartitionName);
                    updatedSwapLocations.add(updatedSwapLocation);
                } catch (final IOException ioe) {
                    logger.error("Failed to update Swap File {} to reflect that the contents are now owned by Partition '{}'", swapLocation, newPartitionName, ioe);
                }
            }

            this.swapLocations.clear();
            this.activeQueue.clear();
            this.swapQueue.clear();

            this.swapMode = false;

            // Zero out this queue's size while preserving the unacknowledged counts, capturing the
            // swapped size so it can be reported to the new owner. The CAS loop guards against
            // concurrent size updates.
            QueueSize swapSize = new QueueSize(0, 0L);
            boolean updated = false;
            while (!updated) {
                final FlowFileQueueSize currentSize = getFlowFileQueueSize();
                swapSize = new QueueSize(currentSize.getSwappedCount(), currentSize.getSwappedBytes());

                final FlowFileQueueSize updatedSize = new FlowFileQueueSize(0, 0, 0, 0, 0, currentSize.getUnacknowledgedCount(), currentSize.getUnacknowledgedBytes());
                updated = updateSize(currentSize, updatedSize);
            }

            return new FlowFileQueueContents(activeRecords, updatedSwapLocations, swapSize);
        } finally {
            writeLock.unlock("transfer(SwappablePriorityQueue)");
        }
    }
+
+    @Override
+    public String toString() {
+        return "SwappablePriorityQueue[queueId=" + flowFileQueue.getIdentifier() + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/TimePeriod.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/TimePeriod.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/TimePeriod.java
new file mode 100644
index 0000000..40220cc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/TimePeriod.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
/**
 * Immutable pairing of a user-supplied time period string (e.g., "30 secs") with its
 * pre-computed millisecond equivalent, so the original text can be round-tripped while the
 * numeric value is used for computation.
 */
public class TimePeriod {
    private final String period;
    private final long millis;

    /**
     * @param period the original period text
     * @param millis the period's duration in milliseconds
     */
    public TimePeriod(final String period, final long millis) {
        this.period = period;
        this.millis = millis;
    }

    public long getMillis() {
        return millis;
    }

    public String getPeriod() {
        return period;
    }

    @Override
    public String toString() {
        // Render as the original text the user supplied.
        return getPeriod();
    }
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/ContentRepositoryFlowFileAccess.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/ContentRepositoryFlowFileAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/ContentRepositoryFlowFileAccess.java
new file mode 100644
index 0000000..28ae05d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/ContentRepositoryFlowFileAccess.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered;
+
+import org.apache.nifi.controller.repository.ContentNotFoundException;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.io.LimitedInputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class ContentRepositoryFlowFileAccess implements FlowFileContentAccess {
+    private final ContentRepository contentRepository;
+
+    public ContentRepositoryFlowFileAccess(final ContentRepository contentRepository) {
+        this.contentRepository = contentRepository;
+    }
+
+    @Override
+    public InputStream read(final FlowFileRecord flowFile) throws IOException {
+        final InputStream rawIn;
+        try {
+            rawIn = contentRepository.read(flowFile.getContentClaim());
+        } catch (final ContentNotFoundException cnfe) {
+            throw new ContentNotFoundException(flowFile, flowFile.getContentClaim(), cnfe.getMessage());
+        }
+
+        if (flowFile.getContentClaimOffset() > 0) {
+            try {
+                StreamUtils.skip(rawIn, flowFile.getContentClaimOffset());
+            } catch (final EOFException eof) {
+                throw new ContentNotFoundException(flowFile, flowFile.getContentClaim(), "FlowFile has a Content Claim Offset of "
+                    + flowFile.getContentClaimOffset() + " bytes but the Content Claim does not have that many bytes");
+            }
+        }
+
+        final InputStream limitedIn = new LimitedInputStream(rawIn, flowFile.getSize());
+        // Wrap the Content Repository's InputStream with one that ensures that we are able to consume all of the FlowFile's content or else throws EOFException
+        return new FilterInputStream(limitedIn) {
+            private long bytesRead = 0;
+
+            @Override
+            public int read(final byte[] b, final int off, final int len) throws IOException {
+                return ensureNotTruncated(limitedIn.read(b, off, len));
+            }
+
+            @Override
+            public int read(final byte[] b) throws IOException {
+                return ensureNotTruncated(limitedIn.read(b));
+            }
+
+            @Override
+            public int read() throws IOException {
+                return ensureNotTruncated(limitedIn.read());
+            }
+
+            private int ensureNotTruncated(final int length) throws EOFException {
+                if (length > -1) {
+                    bytesRead += length;
+                    return length;
+                }
+
+                if (bytesRead < flowFile.getSize()) {
+                    throw new EOFException("Expected " + flowFile + " to contain " + flowFile.getSize() + " bytes but the content repository only had " + bytesRead + " bytes for it");
+                }
+
+                return length;
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/FlowFileContentAccess.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/FlowFileContentAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/FlowFileContentAccess.java
new file mode 100644
index 0000000..4d956c6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/FlowFileContentAccess.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
/**
 * Abstraction for obtaining an InputStream over a FlowFile's content, decoupling content consumers
 * (such as load-balanced connection clients) from the underlying content storage.
 */
public interface FlowFileContentAccess {

    /**
     * Opens a stream over the content of the given FlowFile.
     *
     * @param flowFile the FlowFile whose content is to be read
     * @return an InputStream over the FlowFile's content; the caller is responsible for closing it
     * @throws IOException if the content cannot be accessed
     */
    InputStream read(FlowFileRecord flowFile) throws IOException;

}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SimpleLimitThreshold.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SimpleLimitThreshold.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SimpleLimitThreshold.java
new file mode 100644
index 0000000..8b0cfa2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SimpleLimitThreshold.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered;
+
+public class SimpleLimitThreshold implements TransactionThreshold {
+    private final int countLimit;
+    private final long byteLimit;
+
+    private int count = 0;
+    private long bytes = 0L;
+
+    public SimpleLimitThreshold(final int count, final long bytes) {
+        this.countLimit = count;
+        this.byteLimit = bytes;
+    }
+
+    @Override
+    public void adjust(final int flowFileCount, final long flowFileSize) {
+        count += flowFileCount;
+        bytes += flowFileSize;
+    }
+
+    @Override
+    public boolean isThresholdMet() {
+        return count >= countLimit || bytes >= byteLimit;
+    }
+}