You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/14 15:14:31 UTC

[1/3] nifi git commit: NIFI-730: Added error messages if we fail to drop FlowFiles from queue

Repository: nifi
Updated Branches:
  refs/heads/NIFI-730 72ff2a25d -> 77f7d7524


NIFI-730: Added error messages if we fail to drop FlowFiles from queue


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/afb76afc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/afb76afc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/afb76afc

Branch: refs/heads/NIFI-730
Commit: afb76afcd0fd7d0c144a37621fdabc181bd42307
Parents: 72ff2a2
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Oct 13 15:57:18 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Oct 13 15:57:18 2015 -0400

----------------------------------------------------------------------
 .../controller/queue/DropFlowFileStatus.java    |  5 +++
 .../nifi/controller/DropFlowFileRequest.java    | 11 +++++
 .../nifi/controller/StandardFlowFileQueue.java  | 45 ++++++++++++++++----
 3 files changed, 53 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/afb76afc/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
index 7d5b9c2..737fbe3 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
@@ -70,4 +70,9 @@ public interface DropFlowFileStatus {
      * @return the current state of the operation
      */
     DropFlowFileState getState();
+
+    /**
+     * @return the reason that the state is set to a Failure state, or <code>null</code> if the state is not {@link DropFlowFileState#FAILURE}.
+     */
+    String getFailureReason();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/afb76afc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
index 4104308..189fe7d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
@@ -30,6 +30,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
     private volatile QueueSize droppedSize = new QueueSize(0, 0L);
     private volatile long lastUpdated = System.currentTimeMillis();
     private volatile Thread executionThread;
+    private volatile String failureReason;
 
     private DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK;
 
@@ -85,8 +86,18 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
         return lastUpdated;
     }
 
+    @Override
+    public String getFailureReason() {
+        return failureReason;
+    }
+
     synchronized void setState(final DropFlowFileState state) {
+        setState(state, null);
+    }
+
+    synchronized void setState(final DropFlowFileState state, final String explanation) {
         this.state = state;
+        this.failureReason = explanation;
         this.lastUpdated = System.currentTimeMillis();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/afb76afc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 82c1c7e..5b137f7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -928,14 +928,34 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
                     try {
                         final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue);
-                        QueueSize droppedSize = drop(activeQueueRecords, requestor);
+
+                        QueueSize droppedSize;
+                        try {
+                            droppedSize = drop(activeQueueRecords, requestor);
+                        } catch (final IOException ioe) {
+                            logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
+                            logger.error("", ioe);
+
+                            dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString());
+                            return;
+                        }
+
                         activeQueue.clear();
                         activeQueueContentSize = 0;
                         activeQueueSizeRef.set(0);
                         dropRequest.setCurrentSize(getQueueSize());
                         dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
 
-                        droppedSize = drop(swapQueue, requestor);
+                        try {
+                            droppedSize = drop(swapQueue, requestor);
+                        } catch (final IOException ioe) {
+                            logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
+                            logger.error("", ioe);
+
+                            dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString());
+                            return;
+                        }
+
                         swapQueue.clear();
                         dropRequest.setCurrentSize(getQueueSize());
                         dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
@@ -946,12 +966,22 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                         final Iterator<String> swapLocationItr = swapLocations.iterator();
                         while (swapLocationItr.hasNext()) {
                             final String swapLocation = swapLocationItr.next();
-                            final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
+
+                            List<FlowFileRecord> swappedIn = null;
                             try {
+                                swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
                                 droppedSize = drop(swappedIn, requestor);
-                            } catch (final Exception e) {
-                                activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
-                                throw e;
+                            } catch (final IOException ioe) {
+                                logger.error("Failed to swap in FlowFiles from Swap File {} in order to drop the FlowFiles for Connection {} due to {}",
+                                    swapLocation, StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
+                                logger.error("", ioe);
+
+                                dropRequest.setState(DropFlowFileState.FAILURE, "Failed to swap in FlowFiles from Swap File " + swapLocation + " due to " + ioe.toString());
+                                if (swappedIn != null) {
+                                    activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
+                                }
+
+                                return;
                             }
 
                             dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
@@ -963,8 +993,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
                         dropRequest.setState(DropFlowFileState.COMPLETE);
                     } catch (final Exception e) {
-                        // TODO: Handle adequately
-                        dropRequest.setState(DropFlowFileState.FAILURE);
+                        dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.toString());
                     }
                 } finally {
                     writeLock.unlock("Drop FlowFiles");


[2/3] nifi git commit: NIFI-730: bug fixes and code cleanup for swap manager and flowfile queue

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/77f7d752/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
new file mode 100644
index 0000000..acf2830
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -0,0 +1,1246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.queue.DropFlowFileState;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.controller.repository.RepositoryRecordType;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.concurrency.TimedLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A FlowFileQueue is used to queue FlowFile objects that are awaiting further
+ * processing. Must be thread safe.
+ *
+ */
+public final class StandardFlowFileQueue implements FlowFileQueue {
+
+    public static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 100000; // the single-record poll stops collecting once the expired set reaches this size
+    public static final int SWAP_RECORD_POLL_SIZE = 10000; // migrateSwapToActive() does nothing unless the active queue is at least this far below swapThreshold
+
+    private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class);
+
+    private PriorityQueue<FlowFileRecord> activeQueue = null; // rebuilt by setPriorities() whenever the prioritizers change
+    private ArrayList<FlowFileRecord> swapQueue = null; // receives records from put()/putAll() while swapMode is set or the active queue is at swapThreshold
+
+    // private final AtomicInteger activeQueueSizeRef = new AtomicInteger(0);
+    // private long activeQueueContentSize = 0L;
+    // private int swappedRecordCount = 0;
+    // private long swappedContentSize = 0L;
+    // private final AtomicReference<QueueSize> unacknowledgedSizeRef = new AtomicReference<>(new QueueSize(0, 0L));
+
+    private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0L)); // read lock-free by size(), isEmpty() and isActiveQueueEmpty()
+
+    private String maximumQueueDataSize; // back-pressure byte limit exactly as supplied by the caller
+    private long maximumQueueByteCount; // maximumQueueDataSize parsed to bytes
+    private boolean swapMode = false; // once set, put()/putAll() route new records to swapQueue
+    private long maximumQueueObjectCount;
+
+    private final EventReporter eventReporter;
+    private final AtomicLong flowFileExpirationMillis;
+    private final Connection connection;
+    private final AtomicReference<String> flowFileExpirationPeriod;
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); // 'true' selects the fair ordering policy
+    private final List<FlowFilePrioritizer> priorities;
+    private final int swapThreshold;
+    private final FlowFileSwapManager swapManager;
+    private final List<String> swapLocations = new ArrayList<>();
+    private final TimedLock readLock; // wraps lock.readLock(); created in the constructor
+    private final TimedLock writeLock; // wraps lock.writeLock(); created in the constructor
+    private final String identifier;
+    private final FlowFileRepository flowFileRepository;
+    private final ProvenanceEventRepository provRepository;
+    private final ResourceClaimManager resourceClaimManager;
+
+    private final AtomicBoolean queueFullRef = new AtomicBoolean(false); // cached result of determineIfFull(); returned by isFull() without locking
+
+    // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK!
+    private final ProcessScheduler scheduler;
+
+    /**
+     * Creates an empty queue. The new queue has no prioritizers, both back-pressure
+     * thresholds set to zero and an expiration of zero milliseconds.
+     *
+     * @param identifier the identifier reported by {@link #getIdentifier()} and used to name the locks
+     * @param connection the connection whose source and destination are notified of queue events
+     * @param flowFileRepo the FlowFile repository
+     * @param provRepo the provenance repository
+     * @param resourceClaimManager the resource claim manager
+     * @param scheduler the scheduler that receives event registrations
+     * @param swapManager the manager used to swap FlowFiles in from swap files
+     * @param eventReporter reporter for swap failures; may be <code>null</code>
+     * @param swapThreshold the active-queue size at which new FlowFiles go to the swap queue
+     */
+    public StandardFlowFileQueue(final String identifier, final Connection connection, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
+        final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) {
+        // Collaborators handed in by the caller.
+        this.identifier = identifier;
+        this.connection = connection;
+        this.flowFileRepository = flowFileRepo;
+        this.provRepository = provRepo;
+        this.resourceClaimManager = resourceClaimManager;
+        this.scheduler = scheduler;
+        this.swapManager = swapManager;
+        this.eventReporter = eventReporter;
+        this.swapThreshold = swapThreshold;
+
+        // Empty in-memory state.
+        this.priorities = new ArrayList<>();
+        this.activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
+        this.swapQueue = new ArrayList<>();
+
+        // Default limits.
+        this.maximumQueueObjectCount = 0L;
+        this.maximumQueueByteCount = 0L;
+        this.maximumQueueDataSize = "0 MB";
+        this.flowFileExpirationMillis = new AtomicLong(0);
+        this.flowFileExpirationPeriod = new AtomicReference<>("0 mins");
+
+        // Timed wrappers around the two halves of the read/write lock.
+        this.readLock = new TimedLock(this.lock.readLock(), identifier + " Read Lock", 100);
+        this.writeLock = new TimedLock(this.lock.writeLock(), identifier + " Write Lock", 100);
+    }
+
+    /**
+     * @return the identifier supplied when this queue was constructed
+     */
+    @Override
+    public String getIdentifier() {
+        return this.identifier;
+    }
+
+    /**
+     * Returns the prioritizers currently applied to the active queue.
+     *
+     * The result is an unmodifiable snapshot taken under the read lock rather than a live
+     * view: {@link #setPriorities(List)} clears and refills the backing list in place, so
+     * a live view could fail with a ConcurrentModificationException while a caller iterates it.
+     *
+     * @return an unmodifiable copy of the current prioritizers
+     */
+    @Override
+    public List<FlowFilePrioritizer> getPriorities() {
+        readLock.lock();
+        try {
+            return Collections.unmodifiableList(new ArrayList<>(priorities));
+        } finally {
+            readLock.unlock("getPriorities");
+        }
+    }
+
+    /**
+     * Replaces the prioritizers and re-orders the active queue accordingly.
+     *
+     * @param newPriorities the prioritizers to apply; must not be <code>null</code>
+     */
+    @Override
+    public void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
+        writeLock.lock();
+        try {
+            // Copy first: the argument may be backed by our own 'priorities' list (for example a view
+            // previously obtained from getPriorities()), in which case clearing 'priorities' below
+            // would also empty the argument and silently discard every prioritizer.
+            final List<FlowFilePrioritizer> requested = new ArrayList<>(newPriorities);
+
+            final PriorityQueue<FlowFileRecord> newQueue = new PriorityQueue<>(Math.max(20, activeQueue.size()), new Prioritizer(requested));
+            newQueue.addAll(activeQueue);
+            activeQueue = newQueue;
+            priorities.clear();
+            priorities.addAll(requested);
+        } finally {
+            writeLock.unlock("setPriorities");
+        }
+    }
+
+    /**
+     * Sets the object-count back-pressure threshold and re-evaluates whether the queue is full.
+     *
+     * @param maxQueueSize the new threshold; a value of zero or less disables the object-count check
+     */
+    @Override
+    public void setBackPressureObjectThreshold(final long maxQueueSize) {
+        writeLock.lock();
+        try {
+            this.maximumQueueObjectCount = maxQueueSize;
+
+            final boolean nowFull = determineIfFull();
+            queueFullRef.set(nowFull);
+        } finally {
+            writeLock.unlock("setBackPressureObjectThreshold");
+        }
+    }
+
+    /**
+     * @return the object-count back-pressure threshold, read under the read lock
+     */
+    @Override
+    public long getBackPressureObjectThreshold() {
+        final long threshold;
+
+        readLock.lock();
+        try {
+            threshold = maximumQueueObjectCount;
+        } finally {
+            readLock.unlock("getBackPressureObjectThreshold");
+        }
+
+        return threshold;
+    }
+
+    /**
+     * Sets the data-size back-pressure threshold and re-evaluates whether the queue is full.
+     * Neither field is modified if the value cannot be parsed.
+     *
+     * @param maxDataSize the new threshold as a data-size string
+     */
+    @Override
+    public void setBackPressureDataSizeThreshold(final String maxDataSize) {
+        writeLock.lock();
+        try {
+            // Parse before assigning anything so that a parse failure leaves the old values intact.
+            final long parsedBytes = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
+
+            this.maximumQueueByteCount = parsedBytes;
+            this.maximumQueueDataSize = maxDataSize;
+
+            final boolean nowFull = determineIfFull();
+            queueFullRef.set(nowFull);
+        } finally {
+            writeLock.unlock("setBackPressureDataSizeThreshold");
+        }
+    }
+
+    /**
+     * @return the data-size back-pressure threshold exactly as it was last supplied, read under the read lock
+     */
+    @Override
+    public String getBackPressureDataSizeThreshold() {
+        final String threshold;
+
+        readLock.lock();
+        try {
+            threshold = maximumQueueDataSize;
+        } finally {
+            readLock.unlock("getBackPressureDataSizeThreshold");
+        }
+
+        return threshold;
+    }
+
+    /**
+     * @return the current size of this queue, taken from the current size counters
+     */
+    @Override
+    public QueueSize size() {
+        return size.get().toQueueSize();
+    }
+
+
+    private QueueSize getQueueSize() {
+        return size.get().toQueueSize();
+    }
+
+    /**
+     * @return <code>true</code> if the current size counters report the queue as empty
+     */
+    @Override
+    public boolean isEmpty() {
+        final FlowFileQueueSize current = size.get();
+        return current.isEmpty();
+    }
+
+    /**
+     * @return <code>true</code> if the active-queue count in the current size counters is zero
+     */
+    @Override
+    public boolean isActiveQueueEmpty() {
+        final FlowFileQueueSize current = size.get();
+        return current.activeQueueCount == 0;
+    }
+
+    /**
+     * @return the active-queue portion of the current size counters
+     */
+    public QueueSize getActiveQueueSize() {
+        final FlowFileQueueSize current = size.get();
+        return current.activeQueueSize();
+    }
+
+    /**
+     * Acknowledges a single FlowFile by removing it from the unacknowledged counters.
+     * If the queue is currently flagged as full, the flag is re-evaluated under the write lock.
+     *
+     * @param flowFile the FlowFile being acknowledged
+     */
+    @Override
+    public void acknowledge(final FlowFileRecord flowFile) {
+        if (!queueFullRef.get()) {
+            // Not flagged as full: only the counters need adjusting.
+            incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
+        } else {
+            writeLock.lock();
+            try {
+                incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
+                queueFullRef.set(determineIfFull());
+            } finally {
+                writeLock.unlock("acknowledge(FlowFileRecord)");
+            }
+        }
+
+        // The queue may have been full and no longer be, so an event-driven source may now be
+        // able to run. The write lock is no longer held at this point.
+        if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
+            scheduler.registerEvent(connection.getSource());
+        }
+    }
+
+    /**
+     * Acknowledges a batch of FlowFiles by removing them from the unacknowledged counters.
+     * If the queue is currently flagged as full, the flag is re-evaluated under the write lock.
+     *
+     * @param flowFiles the FlowFiles being acknowledged
+     */
+    @Override
+    public void acknowledge(final Collection<FlowFileRecord> flowFiles) {
+        long totalSize = 0L;
+        for (final FlowFileRecord flowFile : flowFiles) {
+            totalSize += flowFile.getSize();
+        }
+
+        if (queueFullRef.get()) {
+            writeLock.lock();
+            try {
+                incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
+                queueFullRef.set(determineIfFull());
+            } finally {
+                // Label this overload distinctly so lock diagnostics are not attributed to acknowledge(FlowFileRecord).
+                writeLock.unlock("acknowledge(Collection)");
+            }
+        } else {
+            incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
+        }
+
+        if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
+            // it's possible that queue was full but no longer is. Notify that the source may now be available to run,
+            // because of back pressure caused by this queue.
+            scheduler.registerEvent(connection.getSource());
+        }
+    }
+
+    /**
+     * @return the cached "queue is full" flag; no lock is taken
+     */
+    @Override
+    public boolean isFull() {
+        final boolean full = queueFullRef.get();
+        return full;
+    }
+
+    /**
+     * Evaluates the back-pressure thresholds against the current queue size. A threshold of
+     * zero or less is treated as "not configured".
+     *
+     * MUST be called with either the read or write lock held
+     *
+     * @return true if a configured threshold has been reached
+     */
+    private boolean determineIfFull() {
+        final long objectLimit = maximumQueueObjectCount;
+        final long byteLimit = maximumQueueByteCount;
+
+        final boolean anyLimitConfigured = objectLimit > 0 || byteLimit > 0;
+        if (!anyLimitConfigured) {
+            return false;
+        }
+
+        final QueueSize current = getQueueSize();
+        return (objectLimit > 0 && current.getObjectCount() >= objectLimit)
+            || (byteLimit > 0 && current.getByteCount() >= byteLimit);
+    }
+
+    /**
+     * Adds a FlowFile to this queue. It goes to the active queue unless swap mode is on or
+     * the active queue has reached the swap threshold, in which case it goes to the swap queue.
+     *
+     * @param file the FlowFile to enqueue
+     */
+    @Override
+    public void put(final FlowFileRecord file) {
+        writeLock.lock();
+        try {
+            final boolean activeHasRoom = !swapMode && activeQueue.size() < swapThreshold;
+            if (activeHasRoom) {
+                incrementActiveQueueSize(1, file.getSize());
+                activeQueue.add(file);
+            } else {
+                swapQueue.add(file);
+                incrementSwapQueueSize(1, file.getSize());
+                swapMode = true;
+                writeSwapFilesIfNecessary();
+            }
+
+            final boolean nowFull = determineIfFull();
+            queueFullRef.set(nowFull);
+        } finally {
+            writeLock.unlock("put(FlowFileRecord)");
+        }
+
+        // Notify the destination only after the write lock has been released.
+        if (connection.getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
+            scheduler.registerEvent(connection.getDestination());
+        }
+    }
+
+    /**
+     * Adds a batch of FlowFiles to this queue. The whole batch goes to the active queue unless
+     * swap mode is on or the batch would take the active queue to the swap threshold, in which
+     * case the whole batch goes to the swap queue.
+     *
+     * @param files the FlowFiles to enqueue
+     */
+    @Override
+    public void putAll(final Collection<FlowFileRecord> files) {
+        final int batchCount = files.size();
+        long batchBytes = 0L;
+        for (final FlowFile queued : files) {
+            batchBytes += queued.getSize();
+        }
+
+        writeLock.lock();
+        try {
+            final boolean activeHasRoom = !swapMode && activeQueue.size() < swapThreshold - batchCount;
+            if (activeHasRoom) {
+                incrementActiveQueueSize(batchCount, batchBytes);
+                activeQueue.addAll(files);
+            } else {
+                swapQueue.addAll(files);
+                incrementSwapQueueSize(batchCount, batchBytes);
+                swapMode = true;
+                writeSwapFilesIfNecessary();
+            }
+
+            final boolean nowFull = determineIfFull();
+            queueFullRef.set(nowFull);
+        } finally {
+            writeLock.unlock("putAll");
+        }
+
+        // Notify the destination only after the write lock has been released.
+        if (connection.getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
+            scheduler.registerEvent(connection.getDestination());
+        }
+    }
+
+
+    /**
+     * @param maxAge a timestamp in epoch milliseconds, or <code>null</code>
+     * @return <code>true</code> if <code>maxAge</code> is non-null and strictly before the current time
+     */
+    private boolean isLaterThan(final Long maxAge) {
+        return maxAge != null && maxAge < System.currentTimeMillis();
+    }
+
+    /**
+     * @param flowFile the FlowFile to inspect; may be <code>null</code>
+     * @param expirationMillis the configured expiration; zero or less means no expiration
+     * @return the FlowFile's entry date plus <code>expirationMillis</code>, or <code>null</code> if
+     *         there is no FlowFile or no expiration
+     */
+    private Long getExpirationDate(final FlowFile flowFile, final long expirationMillis) {
+        final boolean expirationDisabled = expirationMillis <= 0;
+        if (flowFile == null || expirationDisabled) {
+            return null;
+        }
+
+        return flowFile.getEntryDate() + expirationMillis;
+    }
+
+    /**
+     * Polls a single FlowFile from this queue. Expired FlowFiles encountered on the way are
+     * added to <code>expiredRecords</code>.
+     *
+     * @param expiredRecords receives any expired FlowFiles that were removed
+     * @return the polled FlowFile, or <code>null</code> if none is available
+     */
+    @Override
+    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
+        final long expirationMillis = flowFileExpirationMillis.get();
+
+        final FlowFileRecord polled;
+        writeLock.lock();
+        try {
+            polled = doPoll(expiredRecords, expirationMillis);
+        } finally {
+            writeLock.unlock("poll(Set)");
+        }
+
+        // The unacknowledged counters are updated after the write lock has been released.
+        if (polled != null) {
+            incrementUnacknowledgedQueueSize(1, polled.getSize());
+        }
+
+        return polled;
+    }
+
+    /**
+     * Polls the active queue for the next FlowFile that is neither expired nor penalized.
+     * Expired FlowFiles are removed from the queue and added to <code>expiredRecords</code>.
+     * If the head of the queue is penalized it is put back and nothing is returned.
+     *
+     * Every FlowFile that leaves the active queue is subtracted from the active-queue counters
+     * exactly once: the returned FlowFile inside the loop, and all expired FlowFiles in a single
+     * adjustment after the loop. This also covers the case where the loop stops early because
+     * <code>expiredRecords</code> reached {@link #MAX_EXPIRED_RECORDS_PER_ITERATION}.
+     *
+     * The only caller in view, {@link #poll(Set)}, invokes this with the write lock held.
+     *
+     * @param expiredRecords receives any expired FlowFiles that were removed
+     * @param expirationMillis the expiration to apply; zero or less means no expiration
+     * @return the polled FlowFile, or <code>null</code> if none is available
+     */
+    private FlowFileRecord doPoll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+        FlowFileRecord flowFile;
+        boolean isExpired;
+
+        migrateSwapToActive();
+        final boolean queueFullAtStart = queueFullRef.get();
+
+        int expiredRecordCount = 0;
+        long expiredBytes = 0L;
+
+        do {
+            flowFile = this.activeQueue.poll();
+
+            isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis));
+            if (isExpired) {
+                expiredRecords.add(flowFile);
+                expiredRecordCount++;
+                expiredBytes += flowFile.getSize();
+
+                // Expired FlowFiles are accounted for once, after the loop; clear the reference so the
+                // per-record adjustment below does not count them a second time.
+                flowFile = null;
+
+                if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
+                    break;
+                }
+            } else if (flowFile != null && flowFile.isPenalized()) {
+                // Put the penalized FlowFile back; it never left the queue as far as the counters are concerned.
+                this.activeQueue.add(flowFile);
+                flowFile = null;
+                break;
+            }
+
+            if (flowFile != null) {
+                incrementActiveQueueSize(-1, -flowFile.getSize());
+            }
+        } while (isExpired);
+
+        if (expiredRecordCount > 0) {
+            incrementActiveQueueSize(-expiredRecordCount, -expiredBytes);
+        }
+
+        // if at least 1 FlowFile was expired & the queue was full before we started, then
+        // we need to determine whether or not the queue is full again. If no FlowFile was expired,
+        // then the queue will still be full until the appropriate #acknowledge method is called.
+        if (queueFullAtStart && !expiredRecords.isEmpty()) {
+            queueFullRef.set(determineIfFull());
+        }
+
+        return isExpired ? null : flowFile;
+    }
+
+    /**
+     * Polls up to <code>maxResults</code> FlowFiles from this queue under the write lock.
+     *
+     * @param maxResults the maximum number of FlowFiles to return
+     * @param expiredRecords receives any expired FlowFiles that were removed
+     * @return the polled FlowFiles; possibly empty
+     */
+    @Override
+    public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords) {
+        final int initialCapacity = Math.min(1024, maxResults);
+        final List<FlowFileRecord> polled = new ArrayList<>(initialCapacity);
+
+        writeLock.lock();
+        try {
+            doPoll(polled, maxResults, expiredRecords);
+        } finally {
+            writeLock.unlock("poll(int, Set)");
+        }
+
+        return polled;
+    }
+
+    /**
+     * Drains up to <code>maxResults</code> FlowFiles from the active queue into <code>records</code>
+     * and updates the active and unacknowledged counters to match.
+     *
+     * The only caller in view, {@link #poll(int, Set)}, invokes this with the write lock held.
+     *
+     * @param records receives the polled FlowFiles
+     * @param maxResults the maximum number of FlowFiles to poll
+     * @param expiredRecords receives any expired FlowFiles that were removed
+     */
+    private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords) {
+        migrateSwapToActive();
+
+        final boolean wasFull = queueFullRef.get();
+
+        final long drainedBytes = drainQueue(activeQueue, records, maxResults, expiredRecords);
+
+        long expiredByteTotal = 0L;
+        for (final FlowFileRecord expired : expiredRecords) {
+            expiredByteTotal += expired.getSize();
+        }
+
+        final int removedCount = expiredRecords.size() + records.size();
+        incrementActiveQueueSize(-removedCount, -drainedBytes);
+        incrementUnacknowledgedQueueSize(records.size(), drainedBytes - expiredByteTotal);
+
+        // The full flag is only re-evaluated when the queue was full on entry and at least one
+        // FlowFile expired; otherwise it stays set until one of the acknowledge methods runs.
+        if (wasFull && !expiredRecords.isEmpty()) {
+            queueFullRef.set(determineIfFull());
+        }
+    }
+
+    /**
+     * Tops up the active queue from swapped data. If any swap locations are recorded,
+     * the first one is swapped in (one per call); otherwise records are moved from the
+     * in-memory swap queue until the active queue reaches the swap threshold.
+     *
+     * This method MUST be called with the writeLock held.
+     */
+    private void migrateSwapToActive() {
+        // Migrate as many FlowFiles as we can from the Swap Queue to the Active Queue, so that we don't
+        // have to swap them out & then swap them back in.
+        // If we don't do this, we could get into a situation where we have potentially thousands of FlowFiles
+        // sitting on the Swap Queue but not getting processed because there aren't enough to be swapped out.
+        // In particular, this can happen if the queue is typically filled with surges.
+        // For example, if the queue has 25,000 FlowFiles come in, it may process 20,000 of them and leave
+        // 5,000 sitting on the Swap Queue. If it then takes an hour for an additional 5,000 FlowFiles to come in,
+        // those FlowFiles sitting on the Swap Queue will sit there for an hour, waiting to be swapped out and
+        // swapped back in again.
+        // Calling this method when records are polled prevents this condition by migrating FlowFiles from the
+        // Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out
+        // to disk, because we want them to be swapped back in in the same order that they were swapped out.
+
+        if (activeQueue.size() > swapThreshold - SWAP_RECORD_POLL_SIZE) { // active queue is within SWAP_RECORD_POLL_SIZE of the threshold: nothing to do
+            return;
+        }
+
+        // If there are swap files waiting to be swapped in, swap those in first. We do this in order to ensure that those that
+        // were swapped out first are then swapped back in first. If we instead just immediately migrated the FlowFiles from the
+        // swap queue to the active queue, and we never run out of FlowFiles in the active queue (because destination cannot
+        // keep up with queue), we will end up always processing the new FlowFiles first instead of the FlowFiles that arrived
+        // first.
+        if (!swapLocations.isEmpty()) {
+            final String swapLocation = swapLocations.remove(0); // removed up front; the failure paths below do not put it back
+            try {
+                final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, this);
+                long swapSize = 0L;
+                for (final FlowFileRecord flowFile : swappedIn) {
+                    swapSize += flowFile.getSize();
+                }
+                incrementSwapQueueSize(-swappedIn.size(), -swapSize);
+                incrementActiveQueueSize(swappedIn.size(), swapSize);
+                activeQueue.addAll(swappedIn);
+                return; // at most one swap location is processed per call
+            } catch (final FileNotFoundException fnfe) {
+                logger.error("Failed to swap in FlowFiles from Swap File {} because the Swap File can no longer be found", swapLocation);
+                if (eventReporter != null) {
+                    eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + swapLocation + " because the Swap File can no longer be found");
+                }
+                return; // NOTE(review): size counters are not adjusted here although the location was dropped - confirm intended
+            } catch (final IOException ioe) {
+                logger.error("Failed to swap in FlowFiles from Swap File {}; Swap File appears to be corrupt!", swapLocation);
+                logger.error("", ioe);
+                if (eventReporter != null) {
+                    eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " +
+                        swapLocation + "; Swap File appears to be corrupt! Some FlowFiles in the queue may not be accessible. See logs for more information.");
+                }
+                return; // NOTE(review): size counters are not adjusted here although the location was dropped - confirm intended
+            }
+        }
+
+        // this is the most common condition (nothing is swapped out), so do the check first and avoid the expense
+        // of other checks for 99.999% of the cases.
+        if (size.get().swappedCount == 0 && swapQueue.isEmpty()) {
+            return;
+        }
+
+        if (size.get().swappedCount > swapQueue.size()) {
+            // we already have FlowFiles swapped out, so we won't migrate the queue; we will wait for
+            // an external process to swap FlowFiles back in.
+            return;
+        }
+
+        int recordsMigrated = 0;
+        long bytesMigrated = 0L;
+        final Iterator<FlowFileRecord> swapItr = swapQueue.iterator();
+        while (activeQueue.size() < swapThreshold && swapItr.hasNext()) { // move records in swap-queue order until the active queue reaches the threshold
+            final FlowFileRecord toMigrate = swapItr.next();
+            activeQueue.add(toMigrate);
+            bytesMigrated += toMigrate.getSize();
+            recordsMigrated++;
+            swapItr.remove();
+        }
+
+        if (recordsMigrated > 0) {
+            incrementActiveQueueSize(recordsMigrated, bytesMigrated);
+            incrementSwapQueueSize(-recordsMigrated, -bytesMigrated);
+        }
+
+        if (size.get().swappedCount == 0) { // nothing left swapped: put()/putAll() may use the active queue again
+            swapMode = false;
+        }
+    }
+
+    /**
+     * Writes FlowFiles out to swap files once the in-memory swap queue holds at least
+     * SWAP_RECORD_POLL_SIZE records. The active queue and the swap queue are merged into a single
+     * temporary queue ordered by reverse priority, so that the lowest-priority FlowFiles are the
+     * ones written out. Whatever is not written out is then redistributed between the swap queue
+     * and the active queue, and the queue size is updated to match.
+     *
+     * A batch is only counted as swapped out after the swap manager has accepted it. If writing a
+     * batch fails, its FlowFiles are returned to the temporary queue, no further batches are
+     * attempted, and the failed batch is accounted for as in-memory FlowFiles only.
+     *
+     * This method MUST be called with the write lock held
+     */
+    private void writeSwapFilesIfNecessary() {
+        if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
+            return;
+        }
+
+        final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE;
+
+        // Remember what the swap queue held before we started, so that the size update at the end
+        // can be expressed as a delta against the current size.
+        final int originalSwapQueueCount = swapQueue.size();
+        long originalSwapQueueBytes = 0L;
+        for (final FlowFileRecord flowFile : swapQueue) {
+            originalSwapQueueBytes += flowFile.getSize();
+        }
+
+        // Create a new Priority queue with the prioritizers that are set, but reverse the
+        // prioritizers because we want to pull the lowest-priority FlowFiles to swap out
+        final PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<>(activeQueue.size() + swapQueue.size(), Collections.reverseOrder(new Prioritizer(priorities)));
+        tempQueue.addAll(activeQueue);
+        tempQueue.addAll(swapQueue);
+
+        long bytesSwappedOut = 0L;
+        int flowFilesSwappedOut = 0;
+        final List<String> swapLocations = new ArrayList<>(numSwapFiles);
+        for (int i = 0; i < numSwapFiles; i++) {
+            // Create a new swap file for the next SWAP_RECORD_POLL_SIZE records
+            final List<FlowFileRecord> toSwap = new ArrayList<>(SWAP_RECORD_POLL_SIZE);
+            long bytesInBatch = 0L;
+            for (int j = 0; j < SWAP_RECORD_POLL_SIZE; j++) {
+                final FlowFileRecord flowFile = tempQueue.poll();
+                toSwap.add(flowFile);
+                bytesInBatch += flowFile.getSize();
+            }
+
+            try {
+                Collections.reverse(toSwap); // currently ordered in reverse priority order based on the ordering of the temp queue.
+                final String swapLocation = swapManager.swapOut(toSwap, this);
+                swapLocations.add(swapLocation);
+
+                // Only now is the batch really on disk. Counting it before the swap-out succeeded would
+                // count a failed batch twice: once as swapped out and once as part of the in-memory queues.
+                bytesSwappedOut += bytesInBatch;
+                flowFilesSwappedOut += toSwap.size();
+            } catch (final IOException ioe) {
+                tempQueue.addAll(toSwap); // if we failed, we must add the FlowFiles back to the queue.
+                logger.error("FlowFile Queue with identifier {} has {} FlowFiles queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting "
+                    + "the Java heap space but failed to write information to disk due to {}", getIdentifier(), getQueueSize().getObjectCount(), ioe.toString());
+                logger.error("", ioe);
+                if (eventReporter != null) {
+                    eventReporter.reportEvent(Severity.ERROR, "Failed to Overflow to Disk", "Flowfile Queue with identifier " + getIdentifier() + " has " + getQueueSize().getObjectCount() +
+                        " queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting the Java heap space but failed to write information to disk. "
+                        + "See logs for more information.");
+                }
+
+                break;
+            }
+        }
+
+        // Pull any records off of the temp queue that won't fit back on the active queue, and add those to the
+        // swap queue. Then add the records back to the active queue.
+        swapQueue.clear();
+        long updatedSwapQueueBytes = 0L;
+        while (tempQueue.size() > swapThreshold) {
+            final FlowFileRecord record = tempQueue.poll();
+            swapQueue.add(record);
+            updatedSwapQueueBytes += record.getSize();
+        }
+
+        Collections.reverse(swapQueue); // currently ordered in reverse priority order based on the ordering of the temp queue
+
+        // replace the contents of the active queue, since we've merged it with the swap queue.
+        activeQueue.clear();
+        FlowFileRecord toRequeue;
+        long activeQueueBytes = 0L;
+        while ((toRequeue = tempQueue.poll()) != null) {
+            activeQueue.offer(toRequeue);
+            activeQueueBytes += toRequeue.getSize();
+        }
+
+        // Publish the new size atomically; retry if the size was changed underneath us.
+        boolean updated = false;
+        while (!updated) {
+            final FlowFileQueueSize originalSize = size.get();
+
+            final int addedSwapRecords = swapQueue.size() - originalSwapQueueCount;
+            final long addedSwapBytes = updatedSwapQueueBytes - originalSwapQueueBytes;
+
+            final FlowFileQueueSize newSize = new FlowFileQueueSize(activeQueue.size(), activeQueueBytes,
+                originalSize.swappedCount + addedSwapRecords + flowFilesSwappedOut, originalSize.swappedBytes + addedSwapBytes + bytesSwappedOut,
+                originalSize.unacknowledgedCount, originalSize.unacknowledgedBytes);
+            updated = size.compareAndSet(originalSize, newSize);
+        }
+
+        this.swapLocations.addAll(swapLocations);
+    }
+
+
+    /**
+     * Moves FlowFiles from the given source queue into the given destination list until the destination
+     * holds <code>maxResults</code> records, the source queue is exhausted, a penalized FlowFile is
+     * encountered, or the number of expired records reaches MAX_EXPIRED_RECORDS_PER_ITERATION.
+     * Expired FlowFiles are added to <code>expiredRecords</code> instead of the destination. A penalized
+     * FlowFile is put back onto the source queue and is not counted.
+     *
+     * @param sourceQueue the queue to pull FlowFiles from
+     * @param destination the list that receives the FlowFiles that are neither expired nor penalized
+     * @param maxResults the maximum size that the destination list may reach
+     * @param expiredRecords the set that receives any expired FlowFiles that are encountered
+     * @return the total size, in bytes, of every FlowFile that was removed from the source queue,
+     *         whether it was added to the destination or to the set of expired records
+     */
+    @Override
+    public long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination, int maxResults, final Set<FlowFileRecord> expiredRecords) {
+        long drainedSize = 0L;
+        FlowFileRecord pulled = null;
+
+        final long expirationMillis = this.flowFileExpirationMillis.get();
+        while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) {
+            if (isLaterThan(getExpirationDate(pulled, expirationMillis))) {
+                expiredRecords.add(pulled);
+
+                // Count the record before checking the limit: it has already been removed from the source
+                // queue, so its bytes must be included even when it is the record that ends the iteration.
+                drainedSize += pulled.getSize();
+                if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
+                    break;
+                }
+            } else {
+                if (pulled.isPenalized()) {
+                    // The record goes back onto the source queue, so it is not part of what was drained.
+                    sourceQueue.add(pulled);
+                    break;
+                }
+                destination.add(pulled);
+                drainedSize += pulled.getSize();
+            }
+        }
+        return drainedSize;
+    }
+
+    /**
+     * Pulls FlowFiles from the active queue and offers each one to the given filter, returning those
+     * that the filter accepts. Expired FlowFiles are added to <code>expiredRecords</code> rather than
+     * offered to the filter. Polling stops when the active queue is exhausted, when a penalized FlowFile
+     * is found (it is put back), when the filter asks not to continue, or when the number of expired
+     * records reaches MAX_EXPIRED_RECORDS_PER_ITERATION. FlowFiles that the filter rejects are returned
+     * to the active queue.
+     *
+     * @param filter the filter that decides which FlowFiles are selected and whether polling continues
+     * @param expiredRecords the set that receives any expired FlowFiles that are encountered
+     * @return the FlowFiles that were accepted by the filter
+     */
+    @Override
+    public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
+        long pulledBytes = 0L;
+        int pulledCount = 0;
+
+        writeLock.lock();
+        try {
+            migrateSwapToActive();
+
+            final long expirationMillis = this.flowFileExpirationMillis.get();
+            final boolean wasFull = queueFullRef.get();
+
+            final List<FlowFileRecord> accepted = new ArrayList<>();
+            final List<FlowFileRecord> rejected = new ArrayList<>();
+
+            FlowFileRecord candidate;
+            while ((candidate = this.activeQueue.poll()) != null) {
+                if (isLaterThan(getExpirationDate(candidate, expirationMillis))) {
+                    // Expired records leave the active queue, so they count toward what was pulled.
+                    expiredRecords.add(candidate);
+                    pulledBytes += candidate.getSize();
+                    pulledCount++;
+
+                    if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
+                        break;
+                    }
+                    continue;
+                }
+
+                if (candidate.isPenalized()) {
+                    // just stop searching because the rest are all penalized.
+                    this.activeQueue.add(candidate);
+                    break;
+                }
+
+                final FlowFileFilterResult result = filter.filter(candidate);
+                if (result.isAccept()) {
+                    final long candidateSize = candidate.getSize();
+                    pulledBytes += candidateSize;
+                    pulledCount++;
+
+                    incrementUnacknowledgedQueueSize(1, candidateSize);
+                    accepted.add(candidate);
+                } else {
+                    rejected.add(candidate);
+                }
+
+                if (!result.isContinue()) {
+                    break;
+                }
+            }
+
+            this.activeQueue.addAll(rejected);
+
+            // The queue can only have stopped being full here if it was full to begin with and at least
+            // one FlowFile expired. Otherwise it stays full until the appropriate #acknowledge method
+            // is called.
+            if (wasFull && !expiredRecords.isEmpty()) {
+                queueFullRef.set(determineIfFull());
+            }
+
+            return accepted;
+        } finally {
+            incrementActiveQueueSize(-pulledCount, -pulledBytes);
+            writeLock.unlock("poll(Filter, Set)");
+        }
+    }
+
+
+
+    /**
+     * Orders FlowFiles for the queue. FlowFiles that are not penalized sort before penalized ones, and
+     * penalized FlowFiles sort by ascending penalty expiration. Ties are broken by the configured
+     * prioritizers, in order, then by content claim (a FlowFile without a claim sorts first), then by
+     * content claim offset, and finally by FlowFile ID.
+     */
+    private static final class Prioritizer implements Comparator<FlowFileRecord>, Serializable {
+
+        private static final long serialVersionUID = 1L;
+        private final transient List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
+
+        private Prioritizer(final List<FlowFilePrioritizer> priorities) {
+            if (priorities != null) {
+                prioritizers.addAll(priorities);
+            }
+        }
+
+        @Override
+        public int compare(final FlowFileRecord f1, final FlowFileRecord f2) {
+            final boolean firstPenalized = f1.isPenalized();
+            final boolean secondPenalized = f2.isPenalized();
+
+            // Exactly one of the two is penalized: the penalized one sorts last.
+            if (firstPenalized != secondPenalized) {
+                return firstPenalized ? 1 : -1;
+            }
+
+            // Both are penalized: the one whose penalty expires first sorts first.
+            if (firstPenalized) {
+                final long firstExpiration = f1.getPenaltyExpirationMillis();
+                final long secondExpiration = f2.getPenaltyExpirationMillis();
+                if (firstExpiration < secondExpiration) {
+                    return -1;
+                }
+                if (firstExpiration > secondExpiration) {
+                    return 1;
+                }
+            }
+
+            // The first prioritizer that distinguishes the two FlowFiles decides.
+            for (final FlowFilePrioritizer prioritizer : prioritizers) {
+                final int prioritizerResult = prioritizer.compare(f1, f2);
+                if (prioritizerResult != 0) {
+                    return prioritizerResult;
+                }
+            }
+
+            final ContentClaim firstClaim = f1.getContentClaim();
+            final ContentClaim secondClaim = f2.getContentClaim();
+
+            if (firstClaim == null) {
+                // put the one without a claim first
+                if (secondClaim != null) {
+                    return -1;
+                }
+            } else if (secondClaim == null) {
+                return 1;
+            } else {
+                final int byClaim = firstClaim.compareTo(secondClaim);
+                if (byClaim != 0) {
+                    return byClaim;
+                }
+
+                final int byOffset = Long.compare(f1.getContentClaimOffset(), f2.getContentClaimOffset());
+                if (byOffset != 0) {
+                    return byOffset;
+                }
+            }
+
+            return Long.compare(f1.getId(), f2.getId());
+        }
+    }
+
+    /**
+     * @return the FlowFile expiration period, as the string that is currently stored for this queue
+     */
+    @Override
+    public String getFlowFileExpiration() {
+        final String expirationPeriod = flowFileExpirationPeriod.get();
+        return expirationPeriod;
+    }
+
+    /**
+     * @param timeUnit the unit in which the expiration should be expressed
+     * @return the stored expiration, converted from milliseconds to the given unit and narrowed to an int
+     */
+    @Override
+    public int getFlowFileExpiration(final TimeUnit timeUnit) {
+        final long expirationMillis = flowFileExpirationMillis.get();
+        final long converted = timeUnit.convert(expirationMillis, TimeUnit.MILLISECONDS);
+        return (int) converted;
+    }
+
+    /**
+     * Stores the given expiration period together with its duration in milliseconds.
+     *
+     * @param flowExpirationPeriod the expiration period to parse and store
+     * @throws IllegalArgumentException if the period parses to a negative number of milliseconds
+     */
+    @Override
+    public void setFlowFileExpiration(final String flowExpirationPeriod) {
+        final long expirationMillis = FormatUtils.getTimeDuration(flowExpirationPeriod, TimeUnit.MILLISECONDS);
+        if (expirationMillis < 0) {
+            throw new IllegalArgumentException("FlowFile Expiration Period must be positive");
+        }
+
+        // Keep both representations: the original text and the parsed duration.
+        this.flowFileExpirationPeriod.set(flowExpirationPeriod);
+        this.flowFileExpirationMillis.set(expirationMillis);
+    }
+
+
+    /**
+     * Delegates to the swap manager's purge operation.
+     */
+    @Override
+    public void purgeSwapFiles() {
+        this.swapManager.purge();
+    }
+
+    /**
+     * Asks the swap manager for the swap locations that belong to this queue, adds the FlowFile count
+     * and byte count reported for each readable location to this queue's swapped size, and registers
+     * the recovered locations with this queue. A location whose size or maximum record ID cannot be
+     * read is reported as an error and contributes nothing to the swapped size.
+     *
+     * @return the largest record ID reported across the recovered swap locations, or <code>null</code>
+     *         if no record ID was reported or the swap locations could not be determined
+     */
+    @Override
+    public Long recoverSwappedFlowFiles() {
+        int swapFlowFileCount = 0;
+        long swapByteCount = 0L;
+        Long maxId = null;
+
+        writeLock.lock();
+        try {
+            final List<String> swapLocations;
+            try {
+                swapLocations = swapManager.recoverSwapLocations(this);
+            } catch (final IOException ioe) {
+                logger.error("Failed to determine whether or not any Swap Files exist for FlowFile Queue {}", getIdentifier());
+                logger.error("", ioe);
+                if (eventReporter != null) {
+                    eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine whether or not any Swap Files exist for FlowFile Queue " +
+                        getIdentifier() + "; see logs for more details");
+                }
+                return null;
+            }
+
+            for (final String swapLocation : swapLocations) {
+                try {
+                    final QueueSize queueSize = swapManager.getSwapSize(swapLocation);
+                    final Long maxSwapRecordId = swapManager.getMaxRecordId(swapLocation);
+                    if (maxSwapRecordId != null) {
+                        if (maxId == null || maxSwapRecordId > maxId) {
+                            maxId = maxSwapRecordId;
+                        }
+                    }
+
+                    swapFlowFileCount += queueSize.getObjectCount();
+                    swapByteCount += queueSize.getByteCount();
+                } catch (final IOException ioe) {
+                    // The message needs one placeholder per argument; otherwise the cause is silently dropped from it.
+                    logger.error("Failed to recover FlowFiles from Swap File {} due to {}; the file appears to be corrupt", swapLocation, ioe.toString());
+                    logger.error("", ioe);
+                    if (eventReporter != null) {
+                        eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to recover FlowFiles from Swap File " + swapLocation +
+                            "; the file appears to be corrupt. See logs for more details");
+                    }
+                }
+            }
+
+            incrementSwapQueueSize(swapFlowFileCount, swapByteCount);
+            this.swapLocations.addAll(swapLocations);
+        } finally {
+            writeLock.unlock("Recover Swap Files");
+        }
+
+        return maxId;
+    }
+
+
+    /**
+     * @return a short description of this queue that contains its identifier
+     */
+    @Override
+    public String toString() {
+        final String prefix = "FlowFileQueue[id=";
+        final String suffix = "]";
+        return prefix + identifier + suffix;
+    }
+
+    // Drop requests that have been submitted to this queue, keyed by request identifier.
+    private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>();
+
+    /**
+     * Starts a daemon thread that drops the FlowFiles held by this queue and returns the request object
+     * that tracks its progress. While holding the write lock, the thread drops the contents of the active
+     * queue, then the contents of the swap queue, and then the contents of each swap file in turn,
+     * updating the request's current size and dropped size after each step. If any step fails, the
+     * request is moved to the FAILURE state with a description of the problem and the thread stops.
+     *
+     * @param requestIdentifier the identifier under which the request is registered
+     * @param requestor the name that is logged and handed to the drop of each batch of FlowFiles
+     * @return the request that was created for this drop operation
+     */
+    @Override
+    public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) {
+        logger.info("Initiating drop of FlowFiles from {} on behalf of {} (request identifier={})", this, requestor, requestIdentifier);
+
+        // purge any old requests from the map just to keep it clean. But if there are very few requests, which is usually the case, then don't bother
+        if (dropRequestMap.size() > 10) {
+            final List<String> toDrop = new ArrayList<>();
+            for (final Map.Entry<String, DropFlowFileRequest> entry : dropRequestMap.entrySet()) {
+                final DropFlowFileRequest request = entry.getValue();
+                final boolean completed = request.getState() == DropFlowFileState.COMPLETE || request.getState() == DropFlowFileState.FAILURE;
+
+                // only requests that finished (successfully or not) more than 5 minutes ago are purged
+                if (completed && System.currentTimeMillis() - request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
+                    toDrop.add(entry.getKey());
+                }
+            }
+
+            for (final String requestId : toDrop) {
+                dropRequestMap.remove(requestId);
+            }
+        }
+
+        final DropFlowFileRequest dropRequest = new DropFlowFileRequest(requestIdentifier);
+        final Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                // the write lock is held for the entire drop operation
+                writeLock.lock();
+                try {
+                    dropRequest.setState(DropFlowFileState.DROPPING_ACTIVE_FLOWFILES);
+                    logger.debug("For DropFlowFileRequest {}, original size is {}", requestIdentifier, getQueueSize());
+                    dropRequest.setOriginalSize(getQueueSize());
+
+                    try {
+                        // Step 1: drop everything that is currently in the active queue.
+                        final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue);
+
+                        QueueSize droppedSize;
+                        try {
+                            droppedSize = drop(activeQueueRecords, requestor);
+                            logger.debug("For DropFlowFileRequest {}, Dropped {} from active queue", requestIdentifier, droppedSize);
+                        } catch (final IOException ioe) {
+                            logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
+                            logger.error("", ioe);
+
+                            dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString());
+                            return;
+                        }
+
+                        // the active queue is only cleared once the drop itself has succeeded
+                        activeQueue.clear();
+                        incrementActiveQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount());
+                        dropRequest.setCurrentSize(getQueueSize());
+                        dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
+
+                        // Step 2: drop everything that is held in the in-memory swap queue.
+                        try {
+                            final QueueSize swapSize = size.get().swapQueueSize();
+
+                            logger.debug("For DropFlowFileRequest {}, Swap Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}",
+                                requestIdentifier, swapQueue.size(), swapSize.getObjectCount(), swapSize.getByteCount());
+                            droppedSize = drop(swapQueue, requestor);
+                        } catch (final IOException ioe) {
+                            logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
+                            logger.error("", ioe);
+
+                            dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString());
+                            return;
+                        }
+
+                        swapQueue.clear();
+                        dropRequest.setCurrentSize(getQueueSize());
+                        dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
+                        swapMode = false;
+                        incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount());
+                        logger.debug("For DropFlowFileRequest {}, dropped {} from Swap Queue", requestIdentifier, droppedSize);
+
+                        // Step 3: swap in each swap file in turn and drop the FlowFiles that it contained.
+                        final int swapFileCount = swapLocations.size();
+                        final Iterator<String> swapLocationItr = swapLocations.iterator();
+                        while (swapLocationItr.hasNext()) {
+                            final String swapLocation = swapLocationItr.next();
+
+                            List<FlowFileRecord> swappedIn = null;
+                            try {
+                                swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
+                                droppedSize = drop(swappedIn, requestor);
+                            } catch (final IOException ioe) {
+                                logger.error("Failed to swap in FlowFiles from Swap File {} in order to drop the FlowFiles for Connection {} due to {}",
+                                    swapLocation, StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
+                                logger.error("", ioe);
+
+                                dropRequest.setState(DropFlowFileState.FAILURE, "Failed to swap in FlowFiles from Swap File " + swapLocation + " due to " + ioe.toString());
+                                // swappedIn is non-null only if the swap-in succeeded and the drop is what failed
+                                if (swappedIn != null) {
+                                    activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
+                                }
+
+                                return;
+                            }
+
+                            dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
+                            incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount());
+
+                            dropRequest.setCurrentSize(getQueueSize());
+                            // the swap location is forgotten only after its FlowFiles were dropped successfully
+                            swapLocationItr.remove();
+                            logger.debug("For DropFlowFileRequest {}, dropped {} for Swap File {}", requestIdentifier, droppedSize, swapLocation);
+                        }
+
+                        logger.debug("Dropped FlowFiles from {} Swap Files", swapFileCount);
+                        logger.info("Successfully dropped {} FlowFiles ({} bytes) from Connection with ID {} on behalf of {}",
+                            dropRequest.getDroppedSize().getObjectCount(), dropRequest.getDroppedSize().getByteCount(), StandardFlowFileQueue.this.getIdentifier(), requestor);
+                        dropRequest.setState(DropFlowFileState.COMPLETE);
+                    } catch (final Exception e) {
+                        // any other failure also moves the request to the FAILURE state
+                        logger.error("Failed to drop FlowFiles from Connection with ID {} due to {}", StandardFlowFileQueue.this.getIdentifier(), e.toString());
+                        logger.error("", e);
+                        dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.toString());
+                    }
+                } finally {
+                    writeLock.unlock("Drop FlowFiles");
+                }
+            }
+        }, "Drop FlowFiles for Connection " + getIdentifier());
+        t.setDaemon(true);
+        t.start();
+
+        // NOTE(review): the thread is already running when it is attached to the request and the request
+        // is registered in the map; confirm that no caller depends on either happening before the start.
+        dropRequest.setExecutionThread(t);
+        dropRequestMap.put(requestIdentifier, dropRequest);
+
+        return dropRequest;
+    }
+
+    /**
+     * Drops the given FlowFiles: builds a DROP provenance event and a delete repository record for each
+     * FlowFile, decrements the claimant count of every resource claim that is referenced, registers the
+     * provenance events and finally updates the FlowFile Repository with the delete records.
+     *
+     * @param flowFiles the FlowFiles to drop
+     * @param requestor the name that is recorded in the details of each provenance event
+     * @return the number of FlowFiles in the given list and the sum of their sizes
+     * @throws IOException declared for failures of the repository calls made here
+     */
+    private QueueSize drop(final List<FlowFileRecord> flowFiles, final String requestor) throws IOException {
+        // First pass: one provenance event and one repository record per FlowFile.
+        final List<ProvenanceEventRecord> dropEvents = new ArrayList<>(flowFiles.size());
+        final List<RepositoryRecord> deleteRecords = new ArrayList<>(flowFiles.size());
+        for (final FlowFileRecord flowFile : flowFiles) {
+            dropEvents.add(createDropEvent(flowFile, requestor));
+            deleteRecords.add(createDeleteRepositoryRecord(flowFile));
+        }
+
+        // Second pass: total up the content size and release the claim held by each FlowFile, if any.
+        long totalBytes = 0L;
+        for (final FlowFileRecord flowFile : flowFiles) {
+            totalBytes += flowFile.getSize();
+
+            final ContentClaim contentClaim = flowFile.getContentClaim();
+            if (contentClaim != null) {
+                final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+                if (resourceClaim != null) {
+                    resourceClaimManager.decrementClaimantCount(resourceClaim);
+                }
+            }
+        }
+
+        provRepository.registerEvents(dropEvents);
+        flowFileRepository.updateRepository(deleteRecords);
+        return new QueueSize(flowFiles.size(), totalBytes);
+    }
+
+    /**
+     * Builds the DROP provenance event for a FlowFile that is being dropped from this queue.
+     *
+     * @param flowFile the FlowFile that the event describes
+     * @param requestor the name that is appended to the event details
+     * @return the event, with this queue's identifier as component ID and "Connection" as component type
+     */
+    private ProvenanceEventRecord createDropEvent(final FlowFileRecord flowFile, final String requestor) {
+        final ProvenanceEventBuilder eventBuilder = provRepository.eventBuilder();
+
+        // what happened, and to which FlowFile
+        eventBuilder.fromFlowFile(flowFile);
+        eventBuilder.setEventType(ProvenanceEventType.DROP);
+        eventBuilder.setLineageStartDate(flowFile.getLineageStartDate());
+
+        // where it happened, and on whose behalf
+        eventBuilder.setComponentId(getIdentifier());
+        eventBuilder.setComponentType("Connection");
+        eventBuilder.setDetails("FlowFile manually dropped; request made by " + requestor);
+
+        return eventBuilder.build();
+    }
+
+    /**
+     * Creates a repository record of type DELETE for the given FlowFile. The record names this queue as
+     * its original queue, has no destination and no swap location, and reports the FlowFile's content
+     * claim as both the current and the original claim.
+     *
+     * @param flowFile the FlowFile that the record describes
+     * @return the delete record
+     */
+    private RepositoryRecord createDeleteRepositoryRecord(final FlowFileRecord flowFile) {
+        return new RepositoryRecord() {
+            // --- what kind of record this is ---
+
+            @Override
+            public RepositoryRecordType getType() {
+                return RepositoryRecordType.DELETE;
+            }
+
+            @Override
+            public boolean isMarkedForAbort() {
+                return false;
+            }
+
+            @Override
+            public boolean isAttributesChanged() {
+                return false;
+            }
+
+            // --- where the FlowFile came from and where it goes ---
+
+            @Override
+            public FlowFileQueue getOriginalQueue() {
+                return StandardFlowFileQueue.this;
+            }
+
+            @Override
+            public FlowFileQueue getDestination() {
+                return null;
+            }
+
+            @Override
+            public String getSwapLocation() {
+                return null;
+            }
+
+            // --- the FlowFile itself and its content ---
+
+            @Override
+            public FlowFileRecord getCurrent() {
+                return flowFile;
+            }
+
+            @Override
+            public ContentClaim getOriginalClaim() {
+                return flowFile.getContentClaim();
+            }
+
+            @Override
+            public ContentClaim getCurrentClaim() {
+                return flowFile.getContentClaim();
+            }
+
+            @Override
+            public long getCurrentClaimOffset() {
+                return flowFile.getContentClaimOffset();
+            }
+        };
+    }
+
+
+    /**
+     * Removes the drop request with the given identifier from the set of known requests and cancels it.
+     *
+     * @param requestIdentifier the identifier of the request to cancel
+     * @return <code>false</code> if no request is registered under the identifier; otherwise the result
+     *         of cancelling the request
+     */
+    @Override
+    public boolean cancelDropFlowFileRequest(final String requestIdentifier) {
+        final DropFlowFileRequest removed = dropRequestMap.remove(requestIdentifier);
+        return removed != null && removed.cancel();
+    }
+
+    /**
+     * @param requestIdentifier the identifier of the drop request to look up
+     * @return the request registered under the identifier, or <code>null</code> if there is none
+     */
+    @Override
+    public DropFlowFileStatus getDropFlowFileStatus(final String requestIdentifier) {
+        final DropFlowFileStatus status = dropRequestMap.get(requestIdentifier);
+        return status;
+    }
+
+    /**
+     * Acquires this queue's write lock, so that other threads are unable to
+     * interact with the queue until {@link #unlock()} is called
+     */
+    public void lock() {
+        this.writeLock.lock();
+    }
+
+    /**
+     * Releases this queue's write lock, which was acquired by {@link #lock()}
+     */
+    public void unlock() {
+        this.writeLock.unlock("external unlock");
+    }
+
+    /**
+     * @return the count and byte total of the FlowFiles that are currently unacknowledged
+     */
+    @Override
+    public QueueSize getUnacknowledgedQueueSize() {
+        final FlowFileQueueSize currentSize = size.get();
+        return currentSize.unacknowledgedQueueSize();
+    }
+
+
+    /**
+     * Atomically adds the given deltas to the active-queue portion of the queue size, retrying until
+     * the update is applied. Negative values decrease the size.
+     *
+     * @param count the number of FlowFiles to add
+     * @param bytes the number of bytes to add
+     */
+    private void incrementActiveQueueSize(final int count, final long bytes) {
+        FlowFileQueueSize current;
+        FlowFileQueueSize replacement;
+        do {
+            current = size.get();
+            replacement = new FlowFileQueueSize(current.activeQueueCount + count, current.activeQueueBytes + bytes,
+                current.swappedCount, current.swappedBytes, current.unacknowledgedCount, current.unacknowledgedBytes);
+        } while (!size.compareAndSet(current, replacement));
+    }
+
+    /**
+     * Atomically adds the given deltas to the swapped portion of the queue size, retrying until the
+     * update is applied. Negative values decrease the size.
+     *
+     * @param count the number of FlowFiles to add
+     * @param bytes the number of bytes to add
+     */
+    private void incrementSwapQueueSize(final int count, final long bytes) {
+        FlowFileQueueSize current;
+        FlowFileQueueSize replacement;
+        do {
+            current = size.get();
+            replacement = new FlowFileQueueSize(current.activeQueueCount, current.activeQueueBytes,
+                current.swappedCount + count, current.swappedBytes + bytes, current.unacknowledgedCount, current.unacknowledgedBytes);
+        } while (!size.compareAndSet(current, replacement));
+    }
+
+    /**
+     * Atomically adds the given deltas to the unacknowledged portion of the queue size, retrying until
+     * the update is applied. Negative values decrease the size.
+     *
+     * @param count the number of FlowFiles to add
+     * @param bytes the number of bytes to add
+     */
+    private void incrementUnacknowledgedQueueSize(final int count, final long bytes) {
+        FlowFileQueueSize current;
+        FlowFileQueueSize replacement;
+        do {
+            current = size.get();
+            replacement = new FlowFileQueueSize(current.activeQueueCount, current.activeQueueBytes,
+                current.swappedCount, current.swappedBytes, current.unacknowledgedCount + count, current.unacknowledgedBytes + bytes);
+        } while (!size.compareAndSet(current, replacement));
+    }
+
+
+    /**
+     * Immutable snapshot of the queue size, split into three parts: the active queue, the swapped
+     * FlowFiles and the unacknowledged FlowFiles. Each part is tracked as a count and a number of bytes.
+     */
+    private static class FlowFileQueueSize {
+        // active queue
+        private final int activeQueueCount;
+        private final long activeQueueBytes;
+        // swapped
+        private final int swappedCount;
+        private final long swappedBytes;
+        // unacknowledged
+        private final int unacknowledgedCount;
+        private final long unacknowledgedBytes;
+
+        public FlowFileQueueSize(final int activeQueueCount, final long activeQueueBytes, final int swappedCount, final long swappedBytes,
+            final int unacknowledgedCount, final long unacknowledgedBytes) {
+            this.activeQueueCount = activeQueueCount;
+            this.activeQueueBytes = activeQueueBytes;
+            this.swappedCount = swappedCount;
+            this.swappedBytes = swappedBytes;
+            this.unacknowledgedCount = unacknowledgedCount;
+            this.unacknowledgedBytes = unacknowledgedBytes;
+        }
+
+        /**
+         * @return <code>true</code> if all three counts are zero
+         */
+        public boolean isEmpty() {
+            final boolean nothingActive = activeQueueCount == 0;
+            final boolean nothingSwapped = swappedCount == 0;
+            final boolean nothingUnacknowledged = unacknowledgedCount == 0;
+            return nothingActive && nothingSwapped && nothingUnacknowledged;
+        }
+
+        /**
+         * @return the active-queue part of this size
+         */
+        public QueueSize activeQueueSize() {
+            return new QueueSize(activeQueueCount, activeQueueBytes);
+        }
+
+        /**
+         * @return the swapped part of this size
+         */
+        public QueueSize swapQueueSize() {
+            return new QueueSize(swappedCount, swappedBytes);
+        }
+
+        /**
+         * @return the unacknowledged part of this size
+         */
+        public QueueSize unacknowledgedQueueSize() {
+            return new QueueSize(unacknowledgedCount, unacknowledgedBytes);
+        }
+
+        /**
+         * @return the sum of all three parts
+         */
+        public QueueSize toQueueSize() {
+            final int totalCount = activeQueueCount + swappedCount + unacknowledgedCount;
+            final long totalBytes = activeQueueBytes + swappedBytes + unacknowledgedBytes;
+            return new QueueSize(totalCount, totalBytes);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/77f7d752/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
new file mode 100644
index 0000000..3789ea5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.queue.DropFlowFileState;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestStandardFlowFileQueue {
+    private TestSwapManager swapManager = null;
+    private StandardFlowFileQueue queue = null;
+
+    @Before
+    public void setup() {
+        // Reset the shared static id generator so FlowFile ids restart at 0 for each test.
+        TestFlowFile.idGenerator.set(0L);
+        swapManager = new TestSwapManager();
+        // Connection mock that hands out a mocked source and a mocked destination.
+        final Connectable source = Mockito.mock(Connectable.class);
+        final Connectable destination = Mockito.mock(Connectable.class);
+        final Connection conn = Mockito.mock(Connection.class);
+        Mockito.when(conn.getSource()).thenReturn(source);
+        Mockito.when(conn.getDestination()).thenReturn(destination);
+
+        final ProvenanceEventRepository provenanceRepo = Mockito.mock(ProvenanceEventRepository.class);
+        Mockito.when(provenanceRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder());
+        // Remaining collaborators are bare mocks; null is passed for the event reporter; swap threshold is 10000.
+        queue = new StandardFlowFileQueue("id", conn, Mockito.mock(FlowFileRepository.class), provenanceRepo,
+            Mockito.mock(ResourceClaimManager.class), Mockito.mock(ProcessScheduler.class), swapManager, null, 10000);
+    }
+
+
+    @Test
+    public void testSwapOutOccurs() {
+        // setup() passes a swap threshold of 10000. The test expects no swap-out for the first 19,999 FlowFiles.
+        // Each default TestFlowFile is 1 byte long, so the object count and the byte count must grow in step.
+        for (int added = 1; added <= 19999; added++) {
+            queue.put(new TestFlowFile());
+            assertEquals(0, swapManager.swapOutCalledCount);
+
+            final QueueSize sizeSoFar = queue.size();
+            assertEquals(added, sizeSoFar.getObjectCount());
+            assertEquals(added, sizeSoFar.getByteCount());
+        }
+
+        // The 20,000th FlowFile (1000 bytes) is expected to trigger exactly one swap-out.
+        queue.put(new TestFlowFile(1000));
+        assertEquals(1, swapManager.swapOutCalledCount);
+
+        final QueueSize finalSize = queue.size();
+        assertEquals(20000, finalSize.getObjectCount());
+        assertEquals(20999, finalSize.getByteCount());
+
+        assertEquals(10000, queue.getActiveQueueSize().getObjectCount());
+    }
+
+    @Test
+    public void testLowestPrioritySwappedOutFirst() {
+        final List<FlowFilePrioritizer> bySize = new ArrayList<>();
+        bySize.add(new FlowFileSizePrioritizer());
+        queue.setPriorities(bySize);
+
+        // Enqueue sizes 19999 down to 0, so the smallest FlowFiles are the last ones to arrive.
+        for (long size = 19999; size >= 0; size--) {
+            queue.put(new TestFlowFile(size));
+        }
+
+        assertEquals(1, swapManager.swapOutCalledCount);
+        assertEquals(20000, queue.size().getObjectCount());
+        assertEquals(10000, queue.getActiveQueueSize().getObjectCount());
+        // The active queue is expected to hold exactly the 10,000 smallest FlowFiles, polled in ascending size order.
+        final List<FlowFileRecord> polled = queue.poll(Integer.MAX_VALUE, new HashSet<FlowFileRecord>());
+        assertEquals(10000, polled.size());
+        for (int idx = 0; idx < 10000; idx++) {
+            assertEquals(idx, polled.get(idx).getSize());
+        }
+    }
+    /** Verifies that swapped-out FlowFiles are swapped back in only after the active queue has been drained. */
+    @Test
+    public void testSwapIn() {
+        for (int i = 1; i <= 20000; i++) {
+            queue.put(new TestFlowFile());
+        }
+        // Expect one swap file after 20,000 puts; one further put must not create a second one.
+        assertEquals(1, swapManager.swappedOut.size());
+        queue.put(new TestFlowFile());
+        assertEquals(1, swapManager.swappedOut.size());
+        // Drain all but one FlowFile from the active queue; 'exp' is the set handed to poll() for expired records.
+        final Set<FlowFileRecord> exp = new HashSet<>();
+        for (int i = 0; i < 9999; i++) {
+            assertNotNull(queue.poll(exp));
+        }
+        // No swap-in may have happened while the active queue still held FlowFiles.
+        assertEquals(0, swapManager.swapInCalledCount);
+        assertEquals(1, queue.getActiveQueueSize().getObjectCount());
+        assertNotNull(queue.poll(exp));
+        // The active queue is now empty, yet the swap-in is expected to be deferred until the next poll.
+        assertEquals(0, swapManager.swapInCalledCount);
+        assertEquals(0, queue.getActiveQueueSize().getObjectCount());
+
+        assertEquals(1, swapManager.swapOutCalledCount);
+
+        assertNotNull(queue.poll(exp)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top.
+        assertEquals(1, swapManager.swapInCalledCount);
+        assertEquals(9999, queue.getActiveQueueSize().getObjectCount());
+
+        assertTrue(swapManager.swappedOut.isEmpty());
+
+        queue.poll(exp);
+    }
+
+    @Test(timeout = 60000)
+    public void testDropSwappedFlowFiles() throws InterruptedException {
+        // With the swap threshold of 10000 from setup(), 210,000 FlowFiles are expected to leave 20 swap files.
+        for (int i = 1; i <= 210000; i++) {
+            queue.put(new TestFlowFile());
+        }
+
+        assertEquals(20, swapManager.swappedOut.size());
+        final DropFlowFileStatus status = queue.dropFlowFiles("1", "Unit Test");
+
+        // The test observes completion only through the status object, so poll it. An interrupt now fails the
+        // test instead of being swallowed, and the timeout above bounds the wait if COMPLETE is never reported.
+        while (status.getState() != DropFlowFileState.COMPLETE) {
+            Thread.sleep(10L);
+        }
+
+        // Every swap file is expected to have been swapped back in and dropped, leaving the queue empty.
+        final QueueSize remaining = queue.size();
+        assertEquals(0, remaining.getObjectCount());
+        assertEquals(0, remaining.getByteCount());
+        assertEquals(0, swapManager.swappedOut.size());
+        assertEquals(20, swapManager.swapInCalledCount);
+    }
+    /** In-memory {@link FlowFileSwapManager} test double that counts how often swap-out and swap-in are called. */
+    private static class TestSwapManager implements FlowFileSwapManager {
+        private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>();
+        int swapOutCalledCount = 0;
+        int swapInCalledCount = 0;
+
+        @Override
+        public void initialize(final SwapManagerInitializationContext initializationContext) {
+            // Nothing to initialize: all state is held in the in-memory map.
+        }
+
+        @Override
+        public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException {
+            swapOutCalledCount++;
+            final String location = UUID.randomUUID().toString(); // random id stands in for a swap file path
+            swappedOut.put(location, new ArrayList<FlowFileRecord>(flowFiles));
+            return location;
+        }
+
+        @Override
+        public List<FlowFileRecord> peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException {
+            return new ArrayList<FlowFileRecord>(swappedOut.get(swapLocation)); // copy; the batch stays swapped out
+        }
+
+        @Override
+        public List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
+            swapInCalledCount++;
+            return swappedOut.remove(swapLocation); // hands the batch back and forgets the location
+        }
+
+        @Override
+        public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException {
+            return new ArrayList<String>(swappedOut.keySet());
+        }
+
+        @Override
+        public QueueSize getSwapSize(String swapLocation) throws IOException {
+            final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
+            if (flowFiles == null) {
+                return new QueueSize(0, 0L);
+            }
+            // Tally the number of FlowFiles and their total content size.
+            int count = 0;
+            long size = 0L;
+            for (final FlowFileRecord flowFile : flowFiles) {
+                count++;
+                size += flowFile.getSize();
+            }
+
+            return new QueueSize(count, size);
+        }
+
+        @Override
+        public Long getMaxRecordId(String swapLocation) throws IOException {
+            final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
+            if (flowFiles == null) {
+                return null;
+            }
+            // Scan for the largest FlowFile id; the result stays null when the batch is empty.
+            Long max = null;
+            for (final FlowFileRecord flowFile : flowFiles) {
+                if (max == null || flowFile.getId() > max) {
+                    max = flowFile.getId();
+                }
+            }
+
+            return max;
+        }
+
+        @Override
+        public void purge() {
+            swappedOut.clear();
+        }
+    }
+
+    /** Minimal {@link FlowFileRecord} test double with a sequential id, a fixed size and no content claim. */
+    private static class TestFlowFile implements FlowFileRecord {
+        private static final AtomicLong idGenerator = new AtomicLong(0L);
+
+        private final long id = idGenerator.getAndIncrement();
+        private final long entryDate = System.currentTimeMillis();
+        private final Map<String, String> attributes;
+        private final long size;
+
+        public TestFlowFile() {
+            this(1L);
+        }
+
+        public TestFlowFile(final long size) {
+            this(new HashMap<String, String>(), size);
+        }
+
+        public TestFlowFile(final Map<String, String> attributes, final long size) {
+            // Copy first: adding the UUID below must neither mutate the caller's map nor fail on an unmodifiable one.
+            this.attributes = new HashMap<>(attributes);
+            this.size = size;
+            if (!this.attributes.containsKey(CoreAttributes.UUID.key())) {
+                this.attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+            }
+        }
+
+
+        @Override
+        public long getId() {
+            return id;
+        }
+
+        @Override
+        public long getEntryDate() {
+            return entryDate;
+        }
+
+        @Override
+        public long getLineageStartDate() {
+            return entryDate; // this test double reports the entry date as the lineage start as well
+        }
+
+        @Override
+        public Long getLastQueueDate() {
+            return null;
+        }
+
+        @Override
+        public Set<String> getLineageIdentifiers() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public boolean isPenalized() {
+            return false;
+        }
+
+        @Override
+        public String getAttribute(String key) {
+            return attributes.get(key);
+        }
+
+        @Override
+        public long getSize() {
+            return size;
+        }
+
+        @Override
+        public Map<String, String> getAttributes() {
+            return Collections.unmodifiableMap(attributes);
+        }
+
+        @Override
+        public int compareTo(final FlowFile o) {
+            return Long.compare(id, o.getId());
+        }
+
+        @Override
+        public long getPenaltyExpirationMillis() {
+            return 0;
+        }
+
+        @Override
+        public ContentClaim getContentClaim() {
+            return null;
+        }
+
+        @Override
+        public long getContentClaimOffset() {
+            return 0;
+        }
+    }
+    /** Prioritizer that orders FlowFiles by ascending size, as reported by {@code getSize()}. */
+    private static class FlowFileSizePrioritizer implements FlowFilePrioritizer {
+        @Override
+        public int compare(final FlowFile o1, final FlowFile o2) {
+            return Long.compare(o1.getSize(), o2.getSize());
+        }
+    }
+}


[3/3] nifi git commit: NIFI-730: bug fixes and code cleanup for swap manager and flowfile queue

Posted by ma...@apache.org.
NIFI-730: bug fixes and code cleanup for swap manager and flowfile queue


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/77f7d752
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/77f7d752
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/77f7d752

Branch: refs/heads/NIFI-730
Commit: 77f7d7524cb8b07ed2976088f0e57d99233c8327
Parents: afb76af
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Oct 14 09:14:15 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Oct 14 09:14:15 2015 -0400

----------------------------------------------------------------------
 .../repository/FlowFileSwapManager.java         |   10 -
 .../nifi/controller/DropFlowFileRequest.java    |    2 +-
 .../nifi/controller/StandardFlowFileQueue.java  | 1153 ----------------
 .../controller/TestStandardFlowFileQueue.java   |  330 -----
 .../nifi/controller/FileSystemSwapManager.java  |    5 -
 .../nifi/controller/StandardFlowFileQueue.java  | 1246 ++++++++++++++++++
 .../controller/TestStandardFlowFileQueue.java   |  356 +++++
 7 files changed, 1603 insertions(+), 1499 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/77f7d752/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
index a70d287..3e341f8 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
@@ -41,16 +41,6 @@ public interface FlowFileSwapManager {
     void initialize(SwapManagerInitializationContext initializationContext);
 
     /**
-     * Drops all FlowFiles that are swapped out at the given location. This will update the Provenance
-     * Repository as well as the FlowFile Repository and
-     *
-     * @param swapLocation the location of the swap file to drop
-     * @param flowFileQueue the queue to which the FlowFiles belong
-     * @param user the user that initiated the request
-     */
-    void dropSwappedFlowFiles(String swapLocation, FlowFileQueue flowFileQueue, String user) throws IOException;
-
-    /**
      * Swaps out the given FlowFiles that belong to the queue with the given identifier.
      *
      * @param flowFiles the FlowFiles to swap out to external storage

http://git-wip-us.apache.org/repos/asf/nifi/blob/77f7d752/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
index 189fe7d..58695c2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
@@ -64,7 +64,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
     }
 
     void setCurrentSize(final QueueSize queueSize) {
-        this.currentSize = currentSize;
+        this.currentSize = queueSize;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/77f7d752/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
deleted file mode 100644
index 5b137f7..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ /dev/null
@@ -1,1153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.queue.DropFlowFileState;
-import org.apache.nifi.controller.queue.DropFlowFileStatus;
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.queue.QueueSize;
-import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.FlowFileRepository;
-import org.apache.nifi.controller.repository.FlowFileSwapManager;
-import org.apache.nifi.controller.repository.RepositoryRecord;
-import org.apache.nifi.controller.repository.RepositoryRecordType;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.FlowFilePrioritizer;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.FlowFileFilter;
-import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
-import org.apache.nifi.provenance.ProvenanceEventBuilder;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.ProvenanceEventRepository;
-import org.apache.nifi.provenance.ProvenanceEventType;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.concurrency.TimedLock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A FlowFileQueue is used to queue FlowFile objects that are awaiting further
- * processing. Must be thread safe.
- *
- */
-public final class StandardFlowFileQueue implements FlowFileQueue {
-
-    public static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 100000;
-    public static final int SWAP_RECORD_POLL_SIZE = 10000;
-
-    private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class);
-
-    private PriorityQueue<FlowFileRecord> activeQueue = null;
-    private long activeQueueContentSize = 0L;
-    private ArrayList<FlowFileRecord> swapQueue = null;
-
-    private int swappedRecordCount = 0;
-    private long swappedContentSize = 0L;
-    private String maximumQueueDataSize;
-    private long maximumQueueByteCount;
-    private boolean swapMode = false;
-    private long maximumQueueObjectCount;
-
-    private final EventReporter eventReporter;
-    private final AtomicLong flowFileExpirationMillis;
-    private final Connection connection;
-    private final AtomicReference<String> flowFileExpirationPeriod;
-    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
-    private final List<FlowFilePrioritizer> priorities;
-    private final int swapThreshold;
-    private final FlowFileSwapManager swapManager;
-    private final List<String> swapLocations = new ArrayList<>();
-    private final TimedLock readLock;
-    private final TimedLock writeLock;
-    private final String identifier;
-    private final FlowFileRepository flowFileRepository;
-    private final ProvenanceEventRepository provRepository;
-    private final ResourceClaimManager resourceClaimManager;
-
-    private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
-    private final AtomicInteger activeQueueSizeRef = new AtomicInteger(0);
-    private final AtomicReference<QueueSize> unacknowledgedSizeRef = new AtomicReference<>(new QueueSize(0, 0L));
-
-    // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK!
-    private final ProcessScheduler scheduler;
-
-    public StandardFlowFileQueue(final String identifier, final Connection connection, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
-        final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) {
-        activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
-        priorities = new ArrayList<>();
-        maximumQueueObjectCount = 0L;
-        maximumQueueDataSize = "0 MB";
-        maximumQueueByteCount = 0L;
-        flowFileExpirationMillis = new AtomicLong(0);
-        flowFileExpirationPeriod = new AtomicReference<>("0 mins");
-        swapQueue = new ArrayList<>();
-        this.eventReporter = eventReporter;
-        this.swapManager = swapManager;
-        this.flowFileRepository = flowFileRepo;
-        this.provRepository = provRepo;
-        this.resourceClaimManager = resourceClaimManager;
-
-        this.identifier = identifier;
-        this.swapThreshold = swapThreshold;
-        this.scheduler = scheduler;
-        this.connection = connection;
-
-        readLock = new TimedLock(this.lock.readLock(), identifier + " Read Lock", 100);
-        writeLock = new TimedLock(this.lock.writeLock(), identifier + " Write Lock", 100);
-    }
-
-    @Override
-    public String getIdentifier() {
-        return identifier;
-    }
-
-    @Override
-    public List<FlowFilePrioritizer> getPriorities() {
-        return Collections.unmodifiableList(priorities);
-    }
-
-    @Override
-    public void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
-        writeLock.lock();
-        try {
-            final PriorityQueue<FlowFileRecord> newQueue = new PriorityQueue<>(Math.max(20, activeQueue.size()), new Prioritizer(newPriorities));
-            newQueue.addAll(activeQueue);
-            activeQueue = newQueue;
-            priorities.clear();
-            priorities.addAll(newPriorities);
-        } finally {
-            writeLock.unlock("setPriorities");
-        }
-    }
-
-    @Override
-    public void setBackPressureObjectThreshold(final long maxQueueSize) {
-        writeLock.lock();
-        try {
-            maximumQueueObjectCount = maxQueueSize;
-            this.queueFullRef.set(determineIfFull());
-        } finally {
-            writeLock.unlock("setBackPressureObjectThreshold");
-        }
-    }
-
-    @Override
-    public long getBackPressureObjectThreshold() {
-        readLock.lock();
-        try {
-            return maximumQueueObjectCount;
-        } finally {
-            readLock.unlock("getBackPressureObjectThreshold");
-        }
-    }
-
-    @Override
-    public void setBackPressureDataSizeThreshold(final String maxDataSize) {
-        writeLock.lock();
-        try {
-            maximumQueueByteCount = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
-            maximumQueueDataSize = maxDataSize;
-            this.queueFullRef.set(determineIfFull());
-        } finally {
-            writeLock.unlock("setBackPressureDataSizeThreshold");
-        }
-    }
-
-    @Override
-    public String getBackPressureDataSizeThreshold() {
-        readLock.lock();
-        try {
-            return maximumQueueDataSize;
-        } finally {
-            readLock.unlock("getBackPressureDataSizeThreshold");
-        }
-    }
-
-    @Override
-    public QueueSize size() {
-        readLock.lock();
-        try {
-            return getQueueSize();
-        } finally {
-            readLock.unlock("getSize");
-        }
-    }
-
-    /**
-     * MUST be called with lock held
-     *
-     * @return size of queue
-     */
-    private QueueSize getQueueSize() {
-        final QueueSize unacknowledged = unacknowledgedSizeRef.get();
-
-        return new QueueSize(activeQueue.size() + swappedRecordCount + unacknowledged.getObjectCount(),
-            activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount());
-    }
-
-    @Override
-    public boolean isEmpty() {
-        readLock.lock();
-        try {
-            return activeQueue.isEmpty() && swappedRecordCount == 0 && unacknowledgedSizeRef.get().getObjectCount() == 0;
-        } finally {
-            readLock.unlock("isEmpty");
-        }
-    }
-
-    @Override
-    public boolean isActiveQueueEmpty() {
-        final int activeQueueSize = activeQueueSizeRef.get();
-        return activeQueueSize == 0;
-    }
-
-    public QueueSize getActiveQueueSize() {
-        readLock.lock();
-        try {
-            return new QueueSize(activeQueue.size(), activeQueueContentSize);
-        } finally {
-            readLock.unlock("getActiveQueueSize");
-        }
-    }
-
-    @Override
-    public void acknowledge(final FlowFileRecord flowFile) {
-        if (queueFullRef.get()) {
-            writeLock.lock();
-            try {
-                updateUnacknowledgedSize(-1, -flowFile.getSize());
-                queueFullRef.set(determineIfFull());
-            } finally {
-                writeLock.unlock("acknowledge(FlowFileRecord)");
-            }
-        } else {
-            updateUnacknowledgedSize(-1, -flowFile.getSize());
-        }
-
-        if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
-            // queue was full but no longer is. Notify that the source may now be available to run,
-            // because of back pressure caused by this queue.
-            scheduler.registerEvent(connection.getSource());
-        }
-    }
-
-    @Override
-    public void acknowledge(final Collection<FlowFileRecord> flowFiles) {
-        long totalSize = 0L;
-        for (final FlowFileRecord flowFile : flowFiles) {
-            totalSize += flowFile.getSize();
-        }
-
-        if (queueFullRef.get()) {
-            writeLock.lock();
-            try {
-                updateUnacknowledgedSize(-flowFiles.size(), -totalSize);
-                queueFullRef.set(determineIfFull());
-            } finally {
-                writeLock.unlock("acknowledge(FlowFileRecord)");
-            }
-        } else {
-            updateUnacknowledgedSize(-flowFiles.size(), -totalSize);
-        }
-
-        if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
-            // it's possible that queue was full but no longer is. Notify that the source may now be available to run,
-            // because of back pressure caused by this queue.
-            scheduler.registerEvent(connection.getSource());
-        }
-    }
-
-    @Override
-    public boolean isFull() {
-        return queueFullRef.get();
-    }
-
-    /**
-     * MUST be called with either the read or write lock held
-     *
-     * @return true if full
-     */
-    private boolean determineIfFull() {
-        final long maxSize = maximumQueueObjectCount;
-        final long maxBytes = maximumQueueByteCount;
-        if (maxSize <= 0 && maxBytes <= 0) {
-            return false;
-        }
-
-        final QueueSize queueSize = getQueueSize();
-        if (maxSize > 0 && queueSize.getObjectCount() >= maxSize) {
-            return true;
-        }
-
-        if (maxBytes > 0 && queueSize.getByteCount() >= maxBytes) {
-            return true;
-        }
-
-        return false;
-    }
-
-    @Override
-    public void put(final FlowFileRecord file) {
-        writeLock.lock();
-        try {
-            if (swapMode || activeQueue.size() >= swapThreshold) {
-                swapQueue.add(file);
-                swappedContentSize += file.getSize();
-                swappedRecordCount++;
-                swapMode = true;
-                writeSwapFilesIfNecessary();
-            } else {
-                activeQueueContentSize += file.getSize();
-                activeQueue.add(file);
-            }
-
-            queueFullRef.set(determineIfFull());
-        } finally {
-            activeQueueSizeRef.set(activeQueue.size());
-            writeLock.unlock("put(FlowFileRecord)");
-        }
-
-        if (connection.getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
-            scheduler.registerEvent(connection.getDestination());
-        }
-    }
-
-    /**
-     * Adds a collection of FlowFiles to this queue in one locked operation.
-     * The record count and total byte size are computed before the write lock is taken.
-     * The whole collection goes to the active queue unless the queue is in swap mode or the
-     * active queue size is at least {@code swapThreshold} minus the number of incoming records;
-     * in that case the whole collection goes to the swap queue.
-     *
-     * @param files the FlowFiles to enqueue
-     */
-    @Override
-    public void putAll(final Collection<FlowFileRecord> files) {
-        final int incomingCount = files.size();
-        long incomingBytes = 0L;
-        for (final FlowFile incoming : files) {
-            incomingBytes += incoming.getSize();
-        }
-
-        writeLock.lock();
-        try {
-            final boolean fitsInActiveQueue = !swapMode && activeQueue.size() < swapThreshold - incomingCount;
-            if (fitsInActiveQueue) {
-                activeQueueContentSize += incomingBytes;
-                activeQueue.addAll(files);
-            } else {
-                swapQueue.addAll(files);
-                swappedContentSize += incomingBytes;
-                swappedRecordCount += incomingCount;
-                swapMode = true;
-                writeSwapFilesIfNecessary();
-            }
-
-            queueFullRef.set(determineIfFull());
-        } finally {
-            // always publish the active queue size, even if enqueueing failed part-way
-            activeQueueSizeRef.set(activeQueue.size());
-            writeLock.unlock("putAll");
-        }
-
-        final boolean eventDriven = connection.getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN;
-        if (eventDriven) {
-            scheduler.registerEvent(connection.getDestination());
-        }
-    }
-
-
-    /**
-     * Tells whether the given timestamp lies in the past.
-     *
-     * @param maxAge a timestamp in epoch milliseconds, or {@code null} for "never"
-     * @return {@code true} if {@code maxAge} is non-null and earlier than the current system time
-     */
-    private boolean isLaterThan(final Long maxAge) {
-        return maxAge != null && maxAge < System.currentTimeMillis();
-    }
-
-    /**
-     * Computes the point in time at which a FlowFile expires.
-     *
-     * @param flowFile the FlowFile to inspect; may be {@code null}
-     * @param expirationMillis the expiration period in milliseconds; values of zero or less mean "no expiration"
-     * @return the FlowFile's entry date plus {@code expirationMillis}, or {@code null} when the FlowFile
-     *         is {@code null} or no expiration applies
-     */
-    private Long getExpirationDate(final FlowFile flowFile, final long expirationMillis) {
-        if (flowFile == null || expirationMillis <= 0) {
-            return null;
-        }
-
-        return flowFile.getEntryDate() + expirationMillis;
-    }
-
-    /**
-     * Polls a single FlowFile from this queue under the write lock.
-     * Expired FlowFiles encountered while polling are added to {@code expiredRecords}.
-     * When a FlowFile is returned, the unacknowledged size is increased by one object and by
-     * that FlowFile's size after the lock has been released.
-     *
-     * @param expiredRecords receives the FlowFiles that were found to be expired
-     * @return the polled FlowFile, or {@code null} if none was available
-     */
-    @Override
-    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
-        final long expirationMillis = flowFileExpirationMillis.get();
-        FlowFileRecord polled = null;
-
-        writeLock.lock();
-        try {
-            polled = doPoll(expiredRecords, expirationMillis);
-        } finally {
-            activeQueueSizeRef.set(activeQueue.size());
-            writeLock.unlock("poll(Set)");
-
-            if (polled != null) {
-                updateUnacknowledgedSize(1, polled.getSize());
-            }
-        }
-
-        return polled;
-    }
-
-    /**
-     * Polls the active queue for the next FlowFile that is neither expired nor penalized.
-     * Expired FlowFiles found along the way are added to {@code expiredRecords}; the search stops
-     * once that set holds at least MAX_EXPIRED_RECORDS_PER_ITERATION entries. A penalized FlowFile
-     * is put back on the active queue and ends the search.
-     *
-     * {@link #poll(Set)} calls this method with the write lock held.
-     *
-     * @param expiredRecords receives the FlowFiles that were found to be expired
-     * @param expirationMillis the expiration period in milliseconds
-     * @return the polled FlowFile, or {@code null} if the active queue was empty, the FlowFile examined
-     *         was penalized, or the last FlowFile examined was expired
-     */
-    private FlowFileRecord doPoll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
-        FlowFileRecord flowFile;
-        boolean isExpired;
-
-        migrateSwapToActive();
-        final boolean queueFullAtStart = queueFullRef.get();
-
-        do {
-            flowFile = this.activeQueue.poll();
-
-            // a null flowFile yields a null expiration date, which is never treated as expired
-            isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis));
-            if (isExpired) {
-                expiredRecords.add(flowFile);
-                if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
-                    // the break skips the size adjustment at the bottom of the loop, so do it here
-                    activeQueueContentSize -= flowFile.getSize();
-                    break;
-                }
-            } else if (flowFile != null && flowFile.isPenalized()) {
-                // put the penalized FlowFile back and stop searching
-                this.activeQueue.add(flowFile);
-                flowFile = null;
-                break;
-            }
-
-            // the FlowFile (expired or returned) has left the active queue; account for its size
-            if (flowFile != null) {
-                activeQueueContentSize -= flowFile.getSize();
-            }
-        } while (isExpired);
-
-        // if at least 1 FlowFile was expired & the queue was full before we started, then
-        // we need to determine whether or not the queue is full again. If no FlowFile was expired,
-        // then the queue will still be full until the appropriate #acknowledge method is called.
-        if (queueFullAtStart && !expiredRecords.isEmpty()) {
-            queueFullRef.set(determineIfFull());
-        }
-
-        return isExpired ? null : flowFile;
-    }
-
-    /**
-     * Polls up to {@code maxResults} FlowFiles from this queue under the write lock.
-     *
-     * @param maxResults the maximum number of FlowFiles to return
-     * @param expiredRecords receives the FlowFiles that were found to be expired
-     * @return the polled FlowFiles; empty if none were available
-     */
-    @Override
-    public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords) {
-        final int initialCapacity = Math.min(1024, maxResults);
-        final List<FlowFileRecord> polled = new ArrayList<>(initialCapacity);
-
-        writeLock.lock();
-        try {
-            doPoll(polled, maxResults, expiredRecords);
-        } finally {
-            activeQueueSizeRef.set(activeQueue.size());
-            writeLock.unlock("poll(int, Set)");
-        }
-
-        return polled;
-    }
-
-    /**
-     * Drains up to {@code maxResults} FlowFiles from the active queue into {@code records} and
-     * updates the size bookkeeping: the drained bytes are removed from the active queue content
-     * size, and the unacknowledged size grows by the number of returned records and by the drained
-     * bytes minus the bytes of all FlowFiles in {@code expiredRecords}.
-     *
-     * {@link #poll(int, Set)} calls this method with the write lock held.
-     *
-     * @param records receives the polled FlowFiles
-     * @param maxResults the maximum number of FlowFiles to add to {@code records}
-     * @param expiredRecords receives the FlowFiles that were found to be expired
-     */
-    private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords) {
-        migrateSwapToActive();
-
-        final boolean wasFullAtStart = queueFullRef.get();
-        final long drainedBytes = drainQueue(activeQueue, records, maxResults, expiredRecords);
-
-        // expired FlowFiles are not handed to the caller, so they do not count as unacknowledged
-        long unacknowledgedBytes = drainedBytes;
-        for (final FlowFileRecord expired : expiredRecords) {
-            unacknowledgedBytes -= expired.getSize();
-        }
-
-        activeQueueContentSize -= drainedBytes;
-        updateUnacknowledgedSize(records.size(), unacknowledgedBytes);
-
-        // Only an expiration can make a previously full queue non-full here; without one the queue
-        // stays full until the appropriate #acknowledge method is called.
-        if (wasFullAtStart && !expiredRecords.isEmpty()) {
-            queueFullRef.set(determineIfFull());
-        }
-    }
-
-    /**
-     * If there are FlowFiles waiting on the swap queue, move them to the active
-     * queue until we meet our threshold. This prevents us from having to swap
-     * them to disk & then back out.
-     *
-     * If swap files exist on disk, the oldest swap location is swapped in instead and
-     * the swap queue is left untouched, so that FlowFiles swapped out first come back first.
-     * A swap location that cannot be read is reported and is not retried, because it has
-     * already been removed from the list of swap locations.
-     *
-     * This method MUST be called with the writeLock held.
-     */
-    private void migrateSwapToActive() {
-        // Migrate as many FlowFiles as we can from the Swap Queue to the Active Queue, so that we don't
-        // have to swap them out & then swap them back in.
-        // If we don't do this, we could get into a situation where we have potentially thousands of FlowFiles
-        // sitting on the Swap Queue but not getting processed because there aren't enough to be swapped out.
-        // In particular, this can happen if the queue is typically filled with surges.
-        // For example, if the queue has 25,000 FlowFiles come in, it may process 20,000 of them and leave
-        // 5,000 sitting on the Swap Queue. If it then takes an hour for an additional 5,000 FlowFiles to come in,
-        // those FlowFiles sitting on the Swap Queue will sit there for an hour, waiting to be swapped out and
-        // swapped back in again.
-        // Calling this method when records are polled prevents this condition by migrating FlowFiles from the
-        // Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out
-        // to disk, because we want them to be swapped back in in the same order that they were swapped out.
-
-        // The active queue is still close to its threshold; there is no room for a full swap file.
-        if (activeQueue.size() > swapThreshold - SWAP_RECORD_POLL_SIZE) {
-            return;
-        }
-
-        // If there are swap files waiting to be swapped in, swap those in first. We do this in order to ensure that those that
-        // were swapped out first are then swapped back in first. If we instead just immediately migrated the FlowFiles from the
-        // swap queue to the active queue, and we never run out of FlowFiles in the active queue (because destination cannot
-        // keep up with queue), we will end up always processing the new FlowFiles first instead of the FlowFiles that arrived
-        // first.
-        if (!swapLocations.isEmpty()) {
-            final String swapLocation = swapLocations.remove(0);
-            try {
-                final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, this);
-                swappedRecordCount -= swappedIn.size();
-                long swapSize = 0L;
-                for (final FlowFileRecord flowFile : swappedIn) {
-                    swapSize += flowFile.getSize();
-                }
-                swappedContentSize -= swapSize;
-                activeQueueContentSize += swapSize;
-                // add the records first so that the published size includes them
-                activeQueue.addAll(swappedIn);
-                activeQueueSizeRef.set(activeQueue.size());
-                return;
-            } catch (final FileNotFoundException fnfe) {
-                logger.error("Failed to swap in FlowFiles from Swap File {} because the Swap File can no longer be found", swapLocation);
-                if (eventReporter != null) {
-                    eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + swapLocation + " because the Swap File can no longer be found");
-                }
-                return;
-            } catch (final IOException ioe) {
-                logger.error("Failed to swap in FlowFiles from Swap File {}; Swap File appears to be corrupt!", swapLocation);
-                logger.error("", ioe);
-                if (eventReporter != null) {
-                    eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " +
-                        swapLocation + "; Swap File appears to be corrupt! Some FlowFiles in the queue may not be accessible. See logs for more information.");
-                }
-                return;
-            }
-        }
-
-        // this is the most common condition (nothing is swapped out), so do the check first and avoid the expense
-        // of other checks for 99.999% of the cases.
-        if (swappedRecordCount == 0 && swapQueue.isEmpty()) {
-            return;
-        }
-
-        if (swappedRecordCount > swapQueue.size()) {
-            // we already have FlowFiles swapped out, so we won't migrate the queue; we will wait for
-            // an external process to swap FlowFiles back in.
-            return;
-        }
-
-        // Move records from the swap queue to the active queue until the threshold is reached.
-        final Iterator<FlowFileRecord> swapItr = swapQueue.iterator();
-        while (activeQueue.size() < swapThreshold && swapItr.hasNext()) {
-            final FlowFileRecord toMigrate = swapItr.next();
-            activeQueue.add(toMigrate);
-            activeQueueContentSize += toMigrate.getSize();
-            swappedContentSize -= toMigrate.getSize();
-            swappedRecordCount--;
-
-            swapItr.remove();
-        }
-
-        if (swappedRecordCount == 0) {
-            swapMode = false;
-        }
-    }
-
-    /**
-     * Writes swap files when the swap queue holds at least SWAP_RECORD_POLL_SIZE records.
-     * The active queue and swap queue are merged into a temporary queue ordered by reverse
-     * priority, the lowest-priority records are written out in batches of SWAP_RECORD_POLL_SIZE,
-     * and the remaining records are redistributed between the swap queue and the active queue.
-     *
-     * This method MUST be called with the write lock held
-     */
-    private void writeSwapFilesIfNecessary() {
-        if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
-            return;
-        }
-
-        // integer division: only complete batches are written out
-        final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE;
-
-        // Create a new Priority queue with the prioritizers that are set, but reverse the
-        // prioritizers because we want to pull the lowest-priority FlowFiles to swap out
-        final PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<>(activeQueue.size() + swapQueue.size(), Collections.reverseOrder(new Prioritizer(priorities)));
-        tempQueue.addAll(activeQueue);
-        tempQueue.addAll(swapQueue);
-
-        // local list; shadows the swapLocations field until it is merged in at the end
-        final List<String> swapLocations = new ArrayList<>(numSwapFiles);
-        for (int i = 0; i < numSwapFiles; i++) {
-            // Create a new swap file for the next SWAP_RECORD_POLL_SIZE records
-            final List<FlowFileRecord> toSwap = new ArrayList<>(SWAP_RECORD_POLL_SIZE);
-            for (int j = 0; j < SWAP_RECORD_POLL_SIZE; j++) {
-                toSwap.add(tempQueue.poll());
-            }
-
-            try {
-                Collections.reverse(toSwap); // currently ordered in reverse priority order based on the ordering of the temp queue.
-                final String swapLocation = swapManager.swapOut(toSwap, this);
-                swapLocations.add(swapLocation);
-            } catch (final IOException ioe) {
-                tempQueue.addAll(toSwap); // if we failed, we must add the FlowFiles back to the queue.
-                logger.error("FlowFile Queue with identifier {} has {} FlowFiles queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting "
-                    + "the Java heap space but failed to write information to disk due to {}", getIdentifier(), getQueueSize().getObjectCount(), ioe.toString());
-                logger.error("", ioe);
-                if (eventReporter != null) {
-                    eventReporter.reportEvent(Severity.ERROR, "Failed to Overflow to Disk", "Flowfile Queue with identifier " + getIdentifier() + " has " + getQueueSize().getObjectCount() +
-                        " queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting the Java heap space but failed to write information to disk. "
-                        + "See logs for more information.");
-                }
-
-                // stop after the first failure; the remaining batches stay in memory
-                break;
-            }
-        }
-
-        // Pull any records off of the temp queue that won't fit back on the active queue, and add those to the
-        // swap queue. Then add the records back to the active queue.
-        swapQueue.clear();
-        while (tempQueue.size() > swapThreshold) {
-            final FlowFileRecord record = tempQueue.poll();
-            swapQueue.add(record);
-        }
-
-        Collections.reverse(swapQueue); // currently ordered in reverse priority order based on the ordering of the temp queue
-
-        // replace the contents of the active queue, since we've merged it with the swap queue.
-        activeQueue.clear();
-        FlowFileRecord toRequeue;
-        while ((toRequeue = tempQueue.poll()) != null) {
-            activeQueue.offer(toRequeue);
-        }
-        // remember the newly written swap files on the queue-level list
-        this.swapLocations.addAll(swapLocations);
-    }
-
-
-    /**
-     * Moves FlowFiles from {@code sourceQueue} into {@code destination} until {@code destination}
-     * holds {@code maxResults} records, the source is empty, a penalized FlowFile is reached, or
-     * {@code expiredRecords} holds at least MAX_EXPIRED_RECORDS_PER_ITERATION entries.
-     * Expired FlowFiles are added to {@code expiredRecords} instead of {@code destination}.
-     * A penalized FlowFile is put back on {@code sourceQueue} and is not counted.
-     *
-     * @param sourceQueue the queue to drain
-     * @param destination receives the non-expired FlowFiles
-     * @param maxResults the maximum size {@code destination} may reach
-     * @param expiredRecords receives the expired FlowFiles
-     * @return the total size in bytes of every FlowFile that was removed from {@code sourceQueue},
-     *         whether it ended up in {@code destination} or in {@code expiredRecords}
-     */
-    @Override
-    public long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination, int maxResults, final Set<FlowFileRecord> expiredRecords) {
-        long drainedSize = 0L;
-        FlowFileRecord pulled = null;
-
-        final long expirationMillis = this.flowFileExpirationMillis.get();
-        while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) {
-            if (isLaterThan(getExpirationDate(pulled, expirationMillis))) {
-                expiredRecords.add(pulled);
-                // Count the record before the limit check: it has already left the source queue,
-                // so skipping it would leave the caller's size bookkeeping too high.
-                drainedSize += pulled.getSize();
-                if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
-                    break;
-                }
-            } else if (pulled.isPenalized()) {
-                // put it back untouched; it was never really drained
-                sourceQueue.add(pulled);
-                break;
-            } else {
-                destination.add(pulled);
-                drainedSize += pulled.getSize();
-            }
-        }
-        return drainedSize;
-    }
-
-    /**
-     * Polls FlowFiles from the active queue under the write lock, letting {@code filter} decide
-     * which ones are returned. Expired FlowFiles are added to {@code expiredRecords}; FlowFiles
-     * the filter does not accept are put back on the active queue once the scan has finished.
-     * The scan ends when the active queue is empty, a penalized FlowFile is reached, the filter
-     * asks to stop, or {@code expiredRecords} holds at least MAX_EXPIRED_RECORDS_PER_ITERATION entries.
-     *
-     * @param filter decides for each FlowFile whether it is accepted and whether scanning continues
-     * @param expiredRecords receives the FlowFiles that were found to be expired
-     * @return the FlowFiles accepted by the filter
-     */
-    @Override
-    public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
-        writeLock.lock();
-        try {
-            migrateSwapToActive();
-
-            final long expirationMillis = this.flowFileExpirationMillis.get();
-            final boolean queueFullAtStart = queueFullRef.get();
-
-            final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
-            final List<FlowFileRecord> unselected = new ArrayList<>();
-
-            while (true) {
-                FlowFileRecord flowFile = this.activeQueue.poll();
-                if (flowFile == null) {
-                    break;
-                }
-
-                final boolean isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis));
-                if (isExpired) {
-                    expiredRecords.add(flowFile);
-                    activeQueueContentSize -= flowFile.getSize();
-
-                    if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
-                        break;
-                    } else {
-                        continue;
-                    }
-                } else if (flowFile.isPenalized()) {
-                    this.activeQueue.add(flowFile);
-                    flowFile = null;
-                    break; // just stop searching because the rest are all penalized.
-                }
-
-                final FlowFileFilterResult result = filter.filter(flowFile);
-                if (result.isAccept()) {
-                    // accepted FlowFiles leave the active queue and become unacknowledged
-                    activeQueueContentSize -= flowFile.getSize();
-
-                    updateUnacknowledgedSize(1, flowFile.getSize());
-                    selectedFlowFiles.add(flowFile);
-                } else {
-                    // held aside so the same FlowFile is not polled again during this scan
-                    unselected.add(flowFile);
-                }
-
-                if (!result.isContinue()) {
-                    break;
-                }
-            }
-
-            // return the rejected FlowFiles to the active queue
-            this.activeQueue.addAll(unselected);
-
-            // if at least 1 FlowFile was expired & the queue was full before we started, then
-            // we need to determine whether or not the queue is full again. If no FlowFile was expired,
-            // then the queue will still be full until the appropriate #acknowledge method is called.
-            if (queueFullAtStart && !expiredRecords.isEmpty()) {
-                queueFullRef.set(determineIfFull());
-            }
-
-            return selectedFlowFiles;
-        } finally {
-            activeQueueSizeRef.set(activeQueue.size());
-            writeLock.unlock("poll(Filter, Set)");
-        }
-    }
-
-
-
-    /**
-     * Orders FlowFiles for the queue. Non-penalized FlowFiles sort before penalized ones; two
-     * penalized FlowFiles sort by penalty expiration. Remaining ties are broken by the configured
-     * prioritizers in order, then by content claim (no claim first, then claim, then claim offset),
-     * and finally by FlowFile id.
-     */
-    private static final class Prioritizer implements Comparator<FlowFileRecord>, Serializable {
-
-        private static final long serialVersionUID = 1L;
-        private final transient List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
-
-        /**
-         * @param priorities the prioritizers to consult, in order; {@code null} is treated as none
-         */
-        private Prioritizer(final List<FlowFilePrioritizer> priorities) {
-            if (priorities != null) {
-                prioritizers.addAll(priorities);
-            }
-        }
-
-        @Override
-        public int compare(final FlowFileRecord f1, final FlowFileRecord f2) {
-            final boolean firstPenalized = f1.isPenalized();
-            final boolean secondPenalized = f2.isPenalized();
-
-            // exactly one is penalized: the penalized one sorts last
-            if (firstPenalized != secondPenalized) {
-                return firstPenalized ? 1 : -1;
-            }
-
-            // both penalized: the one whose penalty expires first sorts first
-            if (firstPenalized) {
-                final int penaltyOrder = Long.compare(f1.getPenaltyExpirationMillis(), f2.getPenaltyExpirationMillis());
-                if (penaltyOrder != 0) {
-                    return penaltyOrder;
-                }
-            }
-
-            // the first prioritizer with an opinion wins
-            for (final FlowFilePrioritizer prioritizer : prioritizers) {
-                final int priorityOrder = prioritizer.compare(f1, f2);
-                if (priorityOrder != 0) {
-                    return priorityOrder;
-                }
-            }
-
-            final ContentClaim firstClaim = f1.getContentClaim();
-            final ContentClaim secondClaim = f2.getContentClaim();
-
-            if (firstClaim == null) {
-                // put the one without a claim first
-                if (secondClaim != null) {
-                    return -1;
-                }
-            } else if (secondClaim == null) {
-                return 1;
-            } else {
-                final int claimOrder = firstClaim.compareTo(secondClaim);
-                if (claimOrder != 0) {
-                    return claimOrder;
-                }
-
-                final int offsetOrder = Long.compare(f1.getContentClaimOffset(), f2.getContentClaimOffset());
-                if (offsetOrder != 0) {
-                    return offsetOrder;
-                }
-            }
-
-            return Long.compare(f1.getId(), f2.getId());
-        }
-    }
-
-    /**
-     * @return the FlowFile expiration period exactly as it was last set, as a string
-     */
-    @Override
-    public String getFlowFileExpiration() {
-        final String expirationPeriod = flowFileExpirationPeriod.get();
-        return expirationPeriod;
-    }
-
-    /**
-     * Returns the FlowFile expiration period converted to the requested unit.
-     * The converted value is narrowed to {@code int}.
-     *
-     * @param timeUnit the unit to convert the expiration period to
-     * @return the expiration period expressed in {@code timeUnit}
-     */
-    @Override
-    public int getFlowFileExpiration(final TimeUnit timeUnit) {
-        final long expirationMillis = flowFileExpirationMillis.get();
-        final long converted = timeUnit.convert(expirationMillis, TimeUnit.MILLISECONDS);
-        return (int) converted;
-    }
-
-    /**
-     * Sets the FlowFile expiration period. The period is parsed to milliseconds and both the
-     * original string and the parsed value are stored.
-     *
-     * @param flowExpirationPeriod the expiration period as a duration string
-     * @throws IllegalArgumentException if the period parses to a negative number of milliseconds
-     */
-    @Override
-    public void setFlowFileExpiration(final String flowExpirationPeriod) {
-        final long parsedMillis = FormatUtils.getTimeDuration(flowExpirationPeriod, TimeUnit.MILLISECONDS);
-        if (parsedMillis >= 0) {
-            this.flowFileExpirationPeriod.set(flowExpirationPeriod);
-            this.flowFileExpirationMillis.set(parsedMillis);
-        } else {
-            throw new IllegalArgumentException("FlowFile Expiration Period must be positive");
-        }
-    }
-
-
-    /**
-     * Asks the swap manager to purge its swap files. This method does not touch the queue's own
-     * swap bookkeeping (swap locations, swapped counts).
-     */
-    @Override
-    public void purgeSwapFiles() {
-        swapManager.purge();
-    }
-
-    /**
-     * Recovers the swap locations that belong to this queue and restores the swapped record
-     * count and swapped content size from them, all under the write lock.
-     * A swap location that cannot be read is logged and reported but does not abort recovery;
-     * it contributes nothing to the restored counts.
-     *
-     * @return the largest FlowFile record id found in any swap file, or {@code null} if no id was
-     *         found or the swap locations could not be determined
-     */
-    @Override
-    public Long recoverSwappedFlowFiles() {
-        int swapFlowFileCount = 0;
-        long swapByteCount = 0L;
-        Long maxId = null;
-
-        writeLock.lock();
-        try {
-            final List<String> swapLocations;
-            try {
-                swapLocations = swapManager.recoverSwapLocations(this);
-            } catch (final IOException ioe) {
-                logger.error("Failed to determine whether or not any Swap Files exist for FlowFile Queue {}", getIdentifier());
-                logger.error("", ioe);
-                if (eventReporter != null) {
-                    eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine whether or not any Swap Files exist for FlowFile Queue " +
-                        getIdentifier() + "; see logs for more details");
-                }
-                return null;
-            }
-
-            for (final String swapLocation : swapLocations) {
-                try {
-                    final QueueSize queueSize = swapManager.getSwapSize(swapLocation);
-                    final Long maxSwapRecordId = swapManager.getMaxRecordId(swapLocation);
-                    if (maxSwapRecordId != null) {
-                        if (maxId == null || maxSwapRecordId > maxId) {
-                            maxId = maxSwapRecordId;
-                        }
-                    }
-
-                    swapFlowFileCount += queueSize.getObjectCount();
-                    swapByteCount += queueSize.getByteCount();
-                } catch (final IOException ioe) {
-                    // one placeholder per argument, so the cause is no longer dropped from the message
-                    logger.error("Failed to recover FlowFiles from Swap File {} due to {}; the file appears to be corrupt", swapLocation, ioe.toString());
-                    logger.error("", ioe);
-                    if (eventReporter != null) {
-                        eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to recover FlowFiles from Swap File " + swapLocation +
-                            "; the file appears to be corrupt. See logs for more details");
-                    }
-                }
-            }
-
-            this.swappedRecordCount = swapFlowFileCount;
-            this.swappedContentSize = swapByteCount;
-            this.swapLocations.addAll(swapLocations);
-        } finally {
-            writeLock.unlock("Recover Swap Files");
-        }
-
-        return maxId;
-    }
-
-
-    /**
-     * @return a short description of this queue containing its identifier
-     */
-    @Override
-    public String toString() {
-        final StringBuilder description = new StringBuilder("FlowFileQueue[id=");
-        description.append(identifier).append("]");
-        return description.toString();
-    }
-
-    // Drop requests keyed by request identifier; entries are added by dropFlowFiles, read by
-    // getDropFlowFileStatus and removed by cancelDropFlowFileRequest or the purge in dropFlowFiles.
-    private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>();
-
-    /**
-     * Starts an asynchronous request to drop the FlowFiles in this queue and returns immediately.
-     * The work runs on a new daemon thread that holds the write lock while it drops, in order,
-     * the active queue, the swap queue, and then each swap location after swapping it in.
-     * Progress and the final state (COMPLETE or FAILURE) are recorded on the returned request,
-     * which is also stored in the drop request map under {@code requestIdentifier}.
-     *
-     * Before starting, finished requests are purged from the map if it holds more than 10 entries.
-     *
-     * NOTE(review): the thread is started before setExecutionThread is called and before the request
-     * is put in the map; confirm that cancel/status lookups tolerate that window.
-     *
-     * @param requestIdentifier the identifier under which the request can later be looked up or cancelled
-     * @param requestor the name of the requestor, recorded in the provenance details of each dropped FlowFile
-     * @return the status object of the newly started drop request
-     */
-    @Override
-    public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) {
-        // purge any old requests from the map just to keep it clean. But if there are very few requests, which is usually the case, then don't bother
-        if (dropRequestMap.size() > 10) {
-            final List<String> toDrop = new ArrayList<>();
-            for (final Map.Entry<String, DropFlowFileRequest> entry : dropRequestMap.entrySet()) {
-                final DropFlowFileRequest request = entry.getValue();
-                final boolean completed = request.getState() == DropFlowFileState.COMPLETE || request.getState() == DropFlowFileState.FAILURE;
-
-                // only purge requests that finished and have not been updated for more than 5 minutes
-                if (completed && System.currentTimeMillis() - request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
-                    toDrop.add(entry.getKey());
-                }
-            }
-
-            for (final String requestId : toDrop) {
-                dropRequestMap.remove(requestId);
-            }
-        }
-
-        final DropFlowFileRequest dropRequest = new DropFlowFileRequest(requestIdentifier);
-        final Thread t = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                writeLock.lock();
-                try {
-                    dropRequest.setState(DropFlowFileState.DROPPING_ACTIVE_FLOWFILES);
-                    dropRequest.setOriginalSize(getQueueSize());
-
-                    try {
-                        // step 1: drop everything on the active queue
-                        final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue);
-
-                        QueueSize droppedSize;
-                        try {
-                            droppedSize = drop(activeQueueRecords, requestor);
-                        } catch (final IOException ioe) {
-                            logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
-                            logger.error("", ioe);
-
-                            dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString());
-                            return;
-                        }
-
-                        activeQueue.clear();
-                        activeQueueContentSize = 0;
-                        activeQueueSizeRef.set(0);
-                        dropRequest.setCurrentSize(getQueueSize());
-                        dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
-
-                        // step 2: drop everything on the in-memory swap queue
-                        try {
-                            droppedSize = drop(swapQueue, requestor);
-                        } catch (final IOException ioe) {
-                            logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
-                            logger.error("", ioe);
-
-                            dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString());
-                            return;
-                        }
-
-                        swapQueue.clear();
-                        dropRequest.setCurrentSize(getQueueSize());
-                        dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
-                        swapMode = false;
-                        swappedContentSize -= droppedSize.getByteCount();
-                        swappedRecordCount -= droppedSize.getObjectCount();
-
-                        // step 3: swap in each swap location and drop its FlowFiles
-                        final Iterator<String> swapLocationItr = swapLocations.iterator();
-                        while (swapLocationItr.hasNext()) {
-                            final String swapLocation = swapLocationItr.next();
-
-                            List<FlowFileRecord> swappedIn = null;
-                            try {
-                                swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
-                                droppedSize = drop(swappedIn, requestor);
-                            } catch (final IOException ioe) {
-                                logger.error("Failed to swap in FlowFiles from Swap File {} in order to drop the FlowFiles for Connection {} due to {}",
-                                    swapLocation, StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
-                                logger.error("", ioe);
-
-                                dropRequest.setState(DropFlowFileState.FAILURE, "Failed to swap in FlowFiles from Swap File " + swapLocation + " due to " + ioe.toString());
-                                if (swappedIn != null) {
-                                    activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
-                                }
-
-                                return;
-                            }
-
-                            dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
-                            swappedContentSize -= droppedSize.getByteCount();
-                            swappedRecordCount -= droppedSize.getObjectCount();
-                            dropRequest.setCurrentSize(getQueueSize());
-                            swapLocationItr.remove();
-                        }
-
-                        dropRequest.setState(DropFlowFileState.COMPLETE);
-                    } catch (final Exception e) {
-                        // any other failure is recorded on the request; it is not logged here
-                        dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.toString());
-                    }
-                } finally {
-                    writeLock.unlock("Drop FlowFiles");
-                }
-            }
-        }, "Drop FlowFiles for Connection " + getIdentifier());
-        t.setDaemon(true);
-        t.start();
-
-        dropRequest.setExecutionThread(t);
-        dropRequestMap.put(requestIdentifier, dropRequest);
-
-        return dropRequest;
-    }
-
-    /**
-     * Drops the given FlowFiles: builds a DROP provenance event and a DELETE repository record
-     * for each one, decrements the claimant count of every resource claim that is present,
-     * registers the provenance events and updates the FlowFile repository.
-     *
-     * The returned byte count is the sum of {@code getSize()} over all given FlowFiles, whether
-     * or not they have a content claim. Callers subtract it from counters that were increased by
-     * {@code getSize()} unconditionally when the FlowFiles were enqueued, so both sides must use
-     * the same measure.
-     *
-     * @param flowFiles the FlowFiles to drop
-     * @param requestor the name of the requestor, recorded in the provenance event details
-     * @return the number of FlowFiles dropped and their total size in bytes
-     * @throws IOException propagated from the repository calls when the drop cannot be recorded
-     */
-    private QueueSize drop(final List<FlowFileRecord> flowFiles, final String requestor) throws IOException {
-        // Create a Provenance Event and a FlowFile Repository record for each FlowFile
-        final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(flowFiles.size());
-        final List<RepositoryRecord> flowFileRepoRecords = new ArrayList<>(flowFiles.size());
-        for (final FlowFileRecord flowFile : flowFiles) {
-            provenanceEvents.add(createDropEvent(flowFile, requestor));
-            flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile));
-        }
-
-        long dropContentSize = 0L;
-        for (final FlowFileRecord flowFile : flowFiles) {
-            // count every FlowFile's size, not only those that still hold a claim
-            dropContentSize += flowFile.getSize();
-
-            final ContentClaim contentClaim = flowFile.getContentClaim();
-            if (contentClaim == null) {
-                continue;
-            }
-
-            final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
-            if (resourceClaim == null) {
-                continue;
-            }
-
-            resourceClaimManager.decrementClaimantCount(resourceClaim);
-        }
-
-        provRepository.registerEvents(provenanceEvents);
-        flowFileRepository.updateRepository(flowFileRepoRecords);
-        return new QueueSize(flowFiles.size(), dropContentSize);
-    }
-
-    /**
-     * Builds the DROP provenance event for a FlowFile that is being dropped from this queue.
-     * The event names this queue's identifier as the component id and "Connection" as the component type.
-     *
-     * @param flowFile the FlowFile being dropped
-     * @param requestor the name of the requestor, included in the event details
-     * @return the provenance event describing the drop
-     */
-    private ProvenanceEventRecord createDropEvent(final FlowFileRecord flowFile, final String requestor) {
-        final ProvenanceEventBuilder builder = provRepository.eventBuilder();
-        builder.fromFlowFile(flowFile);
-        builder.setEventType(ProvenanceEventType.DROP);
-        builder.setLineageStartDate(flowFile.getLineageStartDate());
-        builder.setComponentId(getIdentifier());
-        builder.setComponentType("Connection");
-        builder.setDetails("FlowFile manually dropped; request made by " + requestor);
-        return builder.build();
-    }
-
-    /**
-     * Creates the repository record that marks a FlowFile as deleted from this queue.
-     * The record has type DELETE, names this queue as the original queue, has no destination and
-     * no swap location, and reports the FlowFile's own content claim as both the current and the
-     * original claim.
-     *
-     * @param flowFile the FlowFile being deleted
-     * @return a DELETE repository record for {@code flowFile}
-     */
-    private RepositoryRecord createDeleteRepositoryRecord(final FlowFileRecord flowFile) {
-        return new RepositoryRecord() {
-            // --- what kind of record this is ---
-
-            @Override
-            public RepositoryRecordType getType() {
-                return RepositoryRecordType.DELETE;
-            }
-
-            @Override
-            public boolean isMarkedForAbort() {
-                return false;
-            }
-
-            @Override
-            public boolean isAttributesChanged() {
-                return false;
-            }
-
-            // --- where the FlowFile came from and where it goes ---
-
-            @Override
-            public FlowFileQueue getOriginalQueue() {
-                return StandardFlowFileQueue.this;
-            }
-
-            @Override
-            public FlowFileQueue getDestination() {
-                return null;
-            }
-
-            @Override
-            public String getSwapLocation() {
-                return null;
-            }
-
-            // --- the FlowFile and its content ---
-
-            @Override
-            public FlowFileRecord getCurrent() {
-                return flowFile;
-            }
-
-            @Override
-            public ContentClaim getOriginalClaim() {
-                return flowFile.getContentClaim();
-            }
-
-            @Override
-            public ContentClaim getCurrentClaim() {
-                return flowFile.getContentClaim();
-            }
-
-            @Override
-            public long getCurrentClaimOffset() {
-                return flowFile.getContentClaimOffset();
-            }
-        };
-    }
-
-
-    /**
-     * Removes the drop request with the given identifier from the request map and cancels it.
-     *
-     * @param requestIdentifier the identifier of the drop request to cancel
-     * @return {@code false} if no such request is known; otherwise the result of cancelling it
-     */
-    @Override
-    public boolean cancelDropFlowFileRequest(final String requestIdentifier) {
-        final DropFlowFileRequest removed = dropRequestMap.remove(requestIdentifier);
-        return removed != null && removed.cancel();
-    }
-
-    /**
-     * Looks up the status of a drop request.
-     *
-     * @param requestIdentifier the identifier the request was started with
-     * @return the request's status, or {@code null} if no request with that identifier is in the map
-     */
-    @Override
-    public DropFlowFileStatus getDropFlowFileStatus(final String requestIdentifier) {
-        return dropRequestMap.get(requestIdentifier);
-    }
-
-    /**
-     * Lock the queue so that other threads are unable to interact with the
-     * queue. This acquires the same write lock that the queue's own operations
-     * (put, poll, drop) take; release it again with {@link #unlock()}.
-     */
-    public void lock() {
-        writeLock.lock();
-    }
-
-    /**
-     * Unlock the queue by releasing the write lock taken by {@link #lock()}.
-     * The release is tagged "external unlock".
-     */
-    public void unlock() {
-        writeLock.unlock("external unlock");
-    }
-
-    /**
-     * @return the current unacknowledged size (object count and byte count); read without taking the write lock
-     */
-    @Override
-    public QueueSize getUnacknowledgedQueueSize() {
-        return unacknowledgedSizeRef.get();
-    }
-
-    /**
-     * Adds the given deltas to the unacknowledged size using a compare-and-set retry loop,
-     * so it can be called without holding the write lock.
-     *
-     * @param addToCount the change in object count; may be negative
-     * @param addToSize the change in byte count; may be negative
-     */
-    private void updateUnacknowledgedSize(final int addToCount, final long addToSize) {
-        QueueSize current;
-        QueueSize replacement;
-
-        // retry until no other thread changed the reference between the read and the swap
-        do {
-            current = unacknowledgedSizeRef.get();
-            replacement = new QueueSize(current.getObjectCount() + addToCount, current.getByteCount() + addToSize);
-        } while (!unacknowledgedSizeRef.compareAndSet(current, replacement));
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/77f7d752/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
deleted file mode 100644
index 66f32d8..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.nifi.connectable.Connectable;
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.queue.QueueSize;
-import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.FlowFileRepository;
-import org.apache.nifi.controller.repository.FlowFileSwapManager;
-import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.FlowFilePrioritizer;
-import org.apache.nifi.provenance.ProvenanceEventRepository;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class TestStandardFlowFileQueue {
-    private TestSwapManager swapManager = null;
-    private StandardFlowFileQueue queue = null;
-
-    @Before
-    public void setup() {
-        final Connection connection = Mockito.mock(Connection.class);
-        Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class));
-        Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
-
-        final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
-        swapManager = new TestSwapManager();
-
-        final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
-        final ProvenanceEventRepository provRepo = Mockito.mock(ProvenanceEventRepository.class);
-        final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class);
-
-        queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000);
-        TestFlowFile.idGenerator.set(0L);
-    }
-
-
-    @Test
-    public void testSwapOutOccurs() {
-        for (int i = 0; i < 10000; i++) {
-            queue.put(new TestFlowFile());
-            assertEquals(0, swapManager.swapOutCalledCount);
-            assertEquals(i + 1, queue.size().getObjectCount());
-            assertEquals(i + 1, queue.size().getByteCount());
-        }
-
-        for (int i = 0; i < 9999; i++) {
-            queue.put(new TestFlowFile());
-            assertEquals(0, swapManager.swapOutCalledCount);
-            assertEquals(i + 10001, queue.size().getObjectCount());
-            assertEquals(i + 10001, queue.size().getByteCount());
-        }
-
-        queue.put(new TestFlowFile(1000));
-        assertEquals(1, swapManager.swapOutCalledCount);
-        assertEquals(20000, queue.size().getObjectCount());
-        assertEquals(20999, queue.size().getByteCount());
-
-        assertEquals(10000, queue.getActiveQueueSize().getObjectCount());
-    }
-
-    @Test
-    public void testLowestPrioritySwappedOutFirst() {
-        final List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
-        prioritizers.add(new FlowFileSizePrioritizer());
-        queue.setPriorities(prioritizers);
-
-        long maxSize = 20000;
-        for (int i = 1; i <= 20000; i++) {
-            queue.put(new TestFlowFile(maxSize - i));
-        }
-
-        assertEquals(1, swapManager.swapOutCalledCount);
-        assertEquals(20000, queue.size().getObjectCount());
-
-        assertEquals(10000, queue.getActiveQueueSize().getObjectCount());
-        final List<FlowFileRecord> flowFiles = queue.poll(Integer.MAX_VALUE, new HashSet<FlowFileRecord>());
-        assertEquals(10000, flowFiles.size());
-        for (int i = 0; i < 10000; i++) {
-            assertEquals(i, flowFiles.get(i).getSize());
-        }
-    }
-
-    @Test
-    public void testSwapIn() {
-        for (int i = 1; i <= 20000; i++) {
-            queue.put(new TestFlowFile());
-        }
-
-        assertEquals(1, swapManager.swappedOut.size());
-        queue.put(new TestFlowFile());
-        assertEquals(1, swapManager.swappedOut.size());
-
-        final Set<FlowFileRecord> exp = new HashSet<>();
-        for (int i = 0; i < 9999; i++) {
-            assertNotNull(queue.poll(exp));
-        }
-
-        assertEquals(0, swapManager.swapInCalledCount);
-        assertEquals(1, queue.getActiveQueueSize().getObjectCount());
-        assertNotNull(queue.poll(exp));
-
-        assertEquals(0, swapManager.swapInCalledCount);
-        assertEquals(0, queue.getActiveQueueSize().getObjectCount());
-
-        assertEquals(1, swapManager.swapOutCalledCount);
-
-        assertNotNull(queue.poll(exp)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top.
-        assertEquals(1, swapManager.swapInCalledCount);
-        assertEquals(9999, queue.getActiveQueueSize().getObjectCount());
-
-        assertTrue(swapManager.swappedOut.isEmpty());
-
-        queue.poll(exp);
-
-    }
-
-
-    private class TestSwapManager implements FlowFileSwapManager {
-        private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>();
-        int swapOutCalledCount = 0;
-        int swapInCalledCount = 0;
-
-
-        @Override
-        public void initialize(final SwapManagerInitializationContext initializationContext) {
-
-        }
-
-        @Override
-        public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException {
-            swapOutCalledCount++;
-            final String location = UUID.randomUUID().toString();
-            swappedOut.put(location, new ArrayList<FlowFileRecord>(flowFiles));
-            return location;
-        }
-
-        @Override
-        public List<FlowFileRecord> peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException {
-            return new ArrayList<FlowFileRecord>(swappedOut.get(swapLocation));
-        }
-
-        @Override
-        public List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
-            swapInCalledCount++;
-            return swappedOut.remove(swapLocation);
-        }
-
-        @Override
-        public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException {
-            return new ArrayList<String>(swappedOut.keySet());
-        }
-
-        @Override
-        public void dropSwappedFlowFiles(String swapLocation, final FlowFileQueue flowFileQueue, String user) {
-
-        }
-
-        @Override
-        public QueueSize getSwapSize(String swapLocation) throws IOException {
-            final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
-            if (flowFiles == null) {
-                return new QueueSize(0, 0L);
-            }
-
-            int count = 0;
-            long size = 0L;
-            for (final FlowFileRecord flowFile : flowFiles) {
-                count++;
-                size += flowFile.getSize();
-            }
-
-            return new QueueSize(count, size);
-        }
-
-        @Override
-        public Long getMaxRecordId(String swapLocation) throws IOException {
-            final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
-            if (flowFiles == null) {
-                return null;
-            }
-
-            Long max = null;
-            for (final FlowFileRecord flowFile : flowFiles) {
-                if (max == null || flowFile.getId() > max) {
-                    max = flowFile.getId();
-                }
-            }
-
-            return max;
-        }
-
-        @Override
-        public void purge() {
-            swappedOut.clear();
-        }
-    }
-
-
-    private static class TestFlowFile implements FlowFileRecord {
-        private static final AtomicLong idGenerator = new AtomicLong(0L);
-
-        private final long id = idGenerator.getAndIncrement();
-        private final long entryDate = System.currentTimeMillis();
-        private final Map<String, String> attributes;
-        private final long size;
-
-        public TestFlowFile() {
-            this(1L);
-        }
-
-        public TestFlowFile(final long size) {
-            this(new HashMap<String, String>(), size);
-        }
-
-        public TestFlowFile(final Map<String, String> attributes, final long size) {
-            this.attributes = attributes;
-            this.size = size;
-        }
-
-
-        @Override
-        public long getId() {
-            return id;
-        }
-
-        @Override
-        public long getEntryDate() {
-            return entryDate;
-        }
-
-        @Override
-        public long getLineageStartDate() {
-            return entryDate;
-        }
-
-        @Override
-        public Long getLastQueueDate() {
-            return null;
-        }
-
-        @Override
-        public Set<String> getLineageIdentifiers() {
-            return Collections.emptySet();
-        }
-
-        @Override
-        public boolean isPenalized() {
-            return false;
-        }
-
-        @Override
-        public String getAttribute(String key) {
-            return attributes.get(key);
-        }
-
-        @Override
-        public long getSize() {
-            return size;
-        }
-
-        @Override
-        public Map<String, String> getAttributes() {
-            return Collections.unmodifiableMap(attributes);
-        }
-
-        @Override
-        public int compareTo(final FlowFile o) {
-            return Long.compare(id, o.getId());
-        }
-
-        @Override
-        public long getPenaltyExpirationMillis() {
-            return 0;
-        }
-
-        @Override
-        public ContentClaim getContentClaim() {
-            return null;
-        }
-
-        @Override
-        public long getContentClaimOffset() {
-            return 0;
-        }
-    }
-
-    private static class FlowFileSizePrioritizer implements FlowFilePrioritizer {
-        @Override
-        public int compare(final FlowFile o1, final FlowFile o2) {
-            return Long.compare(o1.getSize(), o2.getSize());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/77f7d752/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index c4a86f2..1162f39 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -173,11 +173,6 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         }
     }
 
-    @Override
-    public void dropSwappedFlowFiles(final String swapLocation, final FlowFileQueue flowFileQueue, final String user) throws IOException {
-
-    }
-
 
     @Override
     public List<String> recoverSwapLocations(final FlowFileQueue flowFileQueue) throws IOException {