You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/13 16:03:58 UTC

[1/5] nifi git commit: NIFI-730: Added methods for dropping queued flowfiles; refactored swap manager but have not yet started swapping flowfiles in or out from within the flowfile queue

Repository: nifi
Updated Branches:
  refs/heads/NIFI-730 e0ac7cde3 -> 9be37914d


http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index c829566..7ab56ed 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -30,46 +30,29 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
-import org.apache.nifi.controller.repository.ConnectionSwapInfo;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
-import org.apache.nifi.controller.repository.QueueProvider;
 import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,24 +68,17 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
     private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part");
 
-    public static final int SWAP_ENCODING_VERSION = 7;
+    public static final int SWAP_ENCODING_VERSION = 8;
     public static final String EVENT_CATEGORY = "Swap FlowFiles";
+    private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);
 
-    private final ScheduledExecutorService swapQueueIdentifierExecutor;
-    private final ScheduledExecutorService swapInExecutor;
-    private volatile FlowFileRepository flowFileRepository;
-    private volatile EventReporter eventReporter;
-
-    // Maintains a mapping of FlowFile Queue to the a QueueLockWrapper, which provides queue locking and necessary state for swapping back in
-    private final ConcurrentMap<FlowFileQueue, QueueLockWrapper> swapMap = new ConcurrentHashMap<>();
     private final File storageDirectory;
-    private final long swapInMillis;
-    private final long swapOutMillis;
-    private final int swapOutThreadCount;
 
-    private ResourceClaimManager claimManager; // effectively final
+    // effectively final
+    private FlowFileRepository flowFileRepository;
+    private EventReporter eventReporter;
+    private ResourceClaimManager claimManager;
 
-    private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);
 
     public FileSystemSwapManager() {
         final NiFiProperties properties = NiFiProperties.getInstance();
@@ -112,42 +88,240 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         if (!storageDirectory.exists() && !storageDirectory.mkdirs()) {
             throw new RuntimeException("Cannot create Swap Storage directory " + storageDirectory.getAbsolutePath());
         }
+    }
 
-        swapQueueIdentifierExecutor = new FlowEngine(1, "Identifies Queues for FlowFile Swapping");
 
-        swapInMillis = FormatUtils.getTimeDuration(properties.getSwapInPeriod(), TimeUnit.MILLISECONDS);
-        swapOutMillis = FormatUtils.getTimeDuration(properties.getSwapOutPeriod(), TimeUnit.MILLISECONDS);
-        swapOutThreadCount = properties.getSwapOutThreads();
-        swapInExecutor = new FlowEngine(properties.getSwapInThreads(), "Swap In FlowFiles");
+    @Override
+    public synchronized void initialize(final SwapManagerInitializationContext initializationContext) {
+        this.claimManager = initializationContext.getResourceClaimManager();
+        this.eventReporter = initializationContext.getEventReporter();
+        this.flowFileRepository = initializationContext.getFlowFileRepository();
     }
 
     @Override
+    public String swapOut(final List<FlowFileRecord> toSwap, final FlowFileQueue flowFileQueue) throws IOException {
+        if (toSwap == null || toSwap.isEmpty()) {
+            return null;
+        }
+
+        final File swapFile = new File(storageDirectory, System.currentTimeMillis() + "-" + flowFileQueue.getIdentifier() + "-" + UUID.randomUUID().toString() + ".swap");
+        final File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part");
+        final String swapLocation = swapFile.getAbsolutePath();
+
+        try (final FileOutputStream fos = new FileOutputStream(swapTempFile)) {
+            serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
+            fos.getFD().sync();
+        }
+
+        if (swapTempFile.renameTo(swapFile)) {
+            flowFileRepository.swapFlowFilesOut(toSwap, flowFileQueue, swapLocation);
+        } else {
+            error("Failed to swap out FlowFiles from " + flowFileQueue + " due to: Unable to rename swap file from " + swapTempFile + " to " + swapFile);
+        }
+
+        return swapLocation;
+    }
+
+
+    @Override
+    public List<FlowFileRecord> swapIn(final String swapLocation, final FlowFileQueue flowFileQueue) throws IOException {
+        final File swapFile = new File(swapLocation);
+        final List<FlowFileRecord> swappedFlowFiles = peek(swapLocation, flowFileQueue);
+        flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), swappedFlowFiles, flowFileQueue);
+
+        if (!swapFile.delete()) {
+            warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually");
+        }
+
+        // TODO: When FlowFile Queue performs this operation, it needs to take the following error handling logic into account:
+
+        /*
+         * } catch (final EOFException eof) {
+         * error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Corrupt Swap File; will remove this Swap File: " + swapFile);
+         *
+         * if (!swapFile.delete()) {
+         * warn("Failed to remove corrupt Swap File " + swapFile + "; This file should be cleaned up manually");
+         * }
+         * } catch (final FileNotFoundException fnfe) {
+         * error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Could not find Swap File " + swapFile);
+         * } catch (final Exception e) {
+         * error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e, e);
+         *
+         * if (swapFile != null) {
+         * queue.add(swapFile);
+         * }
+         * }
+         */
+        return swappedFlowFiles;
+    }
+
+    @Override
+    public List<FlowFileRecord> peek(final String swapLocation, final FlowFileQueue flowFileQueue) throws IOException {
+        final File swapFile = new File(swapLocation);
+        if (!swapFile.exists()) {
+            throw new FileNotFoundException("Failed to swap in FlowFiles from external storage location " + swapLocation + " into FlowFile Queue because the file could not be found");
+        }
+
+        final List<FlowFileRecord> swappedFlowFiles;
+        try (final InputStream fis = new FileInputStream(swapFile);
+            final DataInputStream in = new DataInputStream(fis)) {
+            swappedFlowFiles = deserializeFlowFiles(in, flowFileQueue, swapLocation, claimManager);
+        }
+
+        return swappedFlowFiles;
+    }
+
+
+    @Override
     public void purge() {
         final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
             @Override
             public boolean accept(final File dir, final String name) {
-                return SWAP_FILE_PATTERN.matcher(name).matches();
+                return SWAP_FILE_PATTERN.matcher(name).matches() || TEMP_SWAP_FILE_PATTERN.matcher(name).matches();
+            }
+        });
+
+        for (final File file : swapFiles) {
+            if (!file.delete()) {
+                warn("Failed to delete Swap File " + file + " when purging FlowFile Swap Manager");
+            }
+        }
+    }
+
+    @Override
+    public List<String> recoverSwapLocations(final FlowFileQueue flowFileQueue) throws IOException {
+        final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(final File dir, final String name) {
+                return SWAP_FILE_PATTERN.matcher(name).matches() || TEMP_SWAP_FILE_PATTERN.matcher(name).matches();
             }
         });
 
-        if (swapFiles != null) {
-            for (final File file : swapFiles) {
-                if (!file.delete() && file.exists()) {
-                    logger.warn("Failed to delete SWAP file {}", file);
+        if (swapFiles == null) {
+            return Collections.emptyList();
+        }
+
+        final List<String> swapLocations = new ArrayList<>();
+        // remove any .part files, as they are partial swap files that did not get written fully.
+        for (final File swapFile : swapFiles) {
+            if (TEMP_SWAP_FILE_PATTERN.matcher(swapFile.getName()).matches()) {
+                if (swapFile.delete()) {
+                    logger.info("Removed incomplete/temporary Swap File " + swapFile);
+                } else {
+                    warn("Failed to remove incomplete/temporary Swap File " + swapFile + "; this file should be cleaned up manually");
                 }
+
+                continue;
             }
+
+            // split the filename by dashes. The old filenaming scheme was "<timestamp>-<randomuuid>.swap" but the new naming scheme is
+            // "<timestamp>-<queue identifier>-<random uuid>.swap". If we have two dashes, then we can just check if the queue ID is equal
+            // to the id of the queue given and if not we can just move on.
+            final String[] splits = swapFile.getName().split("-");
+            if (splits.length == 3) {
+                final String queueIdentifier = splits[1];
+                if (!queueIdentifier.equals(flowFileQueue.getIdentifier())) {
+                    continue;
+                }
+            }
+
+            // Read the queue identifier from the swap file to check if the swap file is for this queue
+            try (final InputStream fis = new FileInputStream(swapFile);
+                final InputStream bufferedIn = new BufferedInputStream(fis);
+                final DataInputStream in = new DataInputStream(bufferedIn)) {
+
+                final int swapEncodingVersion = in.readInt();
+                if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
+                    final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
+                        + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)";
+
+                    eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
+                    throw new IOException(errMsg);
+                }
+
+                final String connectionId = in.readUTF();
+                if (connectionId.equals(flowFileQueue.getIdentifier())) {
+                    swapLocations.add(swapFile.getAbsolutePath());
+                }
+            }
+        }
+
+        Collections.sort(swapLocations, new SwapFileComparator());
+        return swapLocations;
+    }
+
+    @Override
+    public QueueSize getSwapSize(final String swapLocation) throws IOException {
+        final File swapFile = new File(swapLocation);
+
+        // read only the header (encoding version, connection id, record count, content size) from the swap file
+        try (final InputStream fis = new FileInputStream(swapFile);
+            final InputStream bufferedIn = new BufferedInputStream(fis);
+            final DataInputStream in = new DataInputStream(bufferedIn)) {
+
+            final int swapEncodingVersion = in.readInt();
+            if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
+                final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
+                    + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)";
+
+                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
+                throw new IOException(errMsg);
+            }
+
+            in.readUTF(); // ignore Connection ID
+            final int numRecords = in.readInt();
+            final long contentSize = in.readLong();
+
+            return new QueueSize(numRecords, contentSize);
         }
     }
 
     @Override
-    public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ResourceClaimManager claimManager, final EventReporter eventReporter) {
-        this.claimManager = claimManager;
-        this.flowFileRepository = flowFileRepository;
-        this.eventReporter = eventReporter;
-        swapQueueIdentifierExecutor.scheduleWithFixedDelay(new QueueIdentifier(connectionProvider), swapOutMillis, swapOutMillis, TimeUnit.MILLISECONDS);
-        swapInExecutor.scheduleWithFixedDelay(new SwapInTask(), swapInMillis, swapInMillis, TimeUnit.MILLISECONDS);
+    public Long getMaxRecordId(final String swapLocation) throws IOException {
+        final File swapFile = new File(swapLocation);
+
+        // read record from disk via the swap file
+        try (final InputStream fis = new FileInputStream(swapFile);
+            final InputStream bufferedIn = new BufferedInputStream(fis);
+            final DataInputStream in = new DataInputStream(bufferedIn)) {
+
+            final int swapEncodingVersion = in.readInt();
+            if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
+                final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
+                    + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)";
+
+                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
+                throw new IOException(errMsg);
+            }
+
+            in.readUTF(); // ignore connection id
+            final int numRecords = in.readInt();
+            in.readLong(); // ignore content size
+
+            if (numRecords == 0) {
+                return null;
+            }
+
+            if (swapEncodingVersion > 7) {
+                final long maxRecordId = in.readLong();
+                return maxRecordId;
+            }
+
+            // Before swap encoding version 8, we did not write out the max record id, so we have to read every
+            // record in this swap file to determine the max record id
+            final List<FlowFileRecord> records = deserializeFlowFiles(in, numRecords, swapEncodingVersion, true, claimManager);
+            long maxId = 0L;
+            for (final FlowFileRecord record : records) {
+                if (record.getId() > maxId) {
+                    maxId = record.getId();
+                }
+            }
+
+            return maxId;
+        }
     }
 
+
     public int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException {
         if (toSwap == null || toSwap.isEmpty()) {
             return 0;
@@ -167,6 +341,16 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
             out.writeInt(toSwap.size());
             out.writeLong(contentSize);
 
+            // get the max record id and write that out so that we know it quickly for restoration
+            long maxRecordId = 0L;
+            for (final FlowFileRecord flowFile : toSwap) {
+                if (flowFile.getId() > maxRecordId) {
+                    maxRecordId = flowFile.getId();
+                }
+            }
+
+            out.writeLong(maxRecordId);
+
             for (final FlowFileRecord flowFile : toSwap) {
                 out.writeLong(flowFile.getId());
                 out.writeLong(flowFile.getEntryDate());
@@ -207,7 +391,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
             out.flush();
         }
 
-        logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", new Object[]{toSwap.size(), queue, swapLocation});
+        logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", new Object[] {toSwap.size(), queue, swapLocation});
 
         return toSwap.size();
     }
@@ -231,26 +415,27 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         }
     }
 
-    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException {
+    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final String swapLocation, final ResourceClaimManager claimManager) throws IOException {
         final int swapEncodingVersion = in.readInt();
         if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
             throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is "
-                    + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
+                + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
         }
 
         final String connectionId = in.readUTF();
         if (!connectionId.equals(queue.getIdentifier())) {
-            throw new IllegalArgumentException("Cannot restore Swap File because the file indicates that records belong to Connection with ID " + connectionId + " but received Connection " + queue);
+            throw new IllegalArgumentException("Cannot restore contents from FlowFile Swap File " + swapLocation +
+                " because the file indicates that records belong to Connection with ID " + connectionId + " but attempted to swap those records into " + queue);
         }
 
         final int numRecords = in.readInt();
-        in.readLong();  // Content Size
+        in.readLong(); // Content Size
 
-        return deserializeFlowFiles(in, numRecords, queue, swapEncodingVersion, false, claimManager);
+        return deserializeFlowFiles(in, numRecords, swapEncodingVersion, false, claimManager);
     }
 
-    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles, final FlowFileQueue queue,
-            final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager) throws IOException {
+    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles,
+        final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager) throws IOException {
         final List<FlowFileRecord> flowFiles = new ArrayList<>();
         for (int i = 0; i < numFlowFiles; i++) {
             // legacy encoding had an "action" because it used to be couple with FlowFile Repository code
@@ -395,109 +580,6 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         }
     }
 
-    private class QueueIdentifier implements Runnable {
-
-        private final QueueProvider connectionProvider;
-
-        public QueueIdentifier(final QueueProvider connectionProvider) {
-            this.connectionProvider = connectionProvider;
-        }
-
-        @Override
-        public void run() {
-            final Collection<FlowFileQueue> allQueues = connectionProvider.getAllQueues();
-            final BlockingQueue<FlowFileQueue> connectionQueue = new LinkedBlockingQueue<>(allQueues);
-
-            final ThreadFactory threadFactory = new ThreadFactory() {
-                @Override
-                public Thread newThread(final Runnable r) {
-                    final Thread t = new Thread(r);
-                    t.setName("Swap Out FlowFiles");
-                    return t;
-                }
-            };
-
-            final ExecutorService workerExecutor = Executors.newFixedThreadPool(swapOutThreadCount, threadFactory);
-            for (int i = 0; i < swapOutThreadCount; i++) {
-                workerExecutor.submit(new SwapOutTask(connectionQueue));
-            }
-
-            workerExecutor.shutdown();
-
-            try {
-                workerExecutor.awaitTermination(10, TimeUnit.MINUTES);
-            } catch (final InterruptedException e) {
-                // oh well...
-            }
-        }
-    }
-
-    private class SwapInTask implements Runnable {
-
-        @Override
-        public void run() {
-            for (final Map.Entry<FlowFileQueue, QueueLockWrapper> entry : swapMap.entrySet()) {
-                final FlowFileQueue flowFileQueue = entry.getKey();
-
-                // if queue is more than 60% of its swap threshold, don't swap flowfiles in
-                if (flowFileQueue.unswappedSize() >= flowFileQueue.getSwapThreshold() * 0.6F) {
-                    continue;
-                }
-
-                final QueueLockWrapper queueLockWrapper = entry.getValue();
-                if (queueLockWrapper.getLock().tryLock()) {
-                    try {
-                        final Queue<File> queue = queueLockWrapper.getQueue();
-
-                        // Swap FlowFiles in until we hit 90% of the threshold, or until we're out of files.
-                        while (flowFileQueue.unswappedSize() < flowFileQueue.getSwapThreshold() * 0.9F) {
-                            File swapFile = null;
-                            try {
-                                swapFile = queue.poll();
-                                if (swapFile == null) {
-                                    break;
-                                }
-
-                                try (final InputStream fis = new FileInputStream(swapFile);
-                                        final DataInputStream in = new DataInputStream(fis)) {
-                                    final List<FlowFileRecord> swappedFlowFiles = deserializeFlowFiles(in, flowFileQueue, claimManager);
-                                    flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), swappedFlowFiles, flowFileQueue);
-                                    flowFileQueue.putSwappedRecords(swappedFlowFiles);
-                                }
-
-                                if (!swapFile.delete()) {
-                                    warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually");
-                                }
-                            } catch (final EOFException eof) {
-                                error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Corrupt Swap File; will remove this Swap File: " + swapFile);
-
-                                if (!swapFile.delete()) {
-                                    warn("Failed to remove corrupt Swap File " + swapFile + "; This file should be cleaned up manually");
-                                }
-                            } catch (final FileNotFoundException fnfe) {
-                                error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Could not find Swap File " + swapFile);
-                            } catch (final Exception e) {
-                                error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e, e);
-
-                                if (swapFile != null) {
-                                    queue.add(swapFile);
-                                }
-                            }
-                        }
-                    } finally {
-                        queueLockWrapper.getLock().unlock();
-                    }
-                }
-            }
-        }
-    }
-
-    private void error(final String error, final Throwable t) {
-        error(error);
-        if (logger.isDebugEnabled()) {
-            logger.error("", t);
-        }
-    }
 
     private void error(final String error) {
         logger.error(error);
@@ -513,199 +595,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         }
     }
 
-    private class SwapOutTask implements Runnable {
-
-        private final BlockingQueue<FlowFileQueue> connectionQueue;
-
-        public SwapOutTask(final BlockingQueue<FlowFileQueue> connectionQueue) {
-            this.connectionQueue = connectionQueue;
-        }
-
-        @Override
-        public void run() {
-            while (true) {
-                final FlowFileQueue flowFileQueue = connectionQueue.poll();
-                if (flowFileQueue == null) {
-                    logger.debug("No more FlowFile Queues to Swap Out");
-                    return;
-                }
-
-                if (logger.isDebugEnabled()) {
-                    logger.debug("{} has {} FlowFiles to swap out", flowFileQueue, flowFileQueue.getSwapQueueSize());
-                }
-
-                while (flowFileQueue.getSwapQueueSize() >= MINIMUM_SWAP_COUNT) {
-                    final File swapFile = new File(storageDirectory, System.currentTimeMillis() + "-" + UUID.randomUUID().toString() + ".swap");
-                    final File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part");
-                    final String swapLocation = swapFile.getAbsolutePath();
-                    final List<FlowFileRecord> toSwap = flowFileQueue.pollSwappableRecords();
-
-                    int recordsSwapped;
-                    try {
-                        try (final FileOutputStream fos = new FileOutputStream(swapTempFile)) {
-                            recordsSwapped = serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
-                            fos.getFD().sync();
-                        }
-
-                        if (swapTempFile.renameTo(swapFile)) {
-                            flowFileRepository.swapFlowFilesOut(toSwap, flowFileQueue, swapLocation);
-                        } else {
-                            error("Failed to swap out FlowFiles from " + flowFileQueue + " due to: Unable to rename swap file from " + swapTempFile + " to " + swapFile);
-                            recordsSwapped = 0;
-                        }
-                    } catch (final IOException ioe) {
-                        recordsSwapped = 0;
-                        flowFileQueue.putSwappedRecords(toSwap);
-                        error("Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe, ioe);
-                    }
-
-                    if (recordsSwapped > 0) {
-                        QueueLockWrapper swapQueue = swapMap.get(flowFileQueue);
-                        if (swapQueue == null) {
-                            swapQueue = new QueueLockWrapper(new LinkedBlockingQueue<File>());
-                            final QueueLockWrapper oldQueue = swapMap.putIfAbsent(flowFileQueue, swapQueue);
-                            if (oldQueue != null) {
-                                swapQueue = oldQueue;
-                            }
-                        }
-
-                        swapQueue.getQueue().add(swapFile);
-                    } else {
-                        swapTempFile.delete();
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Recovers FlowFiles from all Swap Files, returning the largest FlowFile ID that was recovered.
-     *
-     * @param queueProvider provider
-     * @return the largest FlowFile ID that was recovered
-     */
-    @Override
-    public long recoverSwappedFlowFiles(final QueueProvider queueProvider, final ResourceClaimManager claimManager) {
-        final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
-            @Override
-            public boolean accept(final File dir, final String name) {
-                return SWAP_FILE_PATTERN.matcher(name).matches() || TEMP_SWAP_FILE_PATTERN.matcher(name).matches();
-            }
-        });
-
-        if (swapFiles == null) {
-            return 0L;
-        }
-
-        final Collection<FlowFileQueue> allQueues = queueProvider.getAllQueues();
-        final Map<String, FlowFileQueue> queueMap = new HashMap<>();
-        for (final FlowFileQueue queue : allQueues) {
-            queueMap.put(queue.getIdentifier(), queue);
-        }
-
-        final ConnectionSwapInfo swapInfo = new ConnectionSwapInfo();
-        int swappedCount = 0;
-        long swappedBytes = 0L;
-        long maxRecoveredId = 0L;
-
-        for (final File swapFile : swapFiles) {
-            if (TEMP_SWAP_FILE_PATTERN.matcher(swapFile.getName()).matches()) {
-                if (swapFile.delete()) {
-                    logger.info("Removed incomplete/temporary Swap File " + swapFile);
-                } else {
-                    warn("Failed to remove incomplete/temporary Swap File " + swapFile + "; this file should be cleaned up manually");
-                }
-
-                continue;
-            }
-
-            // read record to disk via the swap file
-            try (final InputStream fis = new FileInputStream(swapFile);
-                    final InputStream bufferedIn = new BufferedInputStream(fis);
-                    final DataInputStream in = new DataInputStream(bufferedIn)) {
-
-                final int swapEncodingVersion = in.readInt();
-                if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
-                    final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
-                            + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)";
-
-                    eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
-                    throw new IOException(errMsg);
-                }
-
-                final String connectionId = in.readUTF();
-                final FlowFileQueue queue = queueMap.get(connectionId);
-                if (queue == null) {
-                    error("Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID "
-                            + connectionId + " and that Connection does not exist");
-                    continue;
-                }
-
-                final int numRecords = in.readInt();
-                final long contentSize = in.readLong();
-
-                swapInfo.addSwapSizeInfo(connectionId, swapFile.getAbsolutePath(), new QueueSize(numRecords, contentSize));
-                swappedCount += numRecords;
-                swappedBytes += contentSize;
-
-                final List<FlowFileRecord> records = deserializeFlowFiles(in, numRecords, queue, swapEncodingVersion, true, claimManager);
-                long maxId = 0L;
-                for (final FlowFileRecord record : records) {
-                    if (record.getId() > maxId) {
-                        maxId = record.getId();
-                    }
-                }
-
-                if (maxId > maxRecoveredId) {
-                    maxRecoveredId = maxId;
-                }
-            } catch (final IOException ioe) {
-                error("Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe, ioe);
-            }
-        }
-
-        restoreSwapLocations(queueMap.values(), swapInfo);
-        logger.info("Recovered {} FlowFiles ({} bytes) from Swap Files", swappedCount, swappedBytes);
-        return maxRecoveredId;
-    }
-
-    public void restoreSwapLocations(final Collection<FlowFileQueue> flowFileQueues, final ConnectionSwapInfo swapInfo) {
-        for (final FlowFileQueue queue : flowFileQueues) {
-            final String queueId = queue.getIdentifier();
-            final Collection<String> swapFileLocations = swapInfo.getSwapFileLocations(queueId);
-            if (swapFileLocations == null || swapFileLocations.isEmpty()) {
-                continue;
-            }
-
-            final SortedMap<String, QueueSize> sortedFileQueueMap = new TreeMap<>(new SwapFileComparator());
-            for (final String swapFileLocation : swapFileLocations) {
-                final QueueSize queueSize = swapInfo.getSwappedSize(queueId, swapFileLocation);
-                sortedFileQueueMap.put(swapFileLocation, queueSize);
-            }
-
-            QueueLockWrapper fileQueue = swapMap.get(queue);
-            if (fileQueue == null) {
-                fileQueue = new QueueLockWrapper(new LinkedBlockingQueue<File>());
-                swapMap.put(queue, fileQueue);
-            }
-
-            for (final Map.Entry<String, QueueSize> innerEntry : sortedFileQueueMap.entrySet()) {
-                final File swapFile = new File(innerEntry.getKey());
-                final QueueSize size = innerEntry.getValue();
-                fileQueue.getQueue().add(swapFile);
-                queue.incrementSwapCount(size.getObjectCount(), size.getByteCount());
-            }
-        }
-    }
 
-    @Override
-    public void shutdown() {
-        swapQueueIdentifierExecutor.shutdownNow();
-        swapInExecutor.shutdownNow();
-    }
 
     private static class SwapFileComparator implements Comparator<String> {
-
         @Override
         public int compare(final String o1, final String o2) {
             if (o1 == o2) {
@@ -755,34 +647,4 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         }
     }
 
-    private static class QueueLockWrapper {
-
-        private final Lock lock = new ReentrantLock();
-        private final Queue<File> queue;
-
-        public QueueLockWrapper(final Queue<File> queue) {
-            this.queue = queue;
-        }
-
-        public Queue<File> getQueue() {
-            return queue;
-        }
-
-        public Lock getLock() {
-            return lock;
-        }
-
-        @Override
-        public int hashCode() {
-            return queue.hashCode();
-        }
-
-        @Override
-        public boolean equals(final Object obj) {
-            if (obj instanceof QueueLockWrapper) {
-                return queue.equals(((QueueLockWrapper) obj).queue);
-            }
-            return false;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index d9c3f39..23746ce 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -80,6 +80,8 @@ import org.apache.nifi.controller.exception.ComponentLifeCycleException;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.label.StandardLabel;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
 import org.apache.nifi.controller.reporting.ReportingTaskProvider;
 import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
@@ -97,6 +99,7 @@ import org.apache.nifi.controller.repository.RepositoryStatusReport;
 import org.apache.nifi.controller.repository.StandardCounterRepository;
 import org.apache.nifi.controller.repository.StandardFlowFileRecord;
 import org.apache.nifi.controller.repository.StandardRepositoryRecord;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
@@ -152,7 +155,6 @@ import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.nar.NarThreadContextClassLoader;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardProcessorInitializationContext;
@@ -216,7 +218,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     public static final String SCHEDULE_MINIMUM_NANOSECONDS = "flowcontroller.minimum.nanoseconds";
     public static final String GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.seconds";
     public static final long DEFAULT_GRACEFUL_SHUTDOWN_SECONDS = 10;
-    public static final int METRICS_RESERVOIR_SIZE = 288;   // 1 day worth of 5-minute captures
+    public static final int METRICS_RESERVOIR_SIZE = 288; // 1 day worth of 5-minute captures
 
     public static final String ROOT_GROUP_ID_ALIAS = "root";
     public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow";
@@ -245,7 +247,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final UserService userService;
     private final EventDrivenWorkerQueue eventDrivenWorkerQueue;
     private final ComponentStatusRepository componentStatusRepository;
-    private final long systemStartTime = System.currentTimeMillis();    // time at which the node was started
+    private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started
     private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>();
 
     // The Heartbeat Bean is used to provide an Atomic Reference to data that is used in heartbeats that may
@@ -336,38 +338,36 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final Lock readLock = rwLock.readLock();
     private final Lock writeLock = rwLock.writeLock();
 
-    private FlowFileSwapManager flowFileSwapManager;    // guarded by read/write lock
-
     private static final Logger LOG = LoggerFactory.getLogger(FlowController.class);
     private static final Logger heartbeatLogger = LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat");
 
     public static FlowController createStandaloneInstance(
-            final FlowFileEventRepository flowFileEventRepo,
-            final NiFiProperties properties,
-            final UserService userService,
-            final StringEncryptor encryptor) {
+        final FlowFileEventRepository flowFileEventRepo,
+        final NiFiProperties properties,
+        final UserService userService,
+        final StringEncryptor encryptor) {
         return new FlowController(
-                flowFileEventRepo,
-                properties,
-                userService,
-                encryptor,
-                /* configuredForClustering */ false,
-                /* NodeProtocolSender */ null);
+            flowFileEventRepo,
+            properties,
+            userService,
+            encryptor,
+            /* configuredForClustering */ false,
+            /* NodeProtocolSender */ null);
     }
 
     public static FlowController createClusteredInstance(
-            final FlowFileEventRepository flowFileEventRepo,
-            final NiFiProperties properties,
-            final UserService userService,
-            final StringEncryptor encryptor,
-            final NodeProtocolSender protocolSender) {
+        final FlowFileEventRepository flowFileEventRepo,
+        final NiFiProperties properties,
+        final UserService userService,
+        final StringEncryptor encryptor,
+        final NodeProtocolSender protocolSender) {
         final FlowController flowController = new FlowController(
-                flowFileEventRepo,
-                properties,
-                userService,
-                encryptor,
-                /* configuredForClustering */ true,
-                /* NodeProtocolSender */ protocolSender);
+            flowFileEventRepo,
+            properties,
+            userService,
+            encryptor,
+            /* configuredForClustering */ true,
+            /* NodeProtocolSender */ protocolSender);
 
         flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), properties.isSiteToSiteSecure());
 
@@ -375,12 +375,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     }
 
     private FlowController(
-            final FlowFileEventRepository flowFileEventRepo,
-            final NiFiProperties properties,
-            final UserService userService,
-            final StringEncryptor encryptor,
-            final boolean configuredForClustering,
-            final NodeProtocolSender protocolSender) {
+        final FlowFileEventRepository flowFileEventRepo,
+        final NiFiProperties properties,
+        final UserService userService,
+        final StringEncryptor encryptor,
+        final boolean configuredForClustering,
+        final NodeProtocolSender protocolSender) {
 
         maxTimerDrivenThreads = new AtomicInteger(10);
         maxEventDrivenThreads = new AtomicInteger(5);
@@ -416,7 +416,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository);
         processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent(
-                eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor));
+            eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor));
 
         final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor);
         final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor);
@@ -468,7 +468,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             externalSiteListener = null;
         } else if (isSiteToSiteSecure && sslContext == null) {
             LOG.error("Unable to create Secure Site-to-Site Listener because not all required Keystore/Truststore "
-                    + "Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed.");
+                + "Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed.");
             externalSiteListener = null;
         } else {
             // Register the SocketFlowFileServerProtocol as the appropriate resource for site-to-site Server Protocol
@@ -501,7 +501,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create FlowFile Repository because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION);
+                + NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION);
         }
 
         try {
@@ -543,14 +543,22 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     public void initializeFlow() throws IOException {
         writeLock.lock();
         try {
-            flowFileSwapManager = createSwapManager(properties);
+            // get all connections/queues and recover from swap files.
+            final List<Connection> connections = getGroup(getRootGroupId()).findAllConnections();
 
             long maxIdFromSwapFiles = -1L;
-            if (flowFileSwapManager != null) {
-                if (flowFileRepository.isVolatile()) {
-                    flowFileSwapManager.purge();
-                } else {
-                    maxIdFromSwapFiles = flowFileSwapManager.recoverSwappedFlowFiles(this, contentClaimManager);
+            if (flowFileRepository.isVolatile()) {
+                for (final Connection connection : connections) {
+                    final FlowFileQueue queue = connection.getFlowFileQueue();
+                    queue.purgeSwapFiles();
+                }
+            } else {
+                for (final Connection connection : connections) {
+                    final FlowFileQueue queue = connection.getFlowFileQueue();
+                    final Long maxFlowFileId = queue.recoverSwappedFlowFiles();
+                    if (maxFlowFileId != null && maxFlowFileId > maxIdFromSwapFiles) {
+                        maxIdFromSwapFiles = maxFlowFileId;
+                    }
                 }
             }
 
@@ -560,10 +568,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             // ContentRepository to purge superfluous files
             contentRepository.cleanup();
 
-            if (flowFileSwapManager != null) {
-                flowFileSwapManager.start(flowFileRepository, this, contentClaimManager, createEventReporter(bulletinRepository));
-            }
-
             if (externalSiteListener != null) {
                 externalSiteListener.start();
             }
@@ -612,7 +616,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                             startConnectable(connectable);
                         }
                     } catch (final Throwable t) {
-                        LOG.error("Unable to start {} due to {}", new Object[]{connectable, t.toString()});
+                        LOG.error("Unable to start {} due to {}", new Object[] {connectable, t.toString()});
                         if (LOG.isDebugEnabled()) {
                             LOG.error("", t);
                         }
@@ -627,7 +631,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                         remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort);
                         startedTransmitting++;
                     } catch (final Throwable t) {
-                        LOG.error("Unable to start transmitting with {} due to {}", new Object[]{remoteGroupPort, t});
+                        LOG.error("Unable to start transmitting with {} due to {}", new Object[] {remoteGroupPort, t});
                     }
                 }
 
@@ -642,7 +646,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                             startConnectable(connectable);
                         }
                     } catch (final Throwable t) {
-                        LOG.error("Unable to start {} due to {}", new Object[]{connectable, t});
+                        LOG.error("Unable to start {} due to {}", new Object[] {connectable, t});
                     }
                 }
 
@@ -658,7 +662,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         final String implementationClassName = properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, DEFAULT_CONTENT_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION);
+                + NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION);
         }
 
         try {
@@ -676,7 +680,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         final String implementationClassName = properties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, DEFAULT_PROVENANCE_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
+                + NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
         }
 
         try {
@@ -690,7 +694,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
+                + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
         }
 
         try {
@@ -721,7 +725,38 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             relationships.add(new Relationship.Builder().name(relationshipName).build());
         }
 
-        return builder.id(requireNonNull(id).intern()).name(name == null ? null : name.intern()).relationships(relationships).source(requireNonNull(source)).destination(destination).build();
+        // Create and initialize a FlowFileSwapManager for this connection
+        final FlowFileSwapManager swapManager = createSwapManager(properties);
+        final EventReporter eventReporter = createEventReporter(getBulletinRepository());
+        try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+            final SwapManagerInitializationContext initializationContext = new SwapManagerInitializationContext() {
+                @Override
+                public ResourceClaimManager getResourceClaimManager() {
+                    return contentClaimManager; // controller's field; an unqualified getResourceClaimManager() call here would invoke this method itself and recurse forever
+                }
+
+                @Override
+                public FlowFileRepository getFlowFileRepository() {
+                    return flowFileRepository;
+                }
+
+                @Override
+                public EventReporter getEventReporter() {
+                    return eventReporter;
+                }
+            };
+
+            swapManager.initialize(initializationContext);
+        }
+
+        return builder.id(requireNonNull(id).intern())
+            .name(name == null ? null : name.intern())
+            .relationships(relationships)
+            .source(requireNonNull(source))
+            .destination(destination)
+            .swapManager(swapManager)
+            .eventReporter(eventReporter)
+            .build();
     }
 
     /**
@@ -910,7 +945,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         name = requireNonNull(name).intern();
         verifyPortIdDoesNotExist(id);
         return new StandardRootGroupPort(id, name, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT,
-                userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
+            userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
     }
 
     /**
@@ -927,7 +962,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         name = requireNonNull(name).intern();
         verifyPortIdDoesNotExist(id);
         return new StandardRootGroupPort(id, name, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT,
-                userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
+            userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
     }
 
     /**
@@ -1083,24 +1118,20 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             try {
                 flowFileRepository.close();
             } catch (final Throwable t) {
-                LOG.warn("Unable to shut down FlowFileRepository due to {}", new Object[]{t});
+                LOG.warn("Unable to shut down FlowFileRepository due to {}", new Object[] {t});
             }
 
             if (this.timerDrivenEngineRef.get().isTerminated() && eventDrivenEngineRef.get().isTerminated()) {
                 LOG.info("Controller has been terminated successfully.");
             } else {
                 LOG.warn("Controller hasn't terminated properly.  There exists an uninterruptable thread that "
-                        + "will take an indeterminate amount of time to stop.  Might need to kill the program manually.");
+                    + "will take an indeterminate amount of time to stop.  Might need to kill the program manually.");
             }
 
             if (externalSiteListener != null) {
                 externalSiteListener.stop();
             }
 
-            if (flowFileSwapManager != null) {
-                flowFileSwapManager.shutdown();
-            }
-
             if (processScheduler != null) {
                 processScheduler.shutdown();
             }
@@ -1153,7 +1184,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      * @throws FlowSynchronizationException if updates to the controller failed. If this exception is thrown, then the controller should be considered unsafe to be used
      */
     public void synchronize(final FlowSynchronizer synchronizer, final DataFlow dataFlow)
-            throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
+        throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
         writeLock.lock();
         try {
             LOG.debug("Synchronizing controller with proposed flow");
@@ -1199,7 +1230,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      *
      * @param maxThreadCount
      *
-     * This method must be called while holding the write lock!
+     *            This method must be called while holding the write lock!
      */
     private void setMaxThreadCount(final int maxThreadCount, final FlowEngine engine, final AtomicInteger maxThreads) {
         if (maxThreadCount < 1) {
@@ -1267,7 +1298,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      * @throws ProcessorInstantiationException
      *
      * @throws IllegalStateException if no process group can be found with the ID of DTO or with the ID of the DTO's parentGroupId, if the template ID specified is invalid, or if the DTO's Parent
-     * Group ID changes but the parent group has incoming or outgoing connections
+     *             Group ID changes but the parent group has incoming or outgoing connections
      *
      * @throws NullPointerException if the DTO or its ID is null
      */
@@ -1371,7 +1402,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      *
      * @throws NullPointerException if either argument is null
      * @throws IllegalStateException if the snippet is not valid because a component in the snippet has an ID that is not unique to this flow, or because it shares an Input Port or Output Port at the
-     * root level whose name already exists in the given ProcessGroup, or because the Template contains a Processor or a Prioritizer whose class is not valid within this instance of NiFi.
+     *             root level whose name already exists in the given ProcessGroup, or because the Template contains a Processor or a Prioritizer whose class is not valid within this instance of NiFi.
      * @throws ProcessorInstantiationException if unable to instantiate a processor
      */
     public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException {
@@ -2542,7 +2573,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         if (firstTimeAdded) {
             final ComponentLog componentLog = new SimpleProcessLogger(id, taskNode.getReportingTask());
             final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(),
-                    SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this);
+                SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this);
 
             try {
                 task.initialize(config);
@@ -2888,7 +2919,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         readLock.lock();
         try {
             return heartbeatGeneratorFuture != null && !heartbeatGeneratorFuture.isCancelled()
-                    && heartbeatSenderFuture != null && !heartbeatSenderFuture.isCancelled();
+                && heartbeatSenderFuture != null && !heartbeatSenderFuture.isCancelled();
         } finally {
             readLock.unlock();
         }
@@ -2948,7 +2979,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
     /**
      * @return the DN of the Cluster Manager that we are currently connected to, if available. This will return null if the instance is not clustered or if the instance is clustered but the NCM's DN
-     * is not available - for instance, if cluster communications are not secure
+     *         is not available - for instance, if cluster communications are not secure
      */
     public String getClusterManagerDN() {
         readLock.lock();
@@ -3101,10 +3132,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             @Override
             public boolean isContentSame() {
                 return areEqual(event.getPreviousContentClaimContainer(), event.getContentClaimContainer())
-                        && areEqual(event.getPreviousContentClaimSection(), event.getContentClaimSection())
-                        && areEqual(event.getPreviousContentClaimIdentifier(), event.getContentClaimIdentifier())
-                        && areEqual(event.getPreviousContentClaimOffset(), event.getContentClaimOffset())
-                        && areEqual(event.getPreviousFileSize(), event.getFileSize());
+                    && areEqual(event.getPreviousContentClaimSection(), event.getContentClaimSection())
+                    && areEqual(event.getPreviousContentClaimIdentifier(), event.getContentClaimIdentifier())
+                    && areEqual(event.getPreviousContentClaimOffset(), event.getContentClaimOffset())
+                    && areEqual(event.getPreviousFileSize(), event.getFileSize());
             }
 
             @Override
@@ -3297,7 +3328,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         // Create the ContentClaim
         final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
-                event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
+            event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
 
         // Increment Claimant Count, since we will now be referencing the Content Claim
         contentClaimManager.incrementClaimantCount(resourceClaim);
@@ -3367,7 +3398,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         // Update the FlowFile Repository to indicate that we have added the FlowFile to the flow
         final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord);
         record.setDestination(queue);
-        flowFileRepository.updateRepository(Collections.<RepositoryRecord>singleton(record));
+        flowFileRepository.updateRepository(Collections.<RepositoryRecord> singleton(record));
 
         // Enqueue the data
         queue.put(flowFileRecord);
@@ -3434,11 +3465,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 protocolSender.sendBulletins(message);
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(
-                            String.format(
-                                    "Sending bulletins to cluster manager at %s",
-                                    dateFormatter.format(new Date())
-                            )
-                    );
+                        String.format(
+                            "Sending bulletins to cluster manager at %s",
+                            dateFormatter.format(new Date())));
                 }
 
             } catch (final UnknownServiceAddressException usae) {
@@ -3496,7 +3525,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                         escapedBulletin = BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
                     } else {
                         escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), bulletin.getSourceType(),
-                                bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
+                            bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
                     }
                 } else {
                     escapedBulletin = bulletin;
@@ -3554,9 +3583,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 final long sendMillis = TimeUnit.NANOSECONDS.toMillis(sendNanos);
 
                 heartbeatLogger.info("Heartbeat created at {} and sent at {}; send took {} millis",
-                        dateFormatter.format(new Date(message.getHeartbeat().getCreatedTimestamp())),
-                        dateFormatter.format(new Date()),
-                        sendMillis);
+                    dateFormatter.format(new Date(message.getHeartbeat().getCreatedTimestamp())),
+                    dateFormatter.format(new Date()),
+                    sendMillis);
             } catch (final UnknownServiceAddressException usae) {
                 if (heartbeatLogger.isDebugEnabled()) {
                     heartbeatLogger.debug(usae.getMessage());

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
index d5dba82..f70f602 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
@@ -25,11 +25,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ConnectionSwapInfo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ConnectionSwapInfo.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ConnectionSwapInfo.java
deleted file mode 100644
index 642e8ff..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ConnectionSwapInfo.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.nifi.processor.QueueSize;
-
-public class ConnectionSwapInfo {
-
-    private final Map<String, Map<String, QueueSize>> connectionMap = new HashMap<>();
-
-    public void addSwapSizeInfo(final String connectionId, final String swapFileLocation, final QueueSize queueSize) {
-        Map<String, QueueSize> queueSizeMap = connectionMap.get(connectionId);
-        if (queueSizeMap == null) {
-            queueSizeMap = new HashMap<>();
-            connectionMap.put(connectionId, queueSizeMap);
-        }
-
-        queueSizeMap.put(swapFileLocation, queueSize);
-    }
-
-    public Collection<String> getSwapFileLocations(final String connectionId) {
-        final Map<String, QueueSize> sizeMap = connectionMap.get(connectionId);
-        if (sizeMap == null) {
-            return Collections.<String>emptyList();
-        }
-
-        return Collections.unmodifiableCollection(sizeMap.keySet());
-    }
-
-    public QueueSize getSwappedSize(final String connectionId, final String swapFileLocation) {
-        final Map<String, QueueSize> sizeMap = connectionMap.get(connectionId);
-        if (sizeMap == null) {
-            return null;
-        }
-
-        return sizeMap.get(swapFileLocation);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 3ba7e4e..a32a485 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -41,8 +41,9 @@ import java.util.regex.Pattern;
 
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.FlowFileQueue;
 import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.io.ByteCountingInputStream;
@@ -57,7 +58,6 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.FlowFileHandlingException;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
index 5fcb35a..c5be81e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
@@ -20,7 +20,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.processor.Relationship;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
index a85b23b..ae8824a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
@@ -21,7 +21,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 5ee5fb5..639a4c8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -41,7 +41,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index b573006..6eeddc5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -28,10 +28,10 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -42,12 +42,12 @@ public class TestFileSystemSwapManager {
         System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
 
         try (final InputStream fis = new FileInputStream(new File("src/test/resources/old-swap-file.swap"));
-                final DataInputStream in = new DataInputStream(new BufferedInputStream(fis))) {
+            final DataInputStream in = new DataInputStream(new BufferedInputStream(fis))) {
 
             final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
             Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
 
-            final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, flowFileQueue, new NopResourceClaimManager());
+            final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, flowFileQueue, "/src/test/resources/old-swap-file.swap", new NopResourceClaimManager());
             assertEquals(10000, records.size());
 
             for (final FlowFileRecord record : records) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 0e11923..12f8e5e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -48,9 +48,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.FlowFileQueue;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.StandardFlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
@@ -133,7 +133,8 @@ public class TestStandardProcessSession {
         final Connection connection = Mockito.mock(Connection.class);
         final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
 
-        flowFileQueue = new StandardFlowFileQueue("1", connection, processScheduler, 10000);
+        final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class);
+        flowFileQueue = new StandardFlowFileQueue("1", connection, processScheduler, swapManager, null, 10000);
         when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
 
         Mockito.doAnswer(new Answer<Object>() {
@@ -445,7 +446,7 @@ public class TestStandardProcessSession {
         session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
         session.commit();
 
-        assertEquals(1, provenanceRepo.getEvents(0L, 100000).size());  // 1 event for both parents and children
+        assertEquals(1, provenanceRepo.getEvents(0L, 100000).size()); // 1 event for both parents and children
     }
 
     @Test
@@ -809,7 +810,7 @@ public class TestStandardProcessSession {
             .entryDate(System.currentTimeMillis())
             .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L))
 
-            .contentClaimOffset(1000L).size(1L).build();
+        .contentClaimOffset(1000L).size(1L).build();
         flowFileQueue.put(flowFileRecord2);
 
         // attempt to read the data.

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 2138928..e836b44 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller.repository;
 
 import org.apache.nifi.controller.repository.WriteAheadFlowFileRepository;
 import org.apache.nifi.controller.repository.StandardRepositoryRecord;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.StandardFlowFileRecord;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
@@ -33,7 +34,6 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.util.file.FileUtils;
 


[4/5] nifi git commit: NIFI-730: Implemented swapping in and out on-demand by the FlowFileQueue rather than in a background thread

Posted by ma...@apache.org.
NIFI-730: Implemented swapping in and out on-demand by the FlowFileQueue rather than in a background thread


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/49a781df
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/49a781df
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/49a781df

Branch: refs/heads/NIFI-730
Commit: 49a781df2d44859ec59672c2755b7346452cd74a
Parents: b8c51dc
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Oct 12 13:27:07 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Oct 13 10:03:03 2015 -0400

----------------------------------------------------------------------
 .../controller/queue/DropFlowFileState.java     |   3 +-
 .../controller/queue/DropFlowFileStatus.java    |   7 +-
 .../nifi/controller/queue/FlowFileQueue.java    |  50 --
 .../apache/nifi/controller/queue/QueueSize.java |   7 +
 .../repository/FlowFileSwapManager.java         |  17 +-
 .../SwapManagerInitializationContext.java       |   1 -
 .../java/org/apache/nifi/util/MockFlowFile.java |  24 +-
 .../org/apache/nifi/util/MockFlowFileQueue.java |   3 +-
 .../apache/nifi/util/MockProcessSession.java    |  15 +-
 .../nifi/util/StandardProcessorTestRunner.java  |  24 +-
 .../java/org/apache/nifi/util/TestRunner.java   | 124 ++--
 .../nifi/controller/DropFlowFileRequest.java    |  99 +++
 .../nifi/controller/StandardFlowFileQueue.java  | 687 +++++++++----------
 .../controller/TestStandardFlowFileQueue.java   | 330 +++++++++
 .../nifi/connectable/StandardConnection.java    |  33 +-
 .../nifi/controller/FileSystemSwapManager.java  |  54 +-
 .../apache/nifi/controller/FlowController.java  |  24 +-
 .../repository/StandardProcessSession.java      | 102 +--
 .../java/org/apache/nifi/util/Connectables.java |   2 +-
 .../controller/TestFileSystemSwapManager.java   | 142 +++-
 .../repository/TestStandardProcessSession.java  |   2 +-
 .../nifi/web/controller/ControllerFacade.java   |  30 +-
 22 files changed, 1155 insertions(+), 625 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
index 3f16d00..e412b80 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
@@ -25,7 +25,8 @@ public enum DropFlowFileState {
     WAITING_FOR_LOCK("Waiting for Destination Component to complete its action"),
     DROPPING_ACTIVE_FLOWFILES("Dropping FlowFiles from queue"),
     COMPLETE("Completed Successfully"),
-    FAILURE("Failed");
+    FAILURE("Failed"),
+    CANCELED("Cancelled by User");
     
     private final String description;
     

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
index b216608..3c3be9b 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
@@ -43,6 +43,12 @@ public interface DropFlowFileStatus {
     long getRequestSubmissionTime();
 
     /**
+     * @return the date/time (in milliseconds since epoch) at which the status of the
+     *         request was last updated
+     */
+    long getLastUpdated();
+
+    /**
      * @return the size of the queue when the drop request was issued or <code>null</code> if
      *         it is not yet known, which can happen if the {@link DropFlowFileState} is
      *         {@link DropFlowFileState#WAITING_FOR_LOCK}.
@@ -58,5 +64,4 @@ public interface DropFlowFileStatus {
      * @return the current state of the operation
      */
     DropFlowFileState getState();
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
index 31f17e0..bc2f358 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -60,13 +60,6 @@ public interface FlowFileQueue {
     void purgeSwapFiles();
 
     /**
-     * @return the minimum number of FlowFiles that must be present in order for
-     *         FlowFiles to begin being swapped out of the queue
-     */
-    // TODO: REMOVE THIS.
-    int getSwapThreshold();
-
-    /**
      * Resets the comparator used by this queue to maintain order.
      *
      * @param newPriorities the ordered list of prioritizers to use to determine
@@ -112,12 +105,8 @@ public interface FlowFileQueue {
      *         not include those FlowFiles that have been swapped out or are currently
      *         being processed
      */
-    // TODO: REMOVE?
     boolean isActiveQueueEmpty();
 
-    // TODO: REMOVE?
-    QueueSize getActiveQueueSize();
-
     /**
      * Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile
      * is considered to be unacknowledged if it has been pulled from the queue by some component
@@ -152,45 +141,6 @@ public interface FlowFileQueue {
     void putAll(Collection<FlowFileRecord> files);
 
     /**
-     * Removes all records from the internal swap queue and returns them.
-     *
-     * @return all removed records from internal swap queue
-     */
-    // TODO: REMOVE THIS?
-    List<FlowFileRecord> pollSwappableRecords();
-
-    /**
-     * Restores the records from swap space into this queue, adding the records
-     * that have expired to the given set instead of enqueuing them.
-     *
-     * @param records that were swapped in
-     */
-    // TODO: REMOVE THIS?
-    void putSwappedRecords(Collection<FlowFileRecord> records);
-
-    /**
-     * Updates the internal counters of how much data is queued, based on
-     * swapped data that is being restored.
-     *
-     * @param numRecords count of records swapped in
-     * @param contentSize total size of records being swapped in
-     */
-    // TODO: REMOVE THIS?
-    void incrementSwapCount(int numRecords, long contentSize);
-
-    /**
-     * @return the number of FlowFiles that are enqueued and not swapped
-     */
-    // TODO: REMOVE THIS?
-    int unswappedSize();
-
-    // TODO: REMOVE THIS?
-    int getSwapRecordCount();
-
-    // TODO: REMOVE THIS?
-    int getSwapQueueSize();
-
-    /**
      * @param expiredRecords expired records
      * @return the next flow file on the queue; null if empty
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
index 42d8416..528d652 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.controller.queue;
 
+import java.text.NumberFormat;
+
 /**
  *
  */
@@ -45,4 +47,9 @@ public class QueueSize {
     public long getByteCount() {
         return totalSizeBytes;
     }
+
+    @Override
+    public String toString() {
+        return "QueueSize[FlowFiles=" + objectCount + ", ContentSize=" + NumberFormat.getNumberInstance().format(totalSizeBytes) + " Bytes]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
index 57e9186..a70d287 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
@@ -26,6 +26,9 @@ import org.apache.nifi.controller.queue.QueueSize;
  * Defines a mechanism by which FlowFiles can be move into external storage or
  * memory so that they can be removed from the Java heap and vice-versa
  */
+// TODO: This needs to be refactored into two different mechanisms, one that is responsible for doing
+// framework-y types of things, such as updating the repositories, and another that is responsible
+// for serializing and deserializing FlowFiles to external storage.
 public interface FlowFileSwapManager {
 
     /**
@@ -38,6 +41,16 @@ public interface FlowFileSwapManager {
     void initialize(SwapManagerInitializationContext initializationContext);
 
     /**
+     * Drops all FlowFiles that are swapped out at the given location. This will update the Provenance
+     * Repository as well as the FlowFile Repository.
+     *
+     * @param swapLocation the location of the swap file to drop
+     * @param flowFileQueue the queue to which the FlowFiles belong
+     * @param user the user that initiated the request
+     */
+    void dropSwappedFlowFiles(String swapLocation, FlowFileQueue flowFileQueue, String user) throws IOException;
+
+    /**
      * Swaps out the given FlowFiles that belong to the queue with the given identifier.
      *
      * @param flowFiles the FlowFiles to swap out to external storage
@@ -53,13 +66,13 @@ public interface FlowFileSwapManager {
      * provides a view of the FlowFiles but does not actively swap them in, meaning that the swap file
      * at the given location remains in that location and the FlowFile Repository is not updated.
      *
-     * @param swapLocation the location of hte swap file
+     * @param swapLocation the location of the swap file
      * @param flowFileQueue the queue that the FlowFiles belong to
      * @return the FlowFiles that live at the given swap location
      *
      * @throws IOException if unable to recover the FlowFiles from the given location
      */
-    List<FlowFileRecord> peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException;
+    List<FlowFileRecord> peek(String swapLocation, FlowFileQueue flowFileQueue) throws IOException;
 
     /**
      * Recovers the FlowFiles from the swap file that lives at the given location and belongs

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java
index 564d5ec..0e30784 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java
@@ -27,7 +27,6 @@ public interface SwapManagerInitializationContext {
      */
     FlowFileRepository getFlowFileRepository();
 
-
     /**
      * @return the {@link ResourceClaimManager} that is necessary to provide to the FlowFileRepository when
      *         performing swapping actions

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
index e9fb9d6..41bcc74 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
@@ -33,12 +33,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-
 import org.junit.Assert;
 
-public class MockFlowFile implements FlowFile {
+public class MockFlowFile implements FlowFileRecord {
 
     private final Map<String, String> attributes = new HashMap<>();
 
@@ -170,7 +171,7 @@ public class MockFlowFile implements FlowFile {
 
     public void assertAttributeNotExists(final String attributeName) {
         Assert.assertFalse("Attribute " + attributeName + " not exists with value " + attributes.get(attributeName),
-                attributes.containsKey(attributeName));
+            attributes.containsKey(attributeName));
     }
 
     public void assertAttributeEquals(final String attributeName, final String expectedValue) {
@@ -250,7 +251,7 @@ public class MockFlowFile implements FlowFile {
 
                 if ((fromStream & 0xFF) != (data[i] & 0xFF)) {
                     Assert.fail("FlowFile content differs from input at byte " + bytesRead + " with input having value "
-                            + (fromStream & 0xFF) + " and FlowFile having value " + (data[i] & 0xFF));
+                        + (fromStream & 0xFF) + " and FlowFile having value " + (data[i] & 0xFF));
                 }
 
                 bytesRead++;
@@ -274,4 +275,19 @@ public class MockFlowFile implements FlowFile {
     public Long getLastQueueDate() {
         return entryDate;
     }
+
+    @Override
+    public long getPenaltyExpirationMillis() {
+        return -1;
+    }
+
+    @Override
+    public ContentClaim getContentClaim() {
+        return null;
+    }
+
+    @Override
+    public long getContentClaimOffset() {
+        return 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFileQueue.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFileQueue.java
index 775a1d5..0c6ec2a 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFileQueue.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFileQueue.java
@@ -23,7 +23,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.nifi.processor.QueueSize;
+import org.apache.nifi.controller.queue.QueueSize;
+
 
 public class MockFlowFileQueue {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 1060854..85fc784 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -40,12 +40,12 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
 
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.FlowFileHandlingException;
@@ -691,7 +691,7 @@ public class MockProcessSession implements ProcessSession {
     /**
      * @param relationship to get flowfiles for
      * @return a List of FlowFiles in the order in which they were transferred
-     * to the given relationship
+     *         to the given relationship
      */
     public List<MockFlowFile> getFlowFilesForRelationship(final String relationship) {
         final Relationship procRel = new Relationship.Builder().name(relationship).build();
@@ -778,7 +778,7 @@ public class MockProcessSession implements ProcessSession {
      */
     private FlowFile inheritAttributes(final FlowFile source, final FlowFile destination) {
         if (source == null || destination == null || source == destination) {
-            return destination; //don't need to inherit from ourselves
+            return destination; // don't need to inherit from ourselves
         }
         final FlowFile updated = putAllAttributes(destination, source.getAttributes());
         getProvenanceReporter().fork(source, Collections.singletonList(updated));
@@ -801,7 +801,7 @@ public class MockProcessSession implements ProcessSession {
         int uuidsCaptured = 0;
         for (final FlowFile source : sources) {
             if (source == destination) {
-                continue; //don't want to capture parent uuid of this.  Something can't be a child of itself
+                continue; // don't want to capture parent uuid of this. Something can't be a child of itself
             }
             final String sourceUuid = source.getAttribute(CoreAttributes.UUID.key());
             if (sourceUuid != null && !sourceUuid.trim().isEmpty()) {
@@ -832,7 +832,7 @@ public class MockProcessSession implements ProcessSession {
      */
     private static Map<String, String> intersectAttributes(final Collection<FlowFile> flowFileList) {
         final Map<String, String> result = new HashMap<>();
-        //trivial cases
+        // trivial cases
         if (flowFileList == null || flowFileList.isEmpty()) {
             return result;
         } else if (flowFileList.size() == 1) {
@@ -845,8 +845,7 @@ public class MockProcessSession implements ProcessSession {
          */
         final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes();
 
-        outer:
-        for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
+        outer: for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
             final String key = mapEntry.getKey();
             final String value = mapEntry.getValue();
             for (final FlowFile flowFile : flowFileList) {
@@ -900,7 +899,7 @@ public class MockProcessSession implements ProcessSession {
     public void assertTransferCount(final Relationship relationship, final int count) {
         final int transferCount = getFlowFilesForRelationship(relationship).size();
         Assert.assertEquals("Expected " + count + " FlowFiles to be transferred to "
-                + relationship + " but actual transfer count was " + transferCount, count, transferCount);
+            + relationship + " but actual transfer count was " + transferCount, count, transferCount);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index eeeff61..0d00cc8 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -58,12 +58,12 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceReporter;
@@ -222,7 +222,7 @@ public class StandardProcessorTestRunner implements TestRunner {
             boolean unscheduledRun = false;
             for (final Future<Throwable> future : futures) {
                 try {
-                    final Throwable thrown = future.get();   // wait for the result
+                    final Throwable thrown = future.get(); // wait for the result
                     if (thrown != null) {
                         throw new AssertionError(thrown);
                     }
@@ -551,11 +551,11 @@ public class StandardProcessorTestRunner implements TestRunner {
     @Override
     public void addControllerService(final String identifier, final ControllerService service, final Map<String, String> properties) throws InitializationException {
         // hold off on failing due to deprecated annotation for now... will introduce later.
-//        for ( final Method method : service.getClass().getMethods() ) {
-//            if ( method.isAnnotationPresent(org.apache.nifi.controller.annotation.OnConfigured.class) ) {
-//                Assert.fail("Controller Service " + service + " is using deprecated Annotation " + org.apache.nifi.controller.annotation.OnConfigured.class + " for method " + method);
-//            }
-//        }
+        // for ( final Method method : service.getClass().getMethods() ) {
+        // if ( method.isAnnotationPresent(org.apache.nifi.controller.annotation.OnConfigured.class) ) {
+        // Assert.fail("Controller Service " + service + " is using deprecated Annotation " + org.apache.nifi.controller.annotation.OnConfigured.class + " for method " + method);
+        // }
+        // }
 
         final ComponentLog logger = new MockProcessorLog(identifier, service);
         final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(requireNonNull(service), requireNonNull(identifier), logger);
@@ -716,11 +716,11 @@ public class StandardProcessorTestRunner implements TestRunner {
         final PropertyDescriptor descriptor = service.getPropertyDescriptor(propertyName);
         if (descriptor == null) {
             return new ValidationResult.Builder()
-                    .input(propertyName)
-                    .explanation(propertyName + " is not a known Property for Controller Service " + service)
-                    .subject("Invalid property")
-                    .valid(false)
-                    .build();
+                .input(propertyName)
+                .explanation(propertyName + " is not a known Property for Controller Service " + service)
+                .subject("Invalid property")
+                .valid(false)
+                .build();
         }
         return setProperty(service, descriptor, value);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 6e66bfe..ec901fe 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -26,11 +26,11 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceReporter;
@@ -40,22 +40,22 @@ public interface TestRunner {
 
     /**
      * @return the {@link Processor} for which this <code>TestRunner</code> is
-     * configured
+     *         configured
      */
     Processor getProcessor();
 
     /**
      * @return the {@link ProcessSessionFactory} that this
-     * <code>TestRunner</code> will use to invoke the
-     * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)} method
+     *         <code>TestRunner</code> will use to invoke the
+     *         {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)} method
      */
     ProcessSessionFactory getProcessSessionFactory();
 
     /**
      * @return the {@Link ProcessContext} that this <code>TestRunner</code> will
-     * use to invoke the
-     * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger}
-     * method
+     *         use to invoke the
+     *         {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger}
+     *         method
      */
     ProcessContext getProcessContext();
 
@@ -120,7 +120,7 @@ public interface TestRunner {
      *
      * @param iterations number of iterations
      * @param stopOnFinish whether or not to run the Processor methods that are
-     * annotated with {@link org.apache.nifi.processor.annotation.OnStopped @OnStopped}
+     *            annotated with {@link org.apache.nifi.processor.annotation.OnStopped @OnStopped}
      * @param initialize true if must initialize
      */
     void run(int iterations, boolean stopOnFinish, final boolean initialize);
@@ -163,10 +163,10 @@ public interface TestRunner {
      *
      * @param iterations number of iterations
      * @param stopOnFinish whether or not to run the Processor methods that are
-     * annotated with {@link org.apache.nifi.processor.annotation.OnStopped @OnStopped}
+     *            annotated with {@link org.apache.nifi.processor.annotation.OnStopped @OnStopped}
      * @param initialize true if must initialize
      * @param runWait indicates the amount of time in milliseconds that the framework should wait for
-     * processors to stop running before calling the {@link org.apache.nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation
+     *            processors to stop running before calling the {@link org.apache.nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation
      */
     void run(int iterations, boolean stopOnFinish, final boolean initialize, final long runWait);
 
@@ -187,8 +187,8 @@ public interface TestRunner {
 
     /**
      * @return the currently configured number of threads that will be used to
-     * runt he Processor when calling the {@link #run()} or {@link #run(int)}
-     * methods
+     *         run the Processor when calling the {@link #run()} or {@link #run(int)}
+     *         methods
      */
     int getThreadCount();
 
@@ -296,7 +296,7 @@ public interface TestRunner {
 
     /**
      * @return <code>true</code> if the Input Queue to the Processor is empty,
-     * <code>false</code> otherwise
+     *         <code>false</code> otherwise
      */
     boolean isQueueEmpty();
 
@@ -421,7 +421,7 @@ public interface TestRunner {
 
     /**
      * @return the {@link ProvenanceReporter} that will be used by the
-     * configured {@link Processor} for reporting Provenance Events
+     *         configured {@link Processor} for reporting Provenance Events
      */
     ProvenanceReporter getProvenanceReporter();
 
@@ -433,7 +433,7 @@ public interface TestRunner {
     /**
      * @param name of counter
      * @return the current value of the counter with the specified name, or null
-     * if no counter exists with the specified name
+     *         if no counter exists with the specified name
      */
     Long getCounterValue(String name);
 
@@ -599,14 +599,14 @@ public interface TestRunner {
     /**
      * @param service the service
      * @return {@code true} if the given Controller Service is enabled,
-     * {@code false} if it is disabled
+     *         {@code false} if it is disabled
      *
      * @throws IllegalArgumentException if the given ControllerService is not
-     * known by this TestRunner (i.e., it has not been added via the
-     * {@link #addControllerService(String, ControllerService)} or
-     * {@link #addControllerService(String, ControllerService, Map)} method or
-     * if the Controller Service has been removed via the
-     * {@link #removeControllerService(ControllerService)} method.
+     *             known by this TestRunner (i.e., it has not been added via the
+     *             {@link #addControllerService(String, ControllerService)} or
+     *             {@link #addControllerService(String, ControllerService, Map)} method or
+     *             if the Controller Service has been removed via the
+     *             {@link #removeControllerService(ControllerService)} method.
      */
     boolean isControllerServiceEnabled(ControllerService service);
 
@@ -622,11 +622,11 @@ public interface TestRunner {
      *
      * @throws IllegalStateException if the ControllerService is not disabled
      * @throws IllegalArgumentException if the given ControllerService is not
-     * known by this TestRunner (i.e., it has not been added via the
-     * {@link #addControllerService(String, ControllerService)} or
-     * {@link #addControllerService(String, ControllerService, Map)} method or
-     * if the Controller Service has been removed via the
-     * {@link #removeControllerService(ControllerService)} method.
+     *             known by this TestRunner (i.e., it has not been added via the
+     *             {@link #addControllerService(String, ControllerService)} or
+     *             {@link #addControllerService(String, ControllerService, Map)} method or
+     *             if the Controller Service has been removed via the
+     *             {@link #removeControllerService(ControllerService)} method.
      *
      */
     void removeControllerService(ControllerService service);
@@ -641,11 +641,11 @@ public interface TestRunner {
      *
      * @throws IllegalStateException if the ControllerService is not disabled
      * @throws IllegalArgumentException if the given ControllerService is not
-     * known by this TestRunner (i.e., it has not been added via the
-     * {@link #addControllerService(String, ControllerService)} or
-     * {@link #addControllerService(String, ControllerService, Map)} method or
-     * if the Controller Service has been removed via the
-     * {@link #removeControllerService(ControllerService)} method.
+     *             known by this TestRunner (i.e., it has not been added via the
+     *             {@link #addControllerService(String, ControllerService)} or
+     *             {@link #addControllerService(String, ControllerService, Map)} method or
+     *             if the Controller Service has been removed via the
+     *             {@link #removeControllerService(ControllerService)} method.
      *
      */
     ValidationResult setProperty(ControllerService service, PropertyDescriptor property, String value);
@@ -660,11 +660,11 @@ public interface TestRunner {
      *
      * @throws IllegalStateException if the ControllerService is not disabled
      * @throws IllegalArgumentException if the given ControllerService is not
-     * known by this TestRunner (i.e., it has not been added via the
-     * {@link #addControllerService(String, ControllerService)} or
-     * {@link #addControllerService(String, ControllerService, Map)} method or
-     * if the Controller Service has been removed via the
-     * {@link #removeControllerService(ControllerService)} method.
+     *             known by this TestRunner (i.e., it has not been added via the
+     *             {@link #addControllerService(String, ControllerService)} or
+     *             {@link #addControllerService(String, ControllerService, Map)} method or
+     *             if the Controller Service has been removed via the
+     *             {@link #removeControllerService(ControllerService)} method.
      *
      */
     ValidationResult setProperty(ControllerService service, PropertyDescriptor property, AllowableValue value);
@@ -679,11 +679,11 @@ public interface TestRunner {
      *
      * @throws IllegalStateException if the ControllerService is not disabled
      * @throws IllegalArgumentException if the given ControllerService is not
-     * known by this TestRunner (i.e., it has not been added via the
-     * {@link #addControllerService(String, ControllerService)} or
-     * {@link #addControllerService(String, ControllerService, Map)} method or
-     * if the Controller Service has been removed via the
-     * {@link #removeControllerService(ControllerService)} method.
+     *             known by this TestRunner (i.e., it has not been added via the
+     *             {@link #addControllerService(String, ControllerService)} or
+     *             {@link #addControllerService(String, ControllerService, Map)} method or
+     *             if the Controller Service has been removed via the
+     *             {@link #removeControllerService(ControllerService)} method.
      *
      */
     ValidationResult setProperty(ControllerService service, String propertyName, String value);
@@ -698,19 +698,19 @@ public interface TestRunner {
      * @throws IllegalStateException if the Controller Service is not disabled
      *
      * @throws IllegalArgumentException if the given ControllerService is not
-     * known by this TestRunner (i.e., it has not been added via the
-     * {@link #addControllerService(String, ControllerService)} or
-     * {@link #addControllerService(String, ControllerService, Map)} method or
-     * if the Controller Service has been removed via the
-     * {@link #removeControllerService(ControllerService)} method.
+     *             known by this TestRunner (i.e., it has not been added via the
+     *             {@link #addControllerService(String, ControllerService)} or
+     *             {@link #addControllerService(String, ControllerService, Map)} method or
+     *             if the Controller Service has been removed via the
+     *             {@link #removeControllerService(ControllerService)} method.
      */
     void setAnnotationData(ControllerService service, String annotationData);
 
     /**
      * @param identifier of controller service
      * @return the {@link ControllerService} that is registered with the given
-     * identifier, or <code>null</code> if no Controller Service exists with the
-     * given identifier
+     *         identifier, or <code>null</code> if no Controller Service exists with the
+     *         given identifier
      */
     ControllerService getControllerService(String identifier);
 
@@ -720,11 +720,11 @@ public interface TestRunner {
      *
      * @param service the service to validate
      * @throws IllegalArgumentException if the given ControllerService is not
-     * known by this TestRunner (i.e., it has not been added via the
-     * {@link #addControllerService(String, ControllerService)} or
-     * {@link #addControllerService(String, ControllerService, Map)} method or
-     * if the Controller Service has been removed via the
-     * {@link #removeControllerService(ControllerService)} method.
+     *             known by this TestRunner (i.e., it has not been added via the
+     *             {@link #addControllerService(String, ControllerService)} or
+     *             {@link #addControllerService(String, ControllerService, Map)} method or
+     *             if the Controller Service has been removed via the
+     *             {@link #removeControllerService(ControllerService)} method.
      */
     void assertValid(ControllerService service);
 
@@ -734,11 +734,11 @@ public interface TestRunner {
      *
      * @param service the service to validate
      * @throws IllegalArgumentException if the given ControllerService is not
-     * known by this TestRunner (i.e., it has not been added via the
-     * {@link #addControllerService(String, ControllerService)} or
-     * {@link #addControllerService(String, ControllerService, Map)} method or
-     * if the Controller Service has been removed via the
-     * {@link #removeControllerService(ControllerService)} method.
+     *             known by this TestRunner (i.e., it has not been added via the
+     *             {@link #addControllerService(String, ControllerService)} or
+     *             {@link #addControllerService(String, ControllerService, Map)} method or
+     *             if the Controller Service has been removed via the
+     *             {@link #removeControllerService(ControllerService)} method.
      *
      */
     void assertNotValid(ControllerService service);
@@ -748,12 +748,12 @@ public interface TestRunner {
      * @param identifier identifier of service
      * @param serviceType type of service
      * @return the {@link ControllerService} that is registered with the given
-     * identifier, cast as the provided service type, or <code>null</code> if no
-     * Controller Service exists with the given identifier
+     *         identifier, cast as the provided service type, or <code>null</code> if no
+     *         Controller Service exists with the given identifier
      *
      * @throws ClassCastException if the identifier given is registered for a
-     * Controller Service but that Controller Service is not of the type
-     * specified
+     *             Controller Service but that Controller Service is not of the type
+     *             specified
      */
     <T extends ControllerService> T getControllerService(String identifier, Class<T> serviceType);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
new file mode 100644
index 0000000..609fe75
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import org.apache.nifi.controller.queue.DropFlowFileState;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
+import org.apache.nifi.controller.queue.QueueSize;
+
+/**
+ * Tracks the status of a single request to drop queued FlowFiles. The getters come from
+ * {@link DropFlowFileStatus}; the setters and {@link #cancel()} are package-private.
+ * <p>
+ * State transitions ({@link #setState} and {@link #cancel()}) are synchronized on this object.
+ * All mutable fields are volatile, so the getters read them without taking that lock.
+ */
+public class DropFlowFileRequest implements DropFlowFileStatus {
+    private final String identifier;
+    private final long submissionTime = System.currentTimeMillis();
+
+    private volatile QueueSize originalSize;
+    private volatile QueueSize currentSize;
+    private volatile long lastUpdated = System.currentTimeMillis();
+    private volatile Thread executionThread;
+
+    // volatile because getState() reads it without holding the monitor used by setState()/cancel()
+    private volatile DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK;
+
+    /**
+     * @param identifier the identifier reported by {@link #getRequestIdentifier()}
+     */
+    public DropFlowFileRequest(final String identifier) {
+        this.identifier = identifier;
+    }
+
+    @Override
+    public String getRequestIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public long getRequestSubmissionTime() {
+        return submissionTime;
+    }
+
+    @Override
+    public QueueSize getOriginalSize() {
+        return originalSize;
+    }
+
+    void setOriginalSize(final QueueSize originalSize) {
+        this.originalSize = originalSize;
+    }
+
+    @Override
+    public QueueSize getCurrentSize() {
+        return currentSize;
+    }
+
+    /**
+     * Stores the size that {@link #getCurrentSize()} will report.
+     *
+     * @param queueSize the new current size
+     */
+    void setCurrentSize(final QueueSize queueSize) {
+        // the parameter is named queueSize; assigning currentSize here would be a no-op self-assignment
+        this.currentSize = queueSize;
+    }
+
+    @Override
+    public DropFlowFileState getState() {
+        return state;
+    }
+
+    @Override
+    public long getLastUpdated() {
+        return lastUpdated;
+    }
+
+    /**
+     * Moves the request to the given state and refreshes the last-updated timestamp.
+     *
+     * @param state the new state
+     */
+    synchronized void setState(final DropFlowFileState state) {
+        this.state = state;
+        this.lastUpdated = System.currentTimeMillis();
+    }
+
+    /**
+     * @param thread the thread that {@link #cancel()} will interrupt
+     */
+    void setExecutionThread(final Thread thread) {
+        this.executionThread = thread;
+    }
+
+    /**
+     * Marks the request as canceled and interrupts the execution thread, if one has been set.
+     *
+     * @return {@code false} if the request was already complete or canceled, {@code true} otherwise
+     */
+    synchronized boolean cancel() {
+        if (this.state == DropFlowFileState.COMPLETE || this.state == DropFlowFileState.CANCELED) {
+            return false;
+        }
+
+        this.state = DropFlowFileState.CANCELED;
+
+        // read the volatile field once: setExecutionThread() is not synchronized, so a second read could differ
+        final Thread thread = executionThread;
+        if (thread != null) {
+            thread.interrupt();
+        }
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index df356fd..073e5fb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.controller;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -24,9 +25,13 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -35,25 +40,32 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.queue.DropFlowFileState;
 import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.controller.repository.RepositoryRecordType;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.concurrency.TimedLock;
-import org.apache.nifi.util.timebuffer.LongEntityAccess;
-import org.apache.nifi.util.timebuffer.TimedBuffer;
-import org.apache.nifi.util.timebuffer.TimestampedLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,15 +79,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     public static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 100000;
     public static final int SWAP_RECORD_POLL_SIZE = 10000;
 
-    // When we have very high contention on a FlowFile Queue, the writeLock quickly becomes the bottleneck. In order to avoid this,
-    // we keep track of how often we are obtaining the write lock. If we exceed some threshold, we start performing a Pre-fetch so that
-    // we can then poll many times without having to obtain the lock.
-    // If lock obtained an average of more than PREFETCH_POLL_THRESHOLD times per second in order to poll from queue for last 5 seconds, do a pre-fetch.
-    public static final int PREFETCH_POLL_THRESHOLD = 1000;
-    public static final int PRIORITIZED_PREFETCH_SIZE = 10;
-    public static final int UNPRIORITIZED_PREFETCH_SIZE = 1000;
-    private volatile int prefetchSize = UNPRIORITIZED_PREFETCH_SIZE; // when we pre-fetch, how many should we pre-fetch?
-
     private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class);
 
     private PriorityQueue<FlowFileRecord> activeQueue = null;
@@ -97,9 +100,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     private final List<FlowFilePrioritizer> priorities;
     private final int swapThreshold;
     private final FlowFileSwapManager swapManager;
+    private final List<String> swapLocations = new ArrayList<>();
     private final TimedLock readLock;
     private final TimedLock writeLock;
     private final String identifier;
+    private final FlowFileRepository flowFileRepository;
+    private final ProvenanceEventRepository provRepository;
+    private final ResourceClaimManager resourceClaimManager;
 
     private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
     private final AtomicInteger activeQueueSizeRef = new AtomicInteger(0);
@@ -108,8 +115,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK!
     private final ProcessScheduler scheduler;
 
-    public StandardFlowFileQueue(final String identifier, final Connection connection, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter,
-        final int swapThreshold) {
+    public StandardFlowFileQueue(final String identifier, final Connection connection, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
+        final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) {
         activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
         priorities = new ArrayList<>();
         maximumQueueObjectCount = 0L;
@@ -120,6 +127,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         swapQueue = new ArrayList<>();
         this.eventReporter = eventReporter;
         this.swapManager = swapManager;
+        this.flowFileRepository = flowFileRepo;
+        this.provRepository = provRepo;
+        this.resourceClaimManager = resourceClaimManager;
 
         this.identifier = identifier;
         this.swapThreshold = swapThreshold;
@@ -141,11 +151,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     }
 
     @Override
-    public int getSwapThreshold() {
-        return swapThreshold;
-    }
-
-    @Override
     public void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
         writeLock.lock();
         try {
@@ -154,12 +159,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             activeQueue = newQueue;
             priorities.clear();
             priorities.addAll(newPriorities);
-
-            if (newPriorities.isEmpty()) {
-                prefetchSize = UNPRIORITIZED_PREFETCH_SIZE;
-            } else {
-                prefetchSize = PRIORITIZED_PREFETCH_SIZE;
-            }
         } finally {
             writeLock.unlock("setPriorities");
         }
@@ -225,33 +224,16 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
      */
     private QueueSize getQueueSize() {
         final QueueSize unacknowledged = unacknowledgedSizeRef.get();
-        final PreFetch preFetch = preFetchRef.get();
 
-        final int preFetchCount;
-        final long preFetchSize;
-        if (preFetch == null) {
-            preFetchCount = 0;
-            preFetchSize = 0L;
-        } else {
-            final QueueSize preFetchQueueSize = preFetch.size();
-            preFetchCount = preFetchQueueSize.getObjectCount();
-            preFetchSize = preFetchQueueSize.getByteCount();
-        }
-
-        return new QueueSize(activeQueue.size() + swappedRecordCount + unacknowledged.getObjectCount() + preFetchCount,
-            activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount() + preFetchSize);
+        return new QueueSize(activeQueue.size() + swappedRecordCount + unacknowledged.getObjectCount(),
+            activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount());
     }
 
     @Override
     public boolean isEmpty() {
         readLock.lock();
         try {
-            final PreFetch prefetch = preFetchRef.get();
-            if (prefetch == null) {
-                return activeQueue.isEmpty() && swappedRecordCount == 0 && unacknowledgedSizeRef.get().getObjectCount() == 0;
-            } else {
-                return activeQueue.isEmpty() && swappedRecordCount == 0 && unacknowledgedSizeRef.get().getObjectCount() == 0 && prefetch.size().getObjectCount() == 0;
-            }
+            return activeQueue.isEmpty() && swappedRecordCount == 0 && unacknowledgedSizeRef.get().getObjectCount() == 0;
         } finally {
             readLock.unlock("isEmpty");
         }
@@ -260,30 +242,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     @Override
     public boolean isActiveQueueEmpty() {
         final int activeQueueSize = activeQueueSizeRef.get();
-        if (activeQueueSize == 0) {
-            final PreFetch preFetch = preFetchRef.get();
-            if (preFetch == null) {
-                return true;
-            }
-
-            final QueueSize queueSize = preFetch.size();
-            return queueSize.getObjectCount() == 0;
-        } else {
-            return false;
-        }
+        return activeQueueSize == 0;
     }
 
-    @Override
     public QueueSize getActiveQueueSize() {
         readLock.lock();
         try {
-            final PreFetch preFetch = preFetchRef.get();
-            if (preFetch == null) {
-                return new QueueSize(activeQueue.size(), activeQueueContentSize);
-            } else {
-                final QueueSize preFetchSize = preFetch.size();
-                return new QueueSize(activeQueue.size() + preFetchSize.getObjectCount(), activeQueueContentSize + preFetchSize.getByteCount());
-            }
+            return new QueueSize(activeQueue.size(), activeQueueContentSize);
         } finally {
             readLock.unlock("getActiveQueueSize");
         }
@@ -374,6 +339,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                 swappedContentSize += file.getSize();
                 swappedRecordCount++;
                 swapMode = true;
+                writeSwapFilesIfNecessary();
             } else {
                 activeQueueContentSize += file.getSize();
                 activeQueue.add(file);
@@ -405,6 +371,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                 swappedContentSize += bytes;
                 swappedRecordCount += numFiles;
                 swapMode = true;
+                writeSwapFilesIfNecessary();
             } else {
                 activeQueueContentSize += bytes;
                 activeQueue.addAll(files);
@@ -421,116 +388,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         }
     }
 
-    @Override
-    public List<FlowFileRecord> pollSwappableRecords() {
-        writeLock.lock();
-        try {
-            if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
-                return null;
-            }
-
-            final List<FlowFileRecord> swapRecords = new ArrayList<>(Math.min(SWAP_RECORD_POLL_SIZE, swapQueue.size()));
-            final Iterator<FlowFileRecord> itr = swapQueue.iterator();
-            while (itr.hasNext() && swapRecords.size() < SWAP_RECORD_POLL_SIZE) {
-                final FlowFileRecord record = itr.next();
-                swapRecords.add(record);
-                itr.remove();
-            }
-
-            swapQueue.trimToSize();
-            return swapRecords;
-        } finally {
-            writeLock.unlock("pollSwappableRecords");
-        }
-    }
-
-    @Override
-    public void putSwappedRecords(final Collection<FlowFileRecord> records) {
-        writeLock.lock();
-        try {
-            try {
-                for (final FlowFileRecord record : records) {
-                    swappedContentSize -= record.getSize();
-                    swappedRecordCount--;
-                    activeQueueContentSize += record.getSize();
-                    activeQueue.add(record);
-                }
-
-                if (swappedRecordCount > swapQueue.size()) {
-                    // we have more swap files to be swapped in.
-                    return;
-                }
-
-                // If a call to #pollSwappableRecords will not produce any, go ahead and roll those FlowFiles back into the mix
-                if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
-                    for (final FlowFileRecord record : swapQueue) {
-                        activeQueue.add(record);
-                        activeQueueContentSize += record.getSize();
-                    }
-                    swapQueue.clear();
-                    swappedContentSize = 0L;
-                    swappedRecordCount = 0;
-                    swapMode = false;
-                }
-            } finally {
-                activeQueueSizeRef.set(activeQueue.size());
-            }
-        } finally {
-            writeLock.unlock("putSwappedRecords");
-            scheduler.registerEvent(connection.getDestination());
-        }
-    }
-
-    @Override
-    public void incrementSwapCount(final int numRecords, final long contentSize) {
-        writeLock.lock();
-        try {
-            swappedContentSize += contentSize;
-            swappedRecordCount += numRecords;
-        } finally {
-            writeLock.unlock("incrementSwapCount");
-        }
-    }
-
-    @Override
-    public int unswappedSize() {
-        readLock.lock();
-        try {
-            return activeQueue.size() + unacknowledgedSizeRef.get().getObjectCount();
-        } finally {
-            readLock.unlock("unswappedSize");
-        }
-    }
-
-    @Override
-    public int getSwapRecordCount() {
-        readLock.lock();
-        try {
-            return swappedRecordCount;
-        } finally {
-            readLock.unlock("getSwapRecordCount");
-        }
-    }
-
-    @Override
-    public int getSwapQueueSize() {
-        readLock.lock();
-        try {
-            if (logger.isDebugEnabled()) {
-                final long byteToMbDivisor = 1024L * 1024L;
-                final QueueSize unacknowledged = unacknowledgedSizeRef.get();
-
-                logger.debug("Total Queue Size: ActiveQueue={}/{} MB, Swap Queue={}/{} MB, Unacknowledged={}/{} MB",
-                    activeQueue.size(), activeQueueContentSize / byteToMbDivisor,
-                    swappedRecordCount, swappedContentSize / byteToMbDivisor,
-                    unacknowledged.getObjectCount(), unacknowledged.getByteCount() / byteToMbDivisor);
-            }
-
-            return swapQueue.size();
-        } finally {
-            readLock.unlock("getSwapQueueSize");
-        }
-    }
 
     private boolean isLaterThan(final Long maxAge) {
         if (maxAge == null) {
@@ -558,30 +415,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
         // First check if we have any records Pre-Fetched.
         final long expirationMillis = flowFileExpirationMillis.get();
-        final PreFetch preFetch = preFetchRef.get();
-        if (preFetch != null) {
-            if (preFetch.isExpired()) {
-                requeueExpiredPrefetch(preFetch);
-            } else {
-                while (true) {
-                    final FlowFileRecord next = preFetch.nextRecord();
-                    if (next == null) {
-                        break;
-                    }
-
-                    if (isLaterThan(getExpirationDate(next, expirationMillis))) {
-                        expiredRecords.add(next);
-                        continue;
-                    }
-
-                    updateUnacknowledgedSize(1, next.getSize());
-                    return next;
-                }
-
-                preFetchRef.compareAndSet(preFetch, null);
-            }
-        }
-
         writeLock.lock();
         try {
             flowFile = doPoll(expiredRecords, expirationMillis);
@@ -631,9 +464,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             queueFullRef.set(determineIfFull());
         }
 
-        if (incrementPollCount()) {
-            prefetch();
-        }
         return isExpired ? null : flowFile;
     }
 
@@ -642,38 +472,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         final List<FlowFileRecord> records = new ArrayList<>(Math.min(1024, maxResults));
 
         // First check if we have any records Pre-Fetched.
-        final long expirationMillis = flowFileExpirationMillis.get();
-        final PreFetch preFetch = preFetchRef.get();
-        if (preFetch != null) {
-            if (preFetch.isExpired()) {
-                requeueExpiredPrefetch(preFetch);
-            } else {
-                long totalSize = 0L;
-                for (int i = 0; i < maxResults; i++) {
-                    final FlowFileRecord next = preFetch.nextRecord();
-                    if (next == null) {
-                        break;
-                    }
-
-                    if (isLaterThan(getExpirationDate(next, expirationMillis))) {
-                        expiredRecords.add(next);
-                        continue;
-                    }
-
-                    records.add(next);
-                    totalSize += next.getSize();
-                }
-
-                // If anything was prefetched, use what we have.
-                if (!records.isEmpty()) {
-                    updateUnacknowledgedSize(records.size(), totalSize);
-                    return records;
-                }
-
-                preFetchRef.compareAndSet(preFetch, null);
-            }
-        }
-
         writeLock.lock();
         try {
             doPoll(records, maxResults, expiredRecords);
@@ -705,10 +503,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         if (queueFullAtStart && !expiredRecords.isEmpty()) {
             queueFullRef.set(determineIfFull());
         }
-
-        if (incrementPollCount()) {
-            prefetch();
-        }
     }
 
     /**
@@ -732,6 +526,46 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         // Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out
         // to disk, because we want them to be swapped back in in the same order that they were swapped out.
 
+        if (activeQueue.size() > swapThreshold - SWAP_RECORD_POLL_SIZE) {
+            return;
+        }
+
+        // If there are swap files waiting to be swapped in, swap those in first. We do this in order to ensure that those that
+        // were swapped out first are then swapped back in first. If we instead just immediately migrated the FlowFiles from the
+        // swap queue to the active queue, and we never run out of FlowFiles in the active queue (because destination cannot
+        // keep up with queue), we will end up always processing the new FlowFiles first instead of the FlowFiles that arrived
+        // first.
+        if (!swapLocations.isEmpty()) {
+            final String swapLocation = swapLocations.remove(0);
+            try {
+                final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, this);
+                swappedRecordCount -= swappedIn.size();
+                long swapSize = 0L;
+                for (final FlowFileRecord flowFile : swappedIn) {
+                    swapSize += flowFile.getSize();
+                }
+                swappedContentSize -= swapSize;
+                activeQueueContentSize += swapSize;
+                activeQueueSizeRef.set(activeQueue.size());
+                activeQueue.addAll(swappedIn);
+                return;
+            } catch (final FileNotFoundException fnfe) {
+                logger.error("Failed to swap in FlowFiles from Swap File {} because the Swap File can no longer be found", swapLocation);
+                if (eventReporter != null) {
+                    eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + swapLocation + " because the Swap File can no longer be found");
+                }
+                return;
+            } catch (final IOException ioe) {
+                logger.error("Failed to swap in FlowFiles from Swap File {}; Swap File appears to be corrupt!", swapLocation);
+                logger.error("", ioe);
+                if (eventReporter != null) {
+                    eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " +
+                        swapLocation + "; Swap File appears to be corrupt! Some FlowFiles in the queue may not be accessible. See logs for more information.");
+                }
+                return;
+            }
+        }
+
         // this is the most common condition (nothing is swapped out), so do the check first and avoid the expense
         // of other checks for 99.999% of the cases.
         if (swappedRecordCount == 0 && swapQueue.isEmpty()) {
@@ -760,6 +594,69 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         }
     }
 
+    /**
+     * Spills queued FlowFiles to swap files once the in-memory swap queue holds at least
+     * SWAP_RECORD_POLL_SIZE records.
+     * <p>
+     * The active queue and the swap queue are merged into a temporary queue ordered by reverse
+     * priority, so that the lowest-priority FlowFiles are written out first, one swap file per
+     * SWAP_RECORD_POLL_SIZE records. Whatever remains is split again: at most <code>swapThreshold</code>
+     * records go back onto the active queue and the rest stay on the swap queue.
+     * <p>
+     * No FlowFile enters or leaves the queue in this method, but FlowFiles can move between the
+     * active queue and the swapped set. The size counters are therefore reconciled at the end so that
+     * the totals reported by getQueueSize() stay the same while the active/swapped split reflects
+     * the new contents.
+     * <p>
+     * This method MUST be called with the write lock held
+     */
+    private void writeSwapFilesIfNecessary() {
+        if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
+            return;
+        }
+
+        // Capture the totals up front; they are invariant across this method and are used
+        // below to re-derive the swapped counters once the active queue has been rebuilt.
+        final int totalRecordCount = activeQueue.size() + swappedRecordCount;
+        final long totalContentSize = activeQueueContentSize + swappedContentSize;
+
+        final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE;
+
+        // Create a new Priority queue with the prioritizers that are set, but reverse the
+        // prioritizers because we want to pull the lowest-priority FlowFiles to swap out
+        final PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<>(activeQueue.size() + swapQueue.size(), Collections.reverseOrder(new Prioritizer(priorities)));
+        tempQueue.addAll(activeQueue);
+        tempQueue.addAll(swapQueue);
+
+        // Named differently from the swapLocations field so that the two cannot be confused.
+        final List<String> newSwapLocations = new ArrayList<>(numSwapFiles);
+        for (int i = 0; i < numSwapFiles; i++) {
+            // Create a new swap file for the next SWAP_RECORD_POLL_SIZE records
+            final List<FlowFileRecord> toSwap = new ArrayList<>(SWAP_RECORD_POLL_SIZE);
+            for (int j = 0; j < SWAP_RECORD_POLL_SIZE; j++) {
+                toSwap.add(tempQueue.poll());
+            }
+
+            try {
+                Collections.reverse(toSwap); // currently ordered in reverse priority order based on the ordering of the temp queue.
+                final String swapLocation = swapManager.swapOut(toSwap, this);
+                newSwapLocations.add(swapLocation);
+            } catch (final IOException ioe) {
+                tempQueue.addAll(toSwap); // if we failed, we must add the FlowFiles back to the queue.
+                logger.error("FlowFile Queue with identifier {} has {} FlowFiles queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting "
+                    + "the Java heap space but failed to write information to disk due to {}", getIdentifier(), getQueueSize().getObjectCount(), ioe.toString());
+                logger.error("", ioe);
+                if (eventReporter != null) {
+                    eventReporter.reportEvent(Severity.ERROR, "Failed to Overflow to Disk", "Flowfile Queue with identifier " + getIdentifier() + " has " + getQueueSize().getObjectCount() +
+                        " queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting the Java heap space but failed to write information to disk. "
+                        + "See logs for more information.");
+                }
+
+                // Stop trying to write further swap files; everything not yet written stays in memory.
+                break;
+            }
+        }
+
+        // Pull any records off of the temp queue that won't fit back on the active queue, and add those to the
+        // swap queue. Then add the records back to the active queue.
+        swapQueue.clear();
+        while (tempQueue.size() > swapThreshold) {
+            final FlowFileRecord record = tempQueue.poll();
+            swapQueue.add(record);
+        }
+
+        Collections.reverse(swapQueue); // currently ordered in reverse priority order based on the ordering of the temp queue
+
+        // replace the contents of the active queue, since we've merged it with the swap queue.
+        activeQueue.clear();
+        long activeContentSize = 0L;
+        FlowFileRecord toRequeue;
+        while ((toRequeue = tempQueue.poll()) != null) {
+            activeQueue.offer(toRequeue);
+            activeContentSize += toRequeue.getSize();
+        }
+        this.swapLocations.addAll(newSwapLocations);
+
+        // Reconcile the counters with the new contents. The active side is recomputed from the rebuilt
+        // active queue; the swapped side (swap queue plus swap files) is whatever is left of the totals.
+        activeQueueContentSize = activeContentSize;
+        swappedRecordCount = totalRecordCount - activeQueue.size();
+        swappedContentSize = totalContentSize - activeContentSize;
+        activeQueueSizeRef.set(activeQueue.size());
+    }
+
+
     @Override
     public long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination, int maxResults, final Set<FlowFileRecord> expiredRecords) {
         long drainedSize = 0L;
@@ -796,13 +693,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
             final List<FlowFileRecord> unselected = new ArrayList<>();
 
-            // the prefetch doesn't allow us to add records back. So when this method is used,
-            // if there are prefetched records, we have to requeue them into the active queue first.
-            final PreFetch prefetch = preFetchRef.get();
-            if (prefetch != null) {
-                requeueExpiredPrefetch(prefetch);
-            }
-
             while (true) {
                 FlowFileRecord flowFile = this.activeQueue.poll();
                 if (flowFile == null) {
@@ -856,6 +746,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         }
     }
 
+
+
     private static final class Prioritizer implements Comparator<FlowFileRecord>, Serializable {
 
         private static final long serialVersionUID = 1L;
@@ -991,6 +883,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
 
             this.swappedRecordCount = swapFlowFileCount;
             this.swappedContentSize = swapByteCount;
+            this.swapLocations.addAll(swapLocations);
         } finally {
             writeLock.unlock("Recover Swap Files");
         }
@@ -1004,173 +897,219 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         return "FlowFileQueue[id=" + identifier + "]";
     }
 
-    @Override
-    public DropFlowFileStatus dropFlowFiles() {
-        // TODO Auto-generated method stub
-        return null;
-    }
+    private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>();
 
     @Override
-    public boolean cancelDropFlowFileRequest(String requestIdentifier) {
-        // TODO Auto-generated method stub
-        return false;
-    }
+    public DropFlowFileStatus dropFlowFiles() {
+        // purge any old requests from the map just to keep it clean. But if there are very few requests, which is usually the case, then don't bother
+        if (dropRequestMap.size() > 10) {
+            final List<String> toDrop = new ArrayList<>();
+            for (final Map.Entry<String, DropFlowFileRequest> entry : dropRequestMap.entrySet()) {
+                final DropFlowFileRequest request = entry.getValue();
+                final boolean completed = request.getState() == DropFlowFileState.COMPLETE || request.getState() == DropFlowFileState.FAILURE;
+
+                if (completed && System.currentTimeMillis() - request.getLastUpdated() > TimeUnit.MINUTES.toMillis(5L)) {
+                    toDrop.add(entry.getKey());
+                }
+            }
 
-    @Override
-    public DropFlowFileStatus getDropFlowFileStatus(String requestIdentifier) {
-        // TODO Auto-generated method stub
-        return null;
-    }
+            for (final String requestId : toDrop) {
+                dropRequestMap.remove(requestId);
+            }
+        }
 
-    /**
-     * Lock the queue so that other threads are unable to interact with the
-     * queue
-     */
-    public void lock() {
-        writeLock.lock();
-    }
+        // TODO: get user name!
+        final String userName = null;
 
-    /**
-     * Unlock the queue
-     */
-    public void unlock() {
-        writeLock.unlock("external unlock");
-    }
+        final String requestIdentifier = UUID.randomUUID().toString();
+        final DropFlowFileRequest dropRequest = new DropFlowFileRequest(requestIdentifier);
+        final Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                writeLock.lock();
+                try {
+                    dropRequest.setState(DropFlowFileState.DROPPING_ACTIVE_FLOWFILES);
+
+                    try {
+                        final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue);
+                        drop(activeQueueRecords, userName);
+                        activeQueue.clear();
+                        dropRequest.setCurrentSize(getQueueSize());
+
+                        drop(swapQueue, userName);
+                        swapQueue.clear();
+                        dropRequest.setCurrentSize(getQueueSize());
+
+                        final Iterator<String> swapLocationItr = swapLocations.iterator();
+                        while (swapLocationItr.hasNext()) {
+                            final String swapLocation = swapLocationItr.next();
+                            final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
+                            try {
+                                drop(swappedIn, userName);
+                            } catch (final Exception e) {
+                                activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
+                                throw e;
+                            }
+
+                            dropRequest.setCurrentSize(getQueueSize());
+                            swapLocationItr.remove();
+                        }
 
-    @Override
-    public QueueSize getUnacknowledgedQueueSize() {
-        return unacknowledgedSizeRef.get();
-    }
+                        dropRequest.setState(DropFlowFileState.COMPLETE);
+                    } catch (final Exception e) {
+                        // TODO: Handle adequately
+                        dropRequest.setState(DropFlowFileState.FAILURE);
+                    }
+                } finally {
+                    writeLock.unlock("Drop FlowFiles");
+                }
+            }
+        }, "Drop FlowFiles for Connection " + getIdentifier());
+        t.setDaemon(true);
+        t.start();
 
-    private void updateUnacknowledgedSize(final int addToCount, final long addToSize) {
-        boolean updated = false;
+        dropRequest.setExecutionThread(t);
+        dropRequestMap.put(requestIdentifier, dropRequest);
 
-        do {
-            final QueueSize queueSize = unacknowledgedSizeRef.get();
-            final QueueSize newSize = new QueueSize(queueSize.getObjectCount() + addToCount, queueSize.getByteCount() + addToSize);
-            updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize);
-        } while (!updated);
+        return dropRequest;
     }
 
-    private void requeueExpiredPrefetch(final PreFetch prefetch) {
-        if (prefetch == null) {
-            return;
+    private void drop(final List<FlowFileRecord> flowFiles, final String user) throws IOException {
+        // Create a Provenance Event and a FlowFile Repository record for each FlowFile
+        final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(flowFiles.size());
+        final List<RepositoryRecord> flowFileRepoRecords = new ArrayList<>(flowFiles.size());
+        for (final FlowFileRecord flowFile : flowFiles) {
+            provenanceEvents.add(createDropEvent(flowFile, user));
+            flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile));
         }
 
-        writeLock.lock();
-        try {
-            final long contentSizeRequeued = prefetch.requeue(activeQueue);
-            this.activeQueueContentSize += contentSizeRequeued;
-            this.preFetchRef.compareAndSet(prefetch, null);
-        } finally {
-            writeLock.unlock("requeueExpiredPrefetch");
-        }
-    }
+        for (final FlowFileRecord flowFile : flowFiles) {
+            final ContentClaim contentClaim = flowFile.getContentClaim();
+            if (contentClaim == null) {
+                continue;
+            }
 
-    /**
-     * MUST be called with write lock held.
-     */
-    private final AtomicReference<PreFetch> preFetchRef = new AtomicReference<>();
+            final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+            if (resourceClaim == null) {
+                continue;
+            }
 
-    private void prefetch() {
-        if (activeQueue.isEmpty()) {
-            return;
+            resourceClaimManager.decrementClaimantCount(resourceClaim);
         }
 
-        final int numToFetch = Math.min(prefetchSize, activeQueue.size());
+        provRepository.registerEvents(provenanceEvents);
+        flowFileRepository.updateRepository(flowFileRepoRecords);
+    }
 
-        final PreFetch curPreFetch = preFetchRef.get();
-        if (curPreFetch != null && curPreFetch.size().getObjectCount() > 0) {
-            return;
-        }
+    private ProvenanceEventRecord createDropEvent(final FlowFileRecord flowFile, final String user) {
+        final ProvenanceEventBuilder builder = provRepository.eventBuilder();
+        builder.fromFlowFile(flowFile);
+        builder.setEventType(ProvenanceEventType.DROP);
+        builder.setLineageStartDate(flowFile.getLineageStartDate());
+        builder.setComponentId(getIdentifier());
+        builder.setComponentType("Connection");
+        builder.setDetails("FlowFile manually dropped by user " + user);
+        return builder.build();
+    }
 
-        final List<FlowFileRecord> buffer = new ArrayList<>(numToFetch);
-        long contentSize = 0L;
-        for (int i = 0; i < numToFetch; i++) {
-            final FlowFileRecord record = activeQueue.poll();
-            if (record == null || record.isPenalized()) {
-                // not enough unpenalized records to pull. Put all records back and return
-                activeQueue.addAll(buffer);
-                if (record != null) {
-                    activeQueue.add(record);
-                }
-                return;
-            } else {
-                buffer.add(record);
-                contentSize += record.getSize();
+    private RepositoryRecord createDeleteRepositoryRecord(final FlowFileRecord flowFile) {
+        return new RepositoryRecord() {
+            @Override
+            public FlowFileQueue getDestination() {
+                return null;
             }
-        }
 
-        activeQueueContentSize -= contentSize;
-        preFetchRef.set(new PreFetch(buffer));
-    }
+            @Override
+            public FlowFileQueue getOriginalQueue() {
+                return StandardFlowFileQueue.this;
+            }
 
-    private final TimedBuffer<TimestampedLong> pollCounts = new TimedBuffer<>(TimeUnit.SECONDS, 5, new LongEntityAccess());
+            @Override
+            public RepositoryRecordType getType() {
+                return RepositoryRecordType.DELETE;
+            }
 
-    private boolean incrementPollCount() {
-        pollCounts.add(new TimestampedLong(1L));
-        final long totalCount = pollCounts.getAggregateValue(System.currentTimeMillis() - 5000L).getValue();
-        return totalCount > PREFETCH_POLL_THRESHOLD * 5;
-    }
+            @Override
+            public ContentClaim getCurrentClaim() {
+                return flowFile.getContentClaim();
+            }
 
-    private static class PreFetch {
+            @Override
+            public ContentClaim getOriginalClaim() {
+                return flowFile.getContentClaim();
+            }
 
-        private final List<FlowFileRecord> records;
-        private final AtomicInteger pointer = new AtomicInteger(0);
-        private final long expirationTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(1L);
-        private final AtomicLong contentSize = new AtomicLong(0L);
+            @Override
+            public long getCurrentClaimOffset() {
+                return flowFile.getContentClaimOffset();
+            }
 
-        public PreFetch(final List<FlowFileRecord> records) {
-            this.records = records;
+            @Override
+            public FlowFileRecord getCurrent() {
+                return flowFile;
+            }
 
-            long totalSize = 0L;
-            for (final FlowFileRecord record : records) {
-                totalSize += record.getSize();
+            @Override
+            public boolean isAttributesChanged() {
+                return false;
             }
-            contentSize.set(totalSize);
-        }
 
-        public FlowFileRecord nextRecord() {
-            final int nextValue = pointer.getAndIncrement();
-            if (nextValue >= records.size()) {
+            @Override
+            public boolean isMarkedForAbort() {
+                return false;
+            }
+
+            @Override
+            public String getSwapLocation() {
                 return null;
             }
+        };
+    }
 
-            final FlowFileRecord flowFile = records.get(nextValue);
-            contentSize.addAndGet(-flowFile.getSize());
-            return flowFile;
+
+    @Override
+    public boolean cancelDropFlowFileRequest(final String requestIdentifier) {
+        final DropFlowFileRequest request = dropRequestMap.remove(requestIdentifier);
+        if (request == null) {
+            return false;
         }
 
-        public QueueSize size() {
-            final int pointerIndex = pointer.get();
-            final int count = records.size() - pointerIndex;
-            if (count < 0) {
-                return new QueueSize(0, 0L);
-            }
+        final boolean successful = request.cancel();
+        return successful;
+    }
 
-            final long bytes = contentSize.get();
-            return new QueueSize(count, bytes);
-        }
+    @Override
+    public DropFlowFileStatus getDropFlowFileStatus(final String requestIdentifier) {
+        return dropRequestMap.get(requestIdentifier);
+    }
 
-        public boolean isExpired() {
-            return System.nanoTime() > expirationTime;
-        }
+    /**
+     * Lock the queue so that other threads are unable to interact with the
+     * queue
+     */
+    public void lock() {
+        writeLock.lock();
+    }
 
-        private long requeue(final Queue<FlowFileRecord> queue) {
-            // get the current pointer and prevent any other thread from accessing the rest of the elements
-            final int curPointer = pointer.getAndAdd(records.size());
-            if (curPointer < records.size() - 1) {
-                final List<FlowFileRecord> subList = records.subList(curPointer, records.size());
-                long contentSize = 0L;
-                for (final FlowFileRecord record : subList) {
-                    contentSize += record.getSize();
-                }
+    /**
+     * Unlock the queue
+     */
+    public void unlock() {
+        writeLock.unlock("external unlock");
+    }
 
-                queue.addAll(subList);
+    @Override
+    public QueueSize getUnacknowledgedQueueSize() {
+        return unacknowledgedSizeRef.get();
+    }
 
-                return contentSize;
-            }
-            return 0L;
-        }
+    private void updateUnacknowledgedSize(final int addToCount, final long addToSize) {
+        boolean updated = false;
+
+        do {
+            final QueueSize queueSize = unacknowledgedSizeRef.get();
+            final QueueSize newSize = new QueueSize(queueSize.getObjectCount() + addToCount, queueSize.getByteCount() + addToSize);
+            updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize);
+        } while (!updated);
     }
 }


[2/5] nifi git commit: NIFI-730: Added methods for dropping queued flowfiles; refactored swap manager but have not yet started swapping flowfiles in or out from within the flowfile queue

Posted by ma...@apache.org.
NIFI-730: Added methods for dropping queued flowfiles; refactored swap manager but have not yet started swapping flowfiles in or out from within the flowfile queue


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b8c51dc3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b8c51dc3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b8c51dc3

Branch: refs/heads/NIFI-730
Commit: b8c51dc35d1a7fdbf3e6449bbe297db667a1176c
Parents: b4bfcc1
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Oct 11 10:27:07 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Oct 11 10:27:07 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/controller/FlowFileQueue.java   | 203 ------
 .../controller/queue/DropFlowFileState.java     |  40 ++
 .../controller/queue/DropFlowFileStatus.java    |  62 ++
 .../nifi/controller/queue/FlowFileQueue.java    | 256 ++++++++
 .../apache/nifi/controller/queue/QueueSize.java |  48 ++
 .../repository/FlowFileRepository.java          |   2 +-
 .../repository/FlowFileSwapManager.java         |  94 ++-
 .../controller/repository/QueueProvider.java    |   2 +-
 .../controller/repository/RepositoryRecord.java |   2 +-
 .../SwapManagerInitializationContext.java       |  41 ++
 .../apache/nifi/processor/ProcessSession.java   |   1 +
 .../org/apache/nifi/processor/QueueSize.java    |  48 --
 .../org/apache/nifi/connectable/Connection.java |   2 +-
 .../nifi/controller/StandardFlowFileQueue.java  | 108 +++-
 .../nifi/connectable/StandardConnection.java    |  26 +-
 .../nifi/controller/FileSystemSwapManager.java  | 626 ++++++++-----------
 .../apache/nifi/controller/FlowController.java  | 193 +++---
 .../repository/BatchingSessionFactory.java      |   2 +-
 .../repository/ConnectionSwapInfo.java          |  58 --
 .../repository/StandardProcessSession.java      |   4 +-
 .../repository/StandardRepositoryRecord.java    |   2 +-
 .../repository/VolatileFlowFileRepository.java  |   2 +-
 .../WriteAheadFlowFileRepository.java           |   2 +-
 .../controller/TestFileSystemSwapManager.java   |   6 +-
 .../repository/TestStandardProcessSession.java  |   9 +-
 .../TestWriteAheadFlowFileRepository.java       |   2 +-
 26 files changed, 1003 insertions(+), 838 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java
deleted file mode 100644
index e1baeb7..0000000
--- a/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.flowfile.FlowFilePrioritizer;
-import org.apache.nifi.processor.FlowFileFilter;
-import org.apache.nifi.processor.QueueSize;
-
-public interface FlowFileQueue {
-
-    /**
-     * @return the unique identifier for this FlowFileQueue
-     */
-    String getIdentifier();
-
-    /**
-     * @return list of processing priorities for this queue
-     */
-    List<FlowFilePrioritizer> getPriorities();
-
-    /**
-     * @return the minimum number of FlowFiles that must be present in order for
-     * FlowFiles to begin being swapped out of the queue
-     */
-    int getSwapThreshold();
-
-    /**
-     * Resets the comparator used by this queue to maintain order.
-     *
-     * @param newPriorities the ordered list of prioritizers to use to determine
-     * order within this queue.
-     * @throws NullPointerException if arg is null
-     */
-    void setPriorities(List<FlowFilePrioritizer> newPriorities);
-
-    /**
-     * Establishes this queue's preferred maximum work load.
-     *
-     * @param maxQueueSize the maximum number of flow files this processor
-     * recommends having in its work queue at any one time
-     */
-    void setBackPressureObjectThreshold(long maxQueueSize);
-
-    /**
-     * @return maximum number of flow files that should be queued up at any one
-     * time
-     */
-    long getBackPressureObjectThreshold();
-
-    /**
-     * @param maxDataSize Establishes this queue's preferred maximum data size.
-     */
-    void setBackPressureDataSizeThreshold(String maxDataSize);
-
-    /**
-     * @return maximum data size that should be queued up at any one time
-     */
-    String getBackPressureDataSizeThreshold();
-
-    QueueSize size();
-
-    /**
-     * @return total size in bytes of the queue flow file's content
-     */
-    long contentSize();
-
-    /**
-     * @return true if no items queue; false otherwise
-     */
-    boolean isEmpty();
-
-    /**
-     * @return true if the active queue is empty; false otherwise. The Active
-     * queue contains those FlowFiles that can be processed immediately and does
-     * not include those FlowFiles that have been swapped out or are currently
-     * being processed
-     */
-    boolean isActiveQueueEmpty();
-
-    QueueSize getActiveQueueSize();
-
-    /**
-     * Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile
-     * is considered to be unacknowledged if it has been pulled from the queue by some component
-     * but the session that pulled the FlowFile has not yet been committed or rolled back.
-     *
-     * @return a QueueSize that represents all FlowFiles that are 'unacknowledged'.
-     */
-    QueueSize getUnacknowledgedQueueSize();
-
-    void acknowledge(FlowFileRecord flowFile);
-
-    void acknowledge(Collection<FlowFileRecord> flowFiles);
-
-    /**
-     * @return true if maximum queue size has been reached or exceeded; false
-     * otherwise
-     */
-    boolean isFull();
-
-    /**
-     * places the given file into the queue
-     *
-     * @param file to place into queue
-     */
-    void put(FlowFileRecord file);
-
-    /**
-     * places the given files into the queue
-     *
-     * @param files to place into queue
-     */
-    void putAll(Collection<FlowFileRecord> files);
-
-    /**
-     * Removes all records from the internal swap queue and returns them.
-     *
-     * @return all removed records from internal swap queue
-     */
-    List<FlowFileRecord> pollSwappableRecords();
-
-    /**
-     * Restores the records from swap space into this queue, adding the records
-     * that have expired to the given set instead of enqueuing them.
-     *
-     * @param records that were swapped in
-     */
-    void putSwappedRecords(Collection<FlowFileRecord> records);
-
-    /**
-     * Updates the internal counters of how much data is queued, based on
-     * swapped data that is being restored.
-     *
-     * @param numRecords count of records swapped in
-     * @param contentSize total size of records being swapped in
-     */
-    void incrementSwapCount(int numRecords, long contentSize);
-
-    /**
-     * @return the number of FlowFiles that are enqueued and not swapped
-     */
-    int unswappedSize();
-
-    int getSwapRecordCount();
-
-    int getSwapQueueSize();
-
-    /**
-     * @param expiredRecords expired records
-     * @return the next flow file on the queue; null if empty
-     */
-    FlowFileRecord poll(Set<FlowFileRecord> expiredRecords);
-
-    /**
-     * @param maxResults limits how many results can be polled
-     * @param expiredRecords for expired records
-     * @return the next flow files on the queue up to the max results; null if
-     * empty
-     */
-    List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords);
-
-    /**
-     * Drains flow files from the given source queue into the given destination
-     * list.
-     *
-     * @param sourceQueue queue to drain from
-     * @param destination Collection to drain to
-     * @param maxResults max number to drain
-     * @param expiredRecords for expired records
-     * @return size (bytes) of flow files drained from queue
-     */
-    long drainQueue(Queue<FlowFileRecord> sourceQueue, List<FlowFileRecord> destination, int maxResults, Set<FlowFileRecord> expiredRecords);
-
-    List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords);
-
-    String getFlowFileExpiration();
-
-    int getFlowFileExpiration(TimeUnit timeUnit);
-
-    void setFlowFileExpiration(String flowExpirationPeriod);
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
new file mode 100644
index 0000000..3f16d00
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+/**
+ * Represents the state that a Drop FlowFile request is in
+ */
+public enum DropFlowFileState {
+
+    WAITING_FOR_LOCK("Waiting for Destination Component to complete its action"),
+    DROPPING_ACTIVE_FLOWFILES("Dropping FlowFiles from queue"),
+    COMPLETE("Completed Successfully"),
+    FAILURE("Failed");
+    
+    private final String description;
+    
+    private DropFlowFileState(final String description) {
+        this.description = description;
+    }
+    
+    @Override
+    public String toString() {
+        return description;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
new file mode 100644
index 0000000..b216608
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+/**
+ * Represents the status of a Drop FlowFile Request that has been issued to
+ * a {@link FlowFileQueue}. When a queue is requested to drop its FlowFiles,
+ * that process may be rather lengthy in the case of a poorly behaving
+ * FlowFileRepository or if the destination Processor is polling from the
+ * queue using a filter that is misbehaving. As a result, the dropping of
+ * FlowFiles is performed asynchronously.
+ *
+ * This status object provides information about how far along in the process
+ * we currently are and information about the success or failure of the
+ * operation.
+ */
+public interface DropFlowFileStatus {
+
+    /**
+     * @return the identifier of the request to drop FlowFiles from the queue
+     */
+    String getRequestIdentifier();
+
+    /**
+     * @return the date/time (in milliseconds since epoch) at which the request to
+     *         drop the FlowFiles from a queue was submitted
+     */
+    long getRequestSubmissionTime();
+
+    /**
+     * @return the size of the queue when the drop request was issued or <code>null</code> if
+     *         it is not yet known, which can happen if the {@link DropFlowFileState} is
+     *         {@link DropFlowFileState#WAITING_FOR_LOCK}.
+     */
+    QueueSize getOriginalSize();
+
+    /**
+     * @return the current size of the queue or <code>null</code> if it is not yet known
+     */
+    QueueSize getCurrentSize();
+
+    /**
+     * @return the current state of the operation
+     */
+    DropFlowFileState getState();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
new file mode 100644
index 0000000..31f17e0
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.queue;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.FlowFileFilter;
+
+public interface FlowFileQueue {
+
+    /**
+     * @return the unique identifier for this FlowFileQueue
+     */
+    String getIdentifier();
+
+    /**
+     * @return list of processing priorities for this queue
+     */
+    List<FlowFilePrioritizer> getPriorities();
+
+    /**
+     * Reads any Swap Files that belong to this queue and increments counts so that the size
+     * of the queue will reflect the size of all FlowFiles regardless of whether or not they are
+     * swapped out. This will be called only during NiFi startup as an initialization step. This
+     * method is then responsible for returning the largest ID of any FlowFile that is swapped
+     * out, or <code>null</code> if no FlowFiles are swapped out for this queue.
+     *
+     * @return the largest ID of any FlowFile that is swapped out for this queue, or <code>null</code> if
+     *         no FlowFiles are swapped out for this queue.
+     */
+    Long recoverSwappedFlowFiles();
+
+    /**
+     * Destroys any Swap Files that exist for this queue without updating the FlowFile Repository
+     * or Provenance Repository. This is done only on startup in the case of non-persistent
+     * repositories. In the case of non-persistent repositories, we may still have Swap Files because
+     * we may still need to overflow the FlowFiles from heap onto disk, even though we don't want to keep
+     * the FlowFiles on restart.
+     */
+    void purgeSwapFiles();
+
+    /**
+     * @return the minimum number of FlowFiles that must be present in order for
+     *         FlowFiles to begin being swapped out of the queue
+     */
+    // TODO: REMOVE THIS.
+    int getSwapThreshold();
+
+    /**
+     * Resets the comparator used by this queue to maintain order.
+     *
+     * @param newPriorities the ordered list of prioritizers to use to determine
+     *            order within this queue.
+     * @throws NullPointerException if arg is null
+     */
+    void setPriorities(List<FlowFilePrioritizer> newPriorities);
+
+    /**
+     * Establishes this queue's preferred maximum work load.
+     *
+     * @param maxQueueSize the maximum number of flow files this processor
+     *            recommends having in its work queue at any one time
+     */
+    void setBackPressureObjectThreshold(long maxQueueSize);
+
+    /**
+     * @return maximum number of flow files that should be queued up at any one
+     *         time
+     */
+    long getBackPressureObjectThreshold();
+
+    /**
+     * @param maxDataSize Establishes this queue's preferred maximum data size.
+     */
+    void setBackPressureDataSizeThreshold(String maxDataSize);
+
+    /**
+     * @return maximum data size that should be queued up at any one time
+     */
+    String getBackPressureDataSizeThreshold();
+
+    QueueSize size();
+
+    /**
+     * @return true if no items are queued; false otherwise
+     */
+    boolean isEmpty();
+
+    /**
+     * @return true if the active queue is empty; false otherwise. The Active
+     *         queue contains those FlowFiles that can be processed immediately and does
+     *         not include those FlowFiles that have been swapped out or are currently
+     *         being processed
+     */
+    // TODO: REMOVE?
+    boolean isActiveQueueEmpty();
+
+    // TODO: REMOVE?
+    QueueSize getActiveQueueSize();
+
+    /**
+     * Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile
+     * is considered to be unacknowledged if it has been pulled from the queue by some component
+     * but the session that pulled the FlowFile has not yet been committed or rolled back.
+     *
+     * @return a QueueSize that represents all FlowFiles that are 'unacknowledged'.
+     */
+    QueueSize getUnacknowledgedQueueSize();
+
+    void acknowledge(FlowFileRecord flowFile);
+
+    void acknowledge(Collection<FlowFileRecord> flowFiles);
+
+    /**
+     * @return true if maximum queue size has been reached or exceeded; false
+     *         otherwise
+     */
+    boolean isFull();
+
+    /**
+     * places the given file into the queue
+     *
+     * @param file to place into queue
+     */
+    void put(FlowFileRecord file);
+
+    /**
+     * places the given files into the queue
+     *
+     * @param files to place into queue
+     */
+    void putAll(Collection<FlowFileRecord> files);
+
+    /**
+     * Removes all records from the internal swap queue and returns them.
+     *
+     * @return all removed records from internal swap queue
+     */
+    // TODO: REMOVE THIS?
+    List<FlowFileRecord> pollSwappableRecords();
+
+    /**
+     * Restores the records from swap space into this queue, adding the records
+     * that have expired to the given set instead of enqueuing them.
+     *
+     * @param records that were swapped in
+     */
+    // TODO: REMOVE THIS?
+    void putSwappedRecords(Collection<FlowFileRecord> records);
+
+    /**
+     * Updates the internal counters of how much data is queued, based on
+     * swapped data that is being restored.
+     *
+     * @param numRecords count of records swapped in
+     * @param contentSize total size of records being swapped in
+     */
+    // TODO: REMOVE THIS?
+    void incrementSwapCount(int numRecords, long contentSize);
+
+    /**
+     * @return the number of FlowFiles that are enqueued and not swapped
+     */
+    // TODO: REMOVE THIS?
+    int unswappedSize();
+
+    // TODO: REMOVE THIS?
+    int getSwapRecordCount();
+
+    // TODO: REMOVE THIS?
+    int getSwapQueueSize();
+
+    /**
+     * @param expiredRecords expired records
+     * @return the next flow file on the queue; null if empty
+     */
+    FlowFileRecord poll(Set<FlowFileRecord> expiredRecords);
+
+    /**
+     * @param maxResults limits how many results can be polled
+     * @param expiredRecords for expired records
+     * @return the next flow files on the queue up to the max results; null if
+     *         empty
+     */
+    List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords);
+
+    /**
+     * Drains flow files from the given source queue into the given destination
+     * list.
+     *
+     * @param sourceQueue queue to drain from
+     * @param destination Collection to drain to
+     * @param maxResults max number to drain
+     * @param expiredRecords for expired records
+     * @return size (bytes) of flow files drained from queue
+     */
+    long drainQueue(Queue<FlowFileRecord> sourceQueue, List<FlowFileRecord> destination, int maxResults, Set<FlowFileRecord> expiredRecords);
+
+    List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords);
+
+    String getFlowFileExpiration();
+
+    int getFlowFileExpiration(TimeUnit timeUnit);
+
+    void setFlowFileExpiration(String flowExpirationPeriod);
+
+    /**
+     * Initiates a request to drop all FlowFiles in this queue. This method returns
+     * a DropFlowFileStatus that can be used to determine the current state of the request.
+     * Additionally, the DropFlowFileStatus provides a request identifier that can then be
+     * passed to the {@link #getDropFlowFileStatus(String)} and {@link #cancelDropFlowFileRequest(String)}
+     * methods in order to obtain the status later or cancel a request.
+     *
+     * @return the status of the drop request.
+     */
+    DropFlowFileStatus dropFlowFiles();
+
+    /**
+     * Returns the current status of a Drop FlowFile Request that was initiated via the
+     * {@link #dropFlowFiles()} method that has the given identifier
+     *
+     * @param requestIdentifier the identifier of the Drop FlowFile Request
+     * @return the status for the request with the given identifier, or <code>null</code> if no
+     *         request status exists with that identifier
+     */
+    DropFlowFileStatus getDropFlowFileStatus(String requestIdentifier);
+
+    /**
+     * Cancels the request to drop FlowFiles that has the given identifier
+     *
+     * @param requestIdentifier the identifier of the Drop FlowFile Request
+     * @return <code>true</code> if the request was canceled, <code>false</code> if the request has
+     *         already completed or is not known
+     */
+    boolean cancelDropFlowFileRequest(String requestIdentifier);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
new file mode 100644
index 0000000..42d8416
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.queue;
+
+/**
+ *
+ */
+public class QueueSize {
+
+    private final int objectCount;
+    private final long totalSizeBytes;
+
+    public QueueSize(final int numberObjects, final long totalSizeBytes) {
+        if (numberObjects < 0 || totalSizeBytes < 0) {
+            throw new IllegalArgumentException();
+        }
+        objectCount = numberObjects;
+        this.totalSizeBytes = totalSizeBytes;
+    }
+
+    /**
+     * @return number of objects present on the queue
+     */
+    public int getObjectCount() {
+        return objectCount;
+    }
+
+    /**
+     * @return total size in bytes of the content for the data on the queue
+     */
+    public long getByteCount() {
+        return totalSizeBytes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
index 58fc6b3..906cbe2 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 
 /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
index 2e5be11..57e9186 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
@@ -16,8 +16,11 @@
  */
 package org.apache.nifi.controller.repository;
 
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.events.EventReporter;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
 
 /**
  * Defines a mechanism by which FlowFiles can be move into external storage or
@@ -26,38 +29,81 @@ import org.apache.nifi.events.EventReporter;
 public interface FlowFileSwapManager {
 
     /**
-     * Starts the Manager's background threads to start swapping FlowFiles in
-     * and out of memory
+     * Initializes the Swap Manager, providing a {@link SwapManagerInitializationContext} so that the
+     * Swap Manager has access to all of the components necessary to perform its functions
      *
-     * @param flowFileRepository the FlowFileRepository that must be notified of
-     * any swapping in or out of FlowFiles
-     * @param queueProvider the provider of FlowFileQueue's so that FlowFiles
-     * can be obtained and restored
-     * @param claimManager the ContentClaimManager to use for interacting with
-     * Content Claims
-     * @param reporter the EventReporter that can be used for notifying users of
-     * important events
+     * @param initializationContext the context that provides the swap manager with access to the
+     *            resources that it needs to perform its functions
      */
-    void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ResourceClaimManager claimManager, EventReporter reporter);
+    void initialize(SwapManagerInitializationContext initializationContext);
 
     /**
-     * Shuts down the manager
+     * Swaps out the given FlowFiles that belong to the given queue.
+     *
+     * @param flowFiles the FlowFiles to swap out to external storage
+     * @param flowFileQueue the queue that the FlowFiles belong to
+     * @return the location of the externally stored swap file
+     *
+     * @throws IOException if unable to swap the FlowFiles out
      */
-    void shutdown();
+    String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException;
 
     /**
-     * Removes all Swap information, permanently destroying any FlowFiles that
-     * have been swapped out
+     * Recovers the FlowFiles from the swap file that lives at the given location. This action
+     * provides a view of the FlowFiles but does not actively swap them in, meaning that the swap file
+     * at the given location remains in that location and the FlowFile Repository is not updated.
+     *
+     * @param swapLocation the location of the swap file
+     * @param flowFileQueue the queue that the FlowFiles belong to
+     * @return the FlowFiles that live at the given swap location
+     *
+     * @throws IOException if unable to recover the FlowFiles from the given location
      */
-    void purge();
+    List<FlowFileRecord> peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException;
 
     /**
-     * Notifies FlowFile queues of the number of FlowFiles and content size of
-     * all FlowFiles that are currently swapped out
+     * Recovers the FlowFiles from the swap file that lives at the given location and belongs
+     * to the given FlowFile Queue. The FlowFile Repository is then updated
+     * and the swap file is permanently removed from the external storage
+     *
+     * @param swapLocation the location of the swap file
+     * @param flowFileQueue the queue to which the FlowFiles belong
+     *
+     * @return the FlowFiles that are stored in the given location
+     *
+     * @throws IOException if unable to recover the FlowFiles from the given location or update the
+     *             FlowFileRepository
+     */
+    List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException;
+
+    /**
+     * Determines swap files that exist for the given FlowFileQueue
+     *
+     * @param flowFileQueue the queue for which the FlowFiles should be recovered
      *
-     * @param connectionProvider provider
-     * @param claimManager manager
-     * @return how many flowfiles have been recovered
+     * @return all swap locations that have been identified for the given queue, in the order that they should
+     *         be swapped back in
      */
-    long recoverSwappedFlowFiles(QueueProvider connectionProvider, ResourceClaimManager claimManager);
+    List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException;
+
+    /**
+     * Determines how many FlowFiles and the size of the FlowFiles that are swapped out at the given location
+     *
+     * @param swapLocation the location of the swap file
+     * @return the QueueSize representing the number of FlowFiles and total size of the FlowFiles that are swapped out
+     */
+    QueueSize getSwapSize(String swapLocation) throws IOException;
+
+    /**
+     * Returns the maximum record id of the FlowFiles stored at the given swap location
+     *
+     * @param swapLocation the swap location to read id's from
+     * @return the max record id of any FlowFile in the swap location, or null if no record ID's can be found
+     */
+    Long getMaxRecordId(String swapLocation) throws IOException;
+
+    /**
+     * Purge all known Swap Files without updating FlowFileRepository or Provenance Repository
+     */
+    void purge();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java
index fcb516d..95d9f2e 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java
@@ -18,7 +18,7 @@ package org.apache.nifi.controller.repository;
 
 import java.util.Collection;
 
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 
 /**
  * Provides a collection of <code>FlowFileQueue</code>s that represents all

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
index 40d44a8..09202c0 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.controller.repository;
 
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 
 /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java
new file mode 100644
index 0000000..564d5ec
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.events.EventReporter;
+
+public interface SwapManagerInitializationContext {
+
+    /**
+     * @return the {@link FlowFileRepository} that should be updated when FlowFiles are swapped in and out
+     */
+    FlowFileRepository getFlowFileRepository();
+
+
+    /**
+     * @return the {@link ResourceClaimManager} that is necessary to provide to the FlowFileRepository when
+     *         performing swapping actions
+     */
+    ResourceClaimManager getResourceClaimManager();
+
+    /**
+     * @return an {@link EventReporter} that can be used to report events to users
+     */
+    EventReporter getEventReporter();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
index ed46d68..ebd56a9 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.FlowFileHandlingException;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java b/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java
deleted file mode 100644
index c3c2ccc..0000000
--- a/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor;
-
-/**
- *
- */
-public class QueueSize {
-
-    private final int objectCount;
-    private final long totalSizeBytes;
-
-    public QueueSize(final int numberObjects, final long totalSizeBytes) {
-        if (numberObjects < 0 || totalSizeBytes < 0) {
-            throw new IllegalArgumentException();
-        }
-        objectCount = numberObjects;
-        this.totalSizeBytes = totalSizeBytes;
-    }
-
-    /**
-     * @return number of objects present on the queue
-     */
-    public int getObjectCount() {
-        return objectCount;
-    }
-
-    /**
-     * @return total size in bytes of the content for the data on the queue
-     */
-    public long getByteCount() {
-        return totalSizeBytes;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java
index 0a0089d..2e66905 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java
@@ -20,7 +20,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.FlowFileFilter;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index f47ea2f..df356fd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.controller;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -34,21 +35,25 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
-import org.apache.nifi.processor.QueueSize;
+import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.concurrency.TimedLock;
 import org.apache.nifi.util.timebuffer.LongEntityAccess;
 import org.apache.nifi.util.timebuffer.TimedBuffer;
 import org.apache.nifi.util.timebuffer.TimestampedLong;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,12 +89,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     private boolean swapMode = false;
     private long maximumQueueObjectCount;
 
+    private final EventReporter eventReporter;
     private final AtomicLong flowFileExpirationMillis;
     private final Connection connection;
     private final AtomicReference<String> flowFileExpirationPeriod;
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
     private final List<FlowFilePrioritizer> priorities;
     private final int swapThreshold;
+    private final FlowFileSwapManager swapManager;
     private final TimedLock readLock;
     private final TimedLock writeLock;
     private final String identifier;
@@ -101,7 +108,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK!
     private final ProcessScheduler scheduler;
 
-    public StandardFlowFileQueue(final String identifier, final Connection connection, final ProcessScheduler scheduler, final int swapThreshold) {
+    public StandardFlowFileQueue(final String identifier, final Connection connection, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter,
+        final int swapThreshold) {
         activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
         priorities = new ArrayList<>();
         maximumQueueObjectCount = 0L;
@@ -110,6 +118,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         flowFileExpirationMillis = new AtomicLong(0);
         flowFileExpirationPeriod = new AtomicReference<>("0 mins");
         swapQueue = new ArrayList<>();
+        this.eventReporter = eventReporter;
+        this.swapManager = swapManager;
 
         this.identifier = identifier;
         this.swapThreshold = swapThreshold;
@@ -233,21 +243,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     }
 
     @Override
-    public long contentSize() {
-        readLock.lock();
-        try {
-            final PreFetch prefetch = preFetchRef.get();
-            if (prefetch == null) {
-                return activeQueueContentSize + swappedContentSize + unacknowledgedSizeRef.get().getObjectCount();
-            } else {
-                return activeQueueContentSize + swappedContentSize + unacknowledgedSizeRef.get().getObjectCount() + prefetch.size().getByteCount();
-            }
-        } finally {
-            readLock.unlock("getContentSize");
-        }
-    }
-
-    @Override
     public boolean isEmpty() {
         readLock.lock();
         try {
@@ -945,11 +940,88 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         this.flowFileExpirationMillis.set(millis);
     }
 
+
+    @Override
+    public void purgeSwapFiles() {
+        swapManager.purge();
+    }
+
+    @Override
+    public Long recoverSwappedFlowFiles() {
+        int swapFlowFileCount = 0;
+        long swapByteCount = 0L;
+        Long maxId = null;
+
+        writeLock.lock();
+        try {
+            final List<String> swapLocations;
+            try {
+                swapLocations = swapManager.recoverSwapLocations(this);
+            } catch (final IOException ioe) {
+                logger.error("Failed to determine whether or not any Swap Files exist for FlowFile Queue {}", getIdentifier());
+                logger.error("", ioe);
+                if (eventReporter != null) {
+                    eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine whether or not any Swap Files exist for FlowFile Queue " +
+                        getIdentifier() + "; see logs for more detials");
+                }
+                return null;
+            }
+
+            for (final String swapLocation : swapLocations) {
+                try {
+                    final QueueSize queueSize = swapManager.getSwapSize(swapLocation);
+                    final Long maxSwapRecordId = swapManager.getMaxRecordId(swapLocation);
+                    if (maxSwapRecordId != null) {
+                        if (maxId == null || maxSwapRecordId > maxId) {
+                            maxId = maxSwapRecordId;
+                        }
+                    }
+
+                    swapFlowFileCount += queueSize.getObjectCount();
+                    swapByteCount += queueSize.getByteCount();
+                } catch (final IOException ioe) {
+                    logger.error("Failed to recover FlowFiles from Swap File {}; the file appears to be corrupt", swapLocation, ioe.toString());
+                    logger.error("", ioe);
+                    if (eventReporter != null) {
+                        eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to recover FlowFiles from Swap File " + swapLocation +
+                            "; the file appears to be corrupt. See logs for more details");
+                    }
+                }
+            }
+
+            this.swappedRecordCount = swapFlowFileCount;
+            this.swappedContentSize = swapByteCount;
+        } finally {
+            writeLock.unlock("Recover Swap Files");
+        }
+
+        return maxId;
+    }
+
+
     @Override
     public String toString() {
         return "FlowFileQueue[id=" + identifier + "]";
     }
 
+    @Override
+    public DropFlowFileStatus dropFlowFiles() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public boolean cancelDropFlowFileRequest(String requestIdentifier) {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public DropFlowFileStatus getDropFlowFileStatus(String requestIdentifier) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
     /**
      * Lock the queue so that other threads are unable to interact with the
      * queue

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index ad556e2..f0a6d8a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -26,18 +26,19 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.StandardFlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.util.NiFiProperties;
 
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
 /**
  * Models a connection between connectable components. A connection may contain one or more relationships that map the source component to the destination component.
  */
@@ -65,7 +66,7 @@ public final class StandardConnection implements Connection {
         destination = new AtomicReference<>(builder.destination);
         relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
         scheduler = builder.scheduler;
-        flowFileQueue = new StandardFlowFileQueue(id, this, scheduler, NiFiProperties.getInstance().getQueueSwapThreshold());
+        flowFileQueue = new StandardFlowFileQueue(id, this, scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold());
         hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
     }
 
@@ -259,6 +260,8 @@ public final class StandardConnection implements Connection {
         private Connectable source;
         private Connectable destination;
         private Collection<Relationship> relationships;
+        private FlowFileSwapManager swapManager;
+        private EventReporter eventReporter;
 
         public Builder(final ProcessScheduler scheduler) {
             this.scheduler = scheduler;
@@ -305,6 +308,16 @@ public final class StandardConnection implements Connection {
             return this;
         }
 
+        public Builder swapManager(final FlowFileSwapManager swapManager) {
+            this.swapManager = swapManager;
+            return this;
+        }
+
+        public Builder eventReporter(final EventReporter eventReporter) {
+            this.eventReporter = eventReporter;
+            return this;
+        }
+
         public StandardConnection build() {
             if (source == null) {
                 throw new IllegalStateException("Cannot build a Connection without a Source");
@@ -312,6 +325,9 @@ public final class StandardConnection implements Connection {
             if (destination == null) {
                 throw new IllegalStateException("Cannot build a Connection without a Destination");
             }
+            if (swapManager == null) {
+                throw new IllegalStateException("Cannot build a Connection without a FlowFileSwapManager");
+            }
 
             if (relationships == null) {
                 relationships = new ArrayList<>();


[5/5] nifi git commit: Merge branch 'NIFI-730' of https://git-wip-us.apache.org/repos/asf/nifi into NIFI-730

Posted by ma...@apache.org.
Merge branch 'NIFI-730' of https://git-wip-us.apache.org/repos/asf/nifi into NIFI-730


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9be37914
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9be37914
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9be37914

Branch: refs/heads/NIFI-730
Commit: 9be37914ddb9c8c017cc4d6b3209340a19a7cb8d
Parents: 49a781d e0ac7cd
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Oct 13 10:03:31 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Oct 13 10:03:31 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/web/api/dto/DropRequestDTO.java | 129 +++++++++++
 .../nifi/web/api/entity/DropRequestEntity.java  |  44 ++++
 .../org/apache/nifi/web/NiFiServiceFacade.java  |  25 +++
 .../nifi/web/StandardNiFiServiceFacade.java     |  24 ++
 .../apache/nifi/web/api/ConnectionResource.java | 217 ++++++++++++++++++-
 .../org/apache/nifi/web/dao/ConnectionDAO.java  |  22 ++
 .../web/dao/impl/StandardConnectionDAO.java     |  15 ++
 .../src/main/webapp/js/nf/canvas/nf-actions.js  |  58 +++++
 .../main/webapp/js/nf/canvas/nf-context-menu.js |  10 +
 9 files changed, 542 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[3/5] nifi git commit: NIFI-730: Implemented swapping in and out on-demand by the FlowFileQueue rather than in a background thread

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
new file mode 100644
index 0000000..66f32d8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestStandardFlowFileQueue {
+    private TestSwapManager swapManager = null;
+    private StandardFlowFileQueue queue = null;
+    /** Rebuilds the queue under test before each test method, wiring it to Mockito mocks and a new TestSwapManager. */
+    @Before
+    public void setup() {
+        final Connection connection = Mockito.mock(Connection.class);
+        Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class));
+        Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+        // The scheduler is a mock; the swap manager is a real in-memory TestSwapManager whose call counters start at zero.
+        final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
+        swapManager = new TestSwapManager();
+        // The repositories and the claim manager are plain mocks; nothing is stubbed on them here.
+        final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
+        final ProvenanceEventRepository provRepo = Mockito.mock(ProvenanceEventRepository.class);
+        final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class);
+        // null is passed in the event-reporter position; 10000 is presumably the swap threshold (same argument order as in StandardConnection) -- confirm.
+        queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000);
+        TestFlowFile.idGenerator.set(0L); // restart FlowFile ids at 0 so every test sees the same id sequence
+    }
+
+    /** Checks that swapOut is not called while the first 19,999 FlowFiles are put and is called exactly once when the 20,000th is put. */
+    @Test
+    public void testSwapOutOccurs() {
+        for (int i = 0; i < 10000; i++) {
+            queue.put(new TestFlowFile());
+            assertEquals(0, swapManager.swapOutCalledCount);
+            assertEquals(i + 1, queue.size().getObjectCount());
+            assertEquals(i + 1, queue.size().getByteCount());
+        }
+        // FlowFiles 10,001 through 19,999: still no swap-out; each default TestFlowFile is 1 byte, so the byte count tracks the object count.
+        for (int i = 0; i < 9999; i++) {
+            queue.put(new TestFlowFile());
+            assertEquals(0, swapManager.swapOutCalledCount);
+            assertEquals(i + 10001, queue.size().getObjectCount());
+            assertEquals(i + 10001, queue.size().getByteCount());
+        }
+        // The 20,000th FlowFile (1000 bytes) is expected to trigger the single swap-out; total bytes = 19,999 + 1000.
+        queue.put(new TestFlowFile(1000));
+        assertEquals(1, swapManager.swapOutCalledCount);
+        assertEquals(20000, queue.size().getObjectCount());
+        assertEquals(20999, queue.size().getByteCount());
+        // size() still reports all 20,000 FlowFiles, while only 10,000 are expected to remain in the active queue.
+        assertEquals(10000, queue.getActiveQueueSize().getObjectCount());
+    }
+    /** Checks that, with a size-ascending prioritizer, the 10,000 FlowFiles left in the active queue after a swap-out are the 10,000 smallest. */
+    @Test
+    public void testLowestPrioritySwappedOutFirst() {
+        final List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
+        prioritizers.add(new FlowFileSizePrioritizer());
+        queue.setPriorities(prioritizers);
+        // Insert sizes 19,999 down to 0, i.e. in the reverse of the order in which the size prioritizer sorts them.
+        long maxSize = 20000;
+        for (int i = 1; i <= 20000; i++) {
+            queue.put(new TestFlowFile(maxSize - i));
+        }
+        // Exactly one swap-out is expected once all 20,000 FlowFiles have been put.
+        assertEquals(1, swapManager.swapOutCalledCount);
+        assertEquals(20000, queue.size().getObjectCount());
+        // The poll is expected to return the 10,000 smallest FlowFiles (sizes 0..9,999) in ascending size order.
+        assertEquals(10000, queue.getActiveQueueSize().getObjectCount());
+        final List<FlowFileRecord> flowFiles = queue.poll(Integer.MAX_VALUE, new HashSet<FlowFileRecord>());
+        assertEquals(10000, flowFiles.size());
+        for (int i = 0; i < 10000; i++) {
+            assertEquals(i, flowFiles.get(i).getSize());
+        }
+    }
+    /** Checks that polling does not call swapIn until the active queue is empty, and that the next poll then swaps the stored FlowFiles back in. */
+    @Test
+    public void testSwapIn() {
+        for (int i = 1; i <= 20000; i++) {
+            queue.put(new TestFlowFile());
+        }
+        // After 20,000 puts one swap location exists; one more put must not create a second one.
+        assertEquals(1, swapManager.swappedOut.size());
+        queue.put(new TestFlowFile());
+        assertEquals(1, swapManager.swappedOut.size());
+
+        final Set<FlowFileRecord> exp = new HashSet<>();
+        for (int i = 0; i < 9999; i++) {
+            assertNotNull(queue.poll(exp));
+        }
+        // Draining 9,999 of the active FlowFiles must not trigger a swap-in.
+        assertEquals(0, swapManager.swapInCalledCount);
+        assertEquals(1, queue.getActiveQueueSize().getObjectCount());
+        assertNotNull(queue.poll(exp));
+        // Emptying the active queue must not, by itself, trigger a swap-in either.
+        assertEquals(0, swapManager.swapInCalledCount);
+        assertEquals(0, queue.getActiveQueueSize().getObjectCount());
+
+        assertEquals(1, swapManager.swapOutCalledCount);
+
+        assertNotNull(queue.poll(exp)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top.
+        assertEquals(1, swapManager.swapInCalledCount);
+        assertEquals(9999, queue.getActiveQueueSize().getObjectCount());
+
+        assertTrue(swapManager.swappedOut.isEmpty());
+        // NOTE(review): the result of this final poll is not asserted.
+        queue.poll(exp);
+
+    }
+
+    /** In-memory FlowFileSwapManager for these tests: swapped-out lists are kept in a map keyed by a random UUID string, and swapOut / swapIn calls are counted. */
+    private class TestSwapManager implements FlowFileSwapManager {
+        private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>(); // swap location -> FlowFiles stored at that location
+        int swapOutCalledCount = 0; // incremented on every swapOut call
+        int swapInCalledCount = 0; // incremented on every swapIn call
+
+        /** Does nothing; the initialization context is ignored. */
+        @Override
+        public void initialize(final SwapManagerInitializationContext initializationContext) {
+            // no-op
+        }
+        /** Stores a copy of the given list under a new random UUID and returns that UUID string as the swap location. */
+        @Override
+        public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException {
+            swapOutCalledCount++;
+            final String location = UUID.randomUUID().toString();
+            swappedOut.put(location, new ArrayList<FlowFileRecord>(flowFiles));
+            return location;
+        }
+        /** Returns a copy of the list stored at swapLocation without removing it; an unknown location makes the ArrayList copy constructor throw NullPointerException. */
+        @Override
+        public List<FlowFileRecord> peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException {
+            return new ArrayList<FlowFileRecord>(swappedOut.get(swapLocation));
+        }
+        /** Removes and returns the list stored at swapLocation (null if there is none) and increments swapInCalledCount. */
+        @Override
+        public List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
+            swapInCalledCount++;
+            return swappedOut.remove(swapLocation);
+        }
+        /** Returns a snapshot of every swap location currently held; the queue argument is not consulted. */
+        @Override
+        public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException {
+            return new ArrayList<String>(swappedOut.keySet());
+        }
+        /** Does nothing: any entry stored for swapLocation is left in place. */
+        @Override
+        public void dropSwappedFlowFiles(String swapLocation, final FlowFileQueue flowFileQueue, String user) {
+            // no-op
+        }
+        /** Returns the number of FlowFiles stored at swapLocation and the sum of their sizes, or QueueSize(0, 0L) if the location is unknown. */
+        @Override
+        public QueueSize getSwapSize(String swapLocation) throws IOException {
+            final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
+            if (flowFiles == null) {
+                return new QueueSize(0, 0L);
+            }
+
+            int count = 0;
+            long size = 0L;
+            for (final FlowFileRecord flowFile : flowFiles) {
+                count++;
+                size += flowFile.getSize();
+            }
+
+            return new QueueSize(count, size);
+        }
+        /** Returns the largest FlowFile id stored at swapLocation, or null if the location is unknown or holds no FlowFiles. */
+        @Override
+        public Long getMaxRecordId(String swapLocation) throws IOException {
+            final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
+            if (flowFiles == null) {
+                return null;
+            }
+
+            Long max = null;
+            for (final FlowFileRecord flowFile : flowFiles) {
+                if (max == null || flowFile.getId() > max) {
+                    max = flowFile.getId();
+                }
+            }
+
+            return max;
+        }
+        /** Discards every stored swap location. */
+        @Override
+        public void purge() {
+            swappedOut.clear();
+        }
+    }
+
+    /** Minimal FlowFileRecord for these tests: only id, entry date, size and attributes carry data; every other accessor returns a fixed value. */
+    private static class TestFlowFile implements FlowFileRecord {
+        private static final AtomicLong idGenerator = new AtomicLong(0L); // shared across instances; reset to 0 by the test's setup()
+
+        private final long id = idGenerator.getAndIncrement(); // ids are handed out in construction order
+        private final long entryDate = System.currentTimeMillis(); // captured once, at construction time
+        private final Map<String, String> attributes;
+        private final long size;
+        /** Creates a FlowFile of size 1 with a new, empty attribute map. */
+        public TestFlowFile() {
+            this(1L);
+        }
+        /** Creates a FlowFile of the given size with a new, empty attribute map. */
+        public TestFlowFile(final long size) {
+            this(new HashMap<String, String>(), size);
+        }
+        /** Creates a FlowFile that reports the given size and is backed directly by the given attribute map (the map is not copied). */
+        public TestFlowFile(final Map<String, String> attributes, final long size) {
+            this.attributes = attributes;
+            this.size = size;
+        }
+
+        // FlowFileRecord accessors follow.
+        @Override
+        public long getId() {
+            return id;
+        }
+
+        @Override
+        public long getEntryDate() {
+            return entryDate;
+        }
+
+        @Override
+        public long getLineageStartDate() {
+            return entryDate; // the entry date doubles as the lineage start date
+        }
+
+        @Override
+        public Long getLastQueueDate() {
+            return null; // no queue date is tracked by this stub
+        }
+
+        @Override
+        public Set<String> getLineageIdentifiers() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public boolean isPenalized() {
+            return false;
+        }
+
+        @Override
+        public String getAttribute(String key) {
+            return attributes.get(key);
+        }
+
+        @Override
+        public long getSize() {
+            return size;
+        }
+
+        @Override
+        public Map<String, String> getAttributes() {
+            return Collections.unmodifiableMap(attributes); // read-only view over the backing map
+        }
+
+        @Override
+        public int compareTo(final FlowFile o) {
+            return Long.compare(id, o.getId()); // orders FlowFiles by ascending id
+        }
+
+        @Override
+        public long getPenaltyExpirationMillis() {
+            return 0;
+        }
+
+        @Override
+        public ContentClaim getContentClaim() {
+            return null; // this stub carries no content claim
+        }
+
+        @Override
+        public long getContentClaimOffset() {
+            return 0;
+        }
+    }
+    /** Prioritizer that compares FlowFiles by size, so a smaller FlowFile compares as less than a larger one. */
+    private static class FlowFileSizePrioritizer implements FlowFilePrioritizer {
+        @Override
+        public int compare(final FlowFile o1, final FlowFile o2) {
+            return Long.compare(o1.getSize(), o2.getSize());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index f0a6d8a..d43a3db 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -32,11 +32,14 @@ import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.StandardFlowFileQueue;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.util.NiFiProperties;
 
 /**
@@ -66,7 +69,8 @@ public final class StandardConnection implements Connection {
         destination = new AtomicReference<>(builder.destination);
         relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
         scheduler = builder.scheduler;
-        flowFileQueue = new StandardFlowFileQueue(id, this, scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold());
+        flowFileQueue = new StandardFlowFileQueue(id, this, builder.flowFileRepository, builder.provenanceRepository, builder.resourceClaimManager,
+            scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold());
         hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
     }
 
@@ -262,6 +266,9 @@ public final class StandardConnection implements Connection {
         private Collection<Relationship> relationships;
         private FlowFileSwapManager swapManager;
         private EventReporter eventReporter;
+        private FlowFileRepository flowFileRepository;
+        private ProvenanceEventRepository provenanceRepository;
+        private ResourceClaimManager resourceClaimManager;
 
         public Builder(final ProcessScheduler scheduler) {
             this.scheduler = scheduler;
@@ -318,6 +325,21 @@ public final class StandardConnection implements Connection {
             return this;
         }
 
+        public Builder flowFileRepository(final FlowFileRepository flowFileRepository) {
+            this.flowFileRepository = flowFileRepository; // must be non-null by the time build() runs, or build() throws IllegalStateException
+            return this; // returns this Builder so that calls can be chained
+        }
+
+        public Builder provenanceRepository(final ProvenanceEventRepository provenanceRepository) {
+            this.provenanceRepository = provenanceRepository; // must be non-null by the time build() runs, or build() throws IllegalStateException
+            return this; // returns this Builder so that calls can be chained
+        }
+
+        public Builder resourceClaimManager(final ResourceClaimManager resourceClaimManager) {
+            this.resourceClaimManager = resourceClaimManager; // must be non-null by the time build() runs, or build() throws IllegalStateException
+            return this; // returns this Builder so that calls can be chained
+        }
+
         public StandardConnection build() {
             if (source == null) {
                 throw new IllegalStateException("Cannot build a Connection without a Source");
@@ -328,6 +350,15 @@ public final class StandardConnection implements Connection {
             if (swapManager == null) {
                 throw new IllegalStateException("Cannot build a Connection without a FlowFileSwapManager");
             }
+            if (flowFileRepository == null) {
+                throw new IllegalStateException("Cannot build a Connection without a FlowFile Repository");
+            }
+            if (provenanceRepository == null) {
+                throw new IllegalStateException("Cannot build a Connection without a Provenance Repository");
+            }
+            if (resourceClaimManager == null) {
+                throw new IllegalStateException("Cannot build a Connection without a Resource Claim Manager");
+            }
 
             if (relationships == null) {
                 relationships = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 7ab56ed..c4a86f2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -28,6 +28,7 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -79,7 +80,6 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     private EventReporter eventReporter;
     private ResourceClaimManager claimManager;
 
-
     public FileSystemSwapManager() {
         final NiFiProperties properties = NiFiProperties.getInstance();
         final Path flowFileRepoPath = properties.getFlowFileRepositoryPath();
@@ -111,6 +111,10 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         try (final FileOutputStream fos = new FileOutputStream(swapTempFile)) {
             serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
             fos.getFD().sync();
+        } catch (final IOException ioe) {
+            // we failed to write out the entire swap file. Delete the temporary file, if we can.
+            swapTempFile.delete();
+            throw ioe;
         }
 
         if (swapTempFile.renameTo(swapFile)) {
@@ -133,25 +137,6 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
             warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually");
         }
 
-        // TODO: When FlowFile Queue performs this operation, it needs to take the following error handling logic into account:
-
-        /*
-         * } catch (final EOFException eof) {
-         * error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Corrupt Swap File; will remove this Swap File: " + swapFile);
-         *
-         * if (!swapFile.delete()) {
-         * warn("Failed to remove corrupt Swap File " + swapFile + "; This file should be cleaned up manually");
-         * }
-         * } catch (final FileNotFoundException fnfe) {
-         * error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Could not find Swap File " + swapFile);
-         * } catch (final Exception e) {
-         * error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e, e);
-         *
-         * if (swapFile != null) {
-         * queue.add(swapFile);
-         * }
-         * }
-         */
         return swappedFlowFiles;
     }
 
@@ -165,7 +150,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         final List<FlowFileRecord> swappedFlowFiles;
         try (final InputStream fis = new FileInputStream(swapFile);
             final DataInputStream in = new DataInputStream(fis)) {
-            swappedFlowFiles = deserializeFlowFiles(in, flowFileQueue, swapLocation, claimManager);
+            swappedFlowFiles = deserializeFlowFiles(in, swapLocation, flowFileQueue, claimManager);
         }
 
         return swappedFlowFiles;
@@ -189,6 +174,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     }
 
     @Override
+    public void dropSwappedFlowFiles(final String swapLocation, final FlowFileQueue flowFileQueue, final String user) throws IOException {
+        // NOTE(review): the body is empty in this commit, so nothing at swapLocation is removed -- confirm whether an implementation is still pending.
+    }
+
+
+    @Override
     public List<String> recoverSwapLocations(final FlowFileQueue flowFileQueue) throws IOException {
         final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
             @Override
@@ -322,7 +313,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     }
 
 
-    public int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException {
+    public static int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException {
         if (toSwap == null || toSwap.isEmpty()) {
             return 0;
         }
@@ -396,8 +387,8 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         return toSwap.size();
     }
 
-    private void writeString(final String toWrite, final OutputStream out) throws IOException {
-        final byte[] bytes = toWrite.getBytes("UTF-8");
+    private static void writeString(final String toWrite, final OutputStream out) throws IOException {
+        final byte[] bytes = toWrite.getBytes(StandardCharsets.UTF_8);
         final int utflen = bytes.length;
 
         if (utflen < 65535) {
@@ -415,26 +406,29 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         }
     }
 
-    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final String swapLocation, final ResourceClaimManager claimManager) throws IOException {
+    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final String swapLocation, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException {
         final int swapEncodingVersion = in.readInt();
         if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
             throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is "
                 + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
         }
 
-        final String connectionId = in.readUTF();
+        final String connectionId = in.readUTF(); // Connection ID
         if (!connectionId.equals(queue.getIdentifier())) {
-            throw new IllegalArgumentException("Cannot restore contents from FlowFile Swap File " + swapLocation +
-                " because the file indicates that records belong to Connection with ID " + connectionId + " but attempted to swap those records into " + queue);
+            throw new IllegalArgumentException("Cannot deserialize FlowFiles from Swap File at location " + swapLocation +
+                " because those FlowFiles belong to Connection with ID " + connectionId + " and an attempt was made to swap them into a Connection with ID " + queue.getIdentifier());
         }
 
         final int numRecords = in.readInt();
         in.readLong(); // Content Size
+        if (swapEncodingVersion > 7) {
+            in.readLong(); // Max Record ID
+        }
 
         return deserializeFlowFiles(in, numRecords, swapEncodingVersion, false, claimManager);
     }
 
-    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles,
+    private static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles,
         final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager) throws IOException {
         final List<FlowFileRecord> flowFiles = new ArrayList<>();
         for (int i = 0; i < numFlowFiles; i++) {
@@ -543,7 +537,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         }
         final byte[] bytes = new byte[numBytes];
         fillBuffer(in, bytes, numBytes);
-        return new String(bytes, "UTF-8");
+        return new String(bytes, StandardCharsets.UTF_8);
     }
 
     private static Integer readFieldLength(final InputStream in) throws IOException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 23746ce..20f2642 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -286,7 +286,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final NodeProtocolSender protocolSender;
 
     private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks");
-    private final ResourceClaimManager contentClaimManager = new StandardResourceClaimManager();
+    private final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
 
     // guarded by rwLock
     /**
@@ -393,7 +393,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
         eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process"));
 
-        final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, contentClaimManager);
+        final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, resourceClaimManager);
         flowFileRepository = flowFileRepo;
         flowFileEventRepository = flowFileEventRepo;
         counterRepositoryRef = new AtomicReference<CounterRepository>(new StandardCounterRepository());
@@ -668,7 +668,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         try {
             final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(implementationClassName, ContentRepository.class);
             synchronized (contentRepo) {
-                contentRepo.initialize(contentClaimManager);
+                contentRepo.initialize(resourceClaimManager);
             }
             return contentRepo;
         } catch (final Exception e) {
@@ -728,11 +728,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         // Create and initialize a FlowFileSwapManager for this connection
         final FlowFileSwapManager swapManager = createSwapManager(properties);
         final EventReporter eventReporter = createEventReporter(getBulletinRepository());
+
         try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
             final SwapManagerInitializationContext initializationContext = new SwapManagerInitializationContext() {
                 @Override
                 public ResourceClaimManager getResourceClaimManager() {
-                    return getResourceClaimManager();
+                    return resourceClaimManager;
                 }
 
                 @Override
@@ -756,6 +757,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             .destination(destination)
             .swapManager(swapManager)
             .eventReporter(eventReporter)
+            .resourceClaimManager(resourceClaimManager)
+            .flowFileRepository(flowFileRepository)
+            .provenanceRepository(provenanceEventRepository)
             .build();
     }
 
@@ -3188,7 +3192,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 throw new IllegalArgumentException("Input Content Claim not specified");
             }
 
-            final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(),
+            final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(),
                 provEvent.getPreviousContentClaimIdentifier(), false);
             claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset());
             offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset();
@@ -3198,7 +3202,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 throw new IllegalArgumentException("Output Content Claim not specified");
             }
 
-            final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(),
+            final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(),
                 provEvent.getContentClaimIdentifier(), false);
 
             claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset());
@@ -3247,7 +3251,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
 
         try {
-            final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false);
+            final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false);
             final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset());
 
             if (!contentRepository.isAccessible(contentClaim)) {
@@ -3327,17 +3331,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
 
         // Create the ContentClaim
-        final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
+        final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
             event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
 
         // Increment Claimant Count, since we will now be referencing the Content Claim
-        contentClaimManager.incrementClaimantCount(resourceClaim);
+        resourceClaimManager.incrementClaimantCount(resourceClaim);
         final long claimOffset = event.getPreviousContentClaimOffset() == null ? 0L : event.getPreviousContentClaimOffset().longValue();
         final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, claimOffset);
         contentClaim.setLength(event.getPreviousFileSize() == null ? -1L : event.getPreviousFileSize());
 
         if (!contentRepository.isAccessible(contentClaim)) {
-            contentClaimManager.decrementClaimantCount(resourceClaim);
+            resourceClaimManager.decrementClaimantCount(resourceClaim);
             throw new IllegalStateException("Cannot replay data from Provenance Event because the data is no longer available in the Content Repository");
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index a32a485..cfbb770 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -81,9 +81,11 @@ import org.slf4j.LoggerFactory;
  * <p>
  * Provides a ProcessSession that ensures all accesses, changes and transfers
  * occur in an atomic manner for all FlowFiles including their contents and
- * attributes</p>
+ * attributes
+ * </p>
  * <p>
- * NOT THREAD SAFE</p>
+ * NOT THREAD SAFE
+ * </p>
  * <p/>
  */
 public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
@@ -104,7 +106,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     private final Map<String, Long> globalCounters = new HashMap<>();
     private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new HashMap<>();
     private final ProcessContext context;
-    private final Set<FlowFile> recursionSet = new HashSet<>();//set used to track what is currently being operated on to prevent logic failures if recursive calls occurring
+    private final Set<FlowFile> recursionSet = new HashSet<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring
     private final Set<Path> deleteOnCommit = new HashSet<>();
     private final long sessionId;
     private final String connectableDescription;
@@ -114,7 +116,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
     private final StandardProvenanceReporter provenanceReporter;
 
-    private int removedCount = 0;    // number of flowfiles removed in this session
+    private int removedCount = 0; // number of flowfiles removed in this session
     private long removedBytes = 0L; // size of all flowfiles removed in this session
     private final LongHolder bytesRead = new LongHolder(0L);
     private final LongHolder bytesWritten = new LongHolder(0L);
@@ -169,7 +171,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
 
         this.provenanceReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(), componentType,
-                context.getProvenanceRepository(), this);
+            context.getProvenanceRepository(), this);
         this.sessionId = idGenerator.getAndIncrement();
         this.connectableDescription = description;
 
@@ -196,7 +198,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         // Processor-reported events.
         List<ProvenanceEventRecord> autoTerminatedEvents = null;
 
-        //validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
+        // validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
         final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>();
         for (final StandardRepositoryRecord record : records.values()) {
             if (record.isMarkedForDelete()) {
@@ -235,11 +237,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     }
                 }
             } else {
-                final Connection finalDestination = destinations.remove(destinations.size() - 1); //remove last element
+                final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element
                 record.setDestination(finalDestination.getFlowFileQueue());
                 incrementConnectionInputCounts(finalDestination, record);
 
-                for (final Connection destination : destinations) { //iterate over remaining destinations and "clone" as needed
+                for (final Connection destination : destinations) { // iterate over remaining destinations and "clone" as needed
                     incrementConnectionInputCounts(destination, record);
                     final FlowFileRecord currRec = record.getCurrent();
                     final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
@@ -256,7 +258,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     if (claim != null) {
                         context.getContentRepository().incrementClaimaintCount(claim);
                     }
-                    newRecord.setWorking(clone, Collections.<String, String>emptyMap());
+                    newRecord.setWorking(clone, Collections.<String, String> emptyMap());
 
                     newRecord.setDestination(destination.getFlowFileQueue());
                     newRecord.setTransferRelationship(record.getTransferRelationship());
@@ -322,9 +324,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
                     final Connectable connectable = context.getConnectable();
                     final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
-                    LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
+                    LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[] {flowFile, terminator, flowFileLife});
                 } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
-                    //records which have been updated - remove original if exists
+                    // records which have been updated - remove original if exists
                     removeContent(record.getOriginalClaim());
                 }
             }
@@ -356,7 +358,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>();
             for (final StandardRepositoryRecord record : checkpoint.records.values()) {
                 if (record.isMarkedForAbort() || record.isMarkedForDelete()) {
-                    continue; //these don't need to be transferred
+                    continue; // these don't need to be transferred
                 }
                 // record.getCurrent() will return null if this record was created in this session --
                 // in this case, we just ignore it, and it will be cleaned up by clearing the records map.
@@ -390,7 +392,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             if (LOG.isInfoEnabled()) {
                 final String sessionSummary = summarizeEvents(checkpoint);
                 if (!sessionSummary.isEmpty()) {
-                    LOG.info("{} for {}, committed the following events: {}", new Object[]{this, connectableDescription, sessionSummary});
+                    LOG.info("{} for {}, committed the following events: {}", new Object[] {this, connectableDescription, sessionSummary});
                 }
             }
 
@@ -611,9 +613,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 boolean creationEventRegistered = false;
                 if (registeredTypes != null) {
                     if (registeredTypes.contains(ProvenanceEventType.CREATE)
-                            || registeredTypes.contains(ProvenanceEventType.FORK)
-                            || registeredTypes.contains(ProvenanceEventType.JOIN)
-                            || registeredTypes.contains(ProvenanceEventType.RECEIVE)) {
+                        || registeredTypes.contains(ProvenanceEventType.FORK)
+                        || registeredTypes.contains(ProvenanceEventType.JOIN)
+                        || registeredTypes.contains(ProvenanceEventType.RECEIVE)) {
                         creationEventRegistered = true;
                     }
                 }
@@ -747,7 +749,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     private StandardProvenanceEventRecord enrich(
-            final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) {
+        final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) {
         final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent);
         final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
         if (eventFlowFile != null) {
@@ -1039,7 +1041,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         final StringBuilder sb = new StringBuilder(512);
         if (!LOG.isDebugEnabled() && (largestTransferSetSize > VERBOSE_LOG_THRESHOLD
-                || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) {
+            || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) {
             if (numCreated > 0) {
                 sb.append("created ").append(numCreated).append(" FlowFiles, ");
             }
@@ -1097,7 +1099,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
     private void formatNanos(final long nanos, final StringBuilder sb) {
         final long seconds = nanos > 1000000000L ? nanos / 1000000000L : 0L;
-        long millis = nanos > 1000000L ? nanos / 1000000L : 0L;;
+        long millis = nanos > 1000000L ? nanos / 1000000L : 0L;
+        ;
         final long nanosLeft = nanos % 1000000L;
 
         if (seconds > 0) {
@@ -1272,7 +1275,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         int flowFileCount = 0;
         long byteCount = 0L;
         for (final Connection conn : context.getPollableConnections()) {
-            final QueueSize queueSize = conn.getFlowFileQueue().getActiveQueueSize();
+            final QueueSize queueSize = conn.getFlowFileQueue().size();
             flowFileCount += queueSize.getObjectCount();
             byteCount += queueSize.getByteCount();
         }
@@ -1287,8 +1290,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         attrs.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
 
         final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
-                .addAttributes(attrs)
-                .build();
+            .addAttributes(attrs)
+            .build();
         final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
         record.setWorking(fFile, attrs);
         records.put(fFile, record);
@@ -1324,7 +1327,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             context.getContentRepository().incrementClaimaintCount(claim);
         }
         final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
-        record.setWorking(clone, Collections.<String, String>emptyMap());
+        record.setWorking(clone, Collections.<String, String> emptyMap());
         records.put(clone, record);
 
         if (offset == 0L && size == example.getSize()) {
@@ -1637,7 +1640,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             return;
         }
 
-        LOG.info("{} {} FlowFiles have expired and will be removed", new Object[]{this, flowFiles.size()});
+        LOG.info("{} {} FlowFiles have expired and will be removed", new Object[] {this, flowFiles.size()});
         final List<RepositoryRecord> expiredRecords = new ArrayList<>(flowFiles.size());
 
         final String processorType;
@@ -1650,7 +1653,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
 
         final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(),
-                processorType, context.getProvenanceRepository(), this);
+            processorType, context.getProvenanceRepository(), this);
 
         final Map<String, FlowFileRecord> recordIdMap = new HashMap<>();
         for (final FlowFileRecord flowFile : flowFiles) {
@@ -1664,7 +1667,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
             final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
             final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
-            LOG.info("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
+            LOG.info("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[] {flowFile, terminator, flowFileLife});
         }
 
         try {
@@ -1696,7 +1699,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                                     record.getContentClaimOffset() + claim.getOffset(), record.getSize());
                             }
 
-                            enriched.setAttributes(record.getAttributes(), Collections.<String, String>emptyMap());
+                            enriched.setAttributes(record.getAttributes(), Collections.<String, String> emptyMap());
                             return enriched.build();
                         }
 
@@ -1780,9 +1783,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
 
         try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset());
-                final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
-                final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
-                final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
+            final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
+            final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
+            final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
 
             // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
             // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
@@ -1853,7 +1856,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         try {
             try (final OutputStream rawOut = contentRepo.write(newClaim);
-                    final OutputStream out = new BufferedOutputStream(rawOut)) {
+                final OutputStream out = new BufferedOutputStream(rawOut)) {
 
                 if (header != null && header.length > 0) {
                     out.write(header);
@@ -2070,10 +2073,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         // the original claim if the record is "working" but the content has not been modified
         // (e.g., in the case of attributes only were updated)
         // In other words:
-        //  If we modify the attributes of a FlowFile, and then we call record.getWorkingClaim(), this will
-        //  return the same claim as record.getOriginalClaim(). So we cannot just remove the working claim because
-        //  that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do
-        //  because we will do that later, in the session.commit() and that would result in removing the original claim twice.
+        // If we modify the attributes of a FlowFile, and then we call record.getWorkingClaim(), this will
+        // return the same claim as record.getOriginalClaim(). So we cannot just remove the working claim because
+        // that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do
+        // because we will do that later, in the session.commit() and that would result in removing the original claim twice.
         if (contentModified) {
             // In this case, it's ok to go ahead and destroy the content because we know that the working claim is going to be
             // updated and the given working claim is referenced only by FlowFiles in this session (because it's the Working Claim).
@@ -2196,7 +2199,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     @Override
     public FlowFile importFrom(final Path source, final boolean keepSourceFile, final FlowFile destination) {
         validateRecordState(destination);
-        //TODO: find a better solution. With Windows 7 and Java 7 (very early update, at least), Files.isWritable(source.getParent()) returns false, even when it should be true.
+        // TODO: find a better solution. With Windows 7 and Java 7 (very early update, at least), Files.isWritable(source.getParent()) returns false, even when it should be true.
         if (!keepSourceFile && !Files.isWritable(source.getParent()) && !source.getParent().toFile().canWrite()) {
             // If we do NOT want to keep the file, ensure that we can delete it, or else error.
             throw new FlowFileAccessException("Cannot write to path " + source.getParent().toFile().getAbsolutePath() + " so cannot delete file; will not import.");
@@ -2228,9 +2231,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         removeTemporaryClaim(record);
 
         final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent())
-                .contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize)
-                .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName())
-                .build();
+            .contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize)
+            .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName())
+            .build();
         record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName());
         if (!keepSourceFile) {
             deleteOnCommit.add(source);
@@ -2370,7 +2373,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
      *
      * @param flowFile the FlowFile to check
      * @return <code>true</code> if the FlowFile is known in this session,
-     * <code>false</code> otherwise.
+     *         <code>false</code> otherwise.
      */
     boolean isFlowFileKnown(final FlowFile flowFile) {
         return records.containsKey(flowFile);
@@ -2392,8 +2395,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             final String key = entry.getKey();
             final String value = entry.getValue();
             if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key)
-                    || CoreAttributes.DISCARD_REASON.key().equals(key)
-                    || CoreAttributes.UUID.key().equals(key)) {
+                || CoreAttributes.DISCARD_REASON.key().equals(key)
+                || CoreAttributes.UUID.key().equals(key)) {
                 continue;
             }
             newAttributes.put(key, value);
@@ -2441,10 +2444,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
 
         final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
-                .addAttributes(newAttributes)
-                .lineageIdentifiers(lineageIdentifiers)
-                .lineageStartDate(lineageStartDate)
-                .build();
+            .addAttributes(newAttributes)
+            .lineageIdentifiers(lineageIdentifiers)
+            .lineageStartDate(lineageStartDate)
+            .build();
 
         final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
         record.setWorking(fFile, newAttributes);
@@ -2465,7 +2468,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
      */
     private static Map<String, String> intersectAttributes(final Collection<FlowFile> flowFileList) {
         final Map<String, String> result = new HashMap<>();
-        //trivial cases
+        // trivial cases
         if (flowFileList == null || flowFileList.isEmpty()) {
             return result;
         } else if (flowFileList.size() == 1) {
@@ -2478,8 +2481,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
          */
         final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes();
 
-        outer:
-        for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
+        outer: for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
             final String key = mapEntry.getKey();
             final String value = mapEntry.getValue();
             for (final FlowFile flowFile : flowFileList) {
@@ -2539,7 +2541,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         private final Set<String> removedFlowFiles = new HashSet<>();
         private final Set<String> createdFlowFiles = new HashSet<>();
 
-        private int removedCount = 0;    // number of flowfiles removed in this session
+        private int removedCount = 0; // number of flowfiles removed in this session
         private long removedBytes = 0L; // size of all flowfiles removed in this session
         private long bytesRead = 0L;
         private long bytesWritten = 0L;

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
index c4d040b..3c4fcdb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
@@ -26,7 +26,7 @@ public class Connectables {
 
     public static boolean flowFilesQueued(final Connectable connectable) {
         for (final Connection conn : connectable.getIncomingConnections()) {
-            if (!conn.getFlowFileQueue().isActiveQueueEmpty()) {
+            if (!conn.getFlowFileQueue().isEmpty()) {
                 return true;
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 6eeddc5..f7191c5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -22,16 +22,26 @@ import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.flowfile.FlowFile;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -47,7 +57,7 @@ public class TestFileSystemSwapManager {
             final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
             Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
 
-            final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, flowFileQueue, "/src/test/resources/old-swap-file.swap", new NopResourceClaimManager());
+            final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, "/src/test/resources/old-swap-file.swap", flowFileQueue, new NopResourceClaimManager());
             assertEquals(10000, records.size());
 
             for (final FlowFileRecord record : records) {
@@ -57,6 +67,53 @@ public class TestFileSystemSwapManager {
         }
     }
 
+    @Test
+    public void testRoundTripSerializeDeserialize() throws IOException {
+        final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
+        for (int i = 0; i < 10000; i++) {
+            final Map<String, String> attrs = new HashMap<>(); // fresh map per FlowFile: TestFlowFile keeps the reference, so a shared map would give every record i=9999
+            attrs.put("i", String.valueOf(i));
+            final FlowFileRecord ff = new TestFlowFile(attrs, i);
+            toSwap.add(ff);
+        }
+
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+
+        final String swapLocation = "target/testRoundTrip.swap";
+        final File swapFile = new File(swapLocation);
+        Files.deleteIfExists(swapFile.toPath()); // start from a clean file so a previous run cannot influence this one
+
+        try (final FileOutputStream fos = new FileOutputStream(swapFile)) {
+            FileSystemSwapManager.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
+        }
+
+        final List<FlowFileRecord> swappedIn;
+        try (final FileInputStream fis = new FileInputStream(swapFile);
+            final DataInputStream dis = new DataInputStream(fis)) {
+            swappedIn = FileSystemSwapManager.deserializeFlowFiles(dis, swapLocation, flowFileQueue, Mockito.mock(ResourceClaimManager.class));
+        }
+
+        assertEquals(toSwap.size(), swappedIn.size());
+        for (int i = 0; i < toSwap.size(); i++) { // records are compared position by position, so ordering must survive the round trip as well
+            final FlowFileRecord pre = toSwap.get(i);
+            final FlowFileRecord post = swappedIn.get(i);
+
+            assertEquals(pre.getSize(), post.getSize());
+            assertEquals(pre.getAttributes(), post.getAttributes());
+            assertEquals(pre.isPenalized(), post.isPenalized());
+            assertEquals(pre.getId(), post.getId());
+            assertEquals(pre.getContentClaim(), post.getContentClaim());
+            assertEquals(pre.getContentClaimOffset(), post.getContentClaimOffset());
+            assertEquals(pre.getEntryDate(), post.getEntryDate());
+            assertEquals(pre.getLastQueueDate(), post.getLastQueueDate());
+            assertEquals(pre.getLineageIdentifiers(), post.getLineageIdentifiers());
+            assertEquals(pre.getLineageStartDate(), post.getLineageStartDate());
+            assertEquals(pre.getPenaltyExpirationMillis(), post.getPenaltyExpirationMillis());
+        }
+    }
+
+
     public class NopResourceClaimManager implements ResourceClaimManager {
 
         @Override
@@ -100,4 +157,87 @@ public class TestFileSystemSwapManager {
         public void purge() {
         }
     }
+
+    // Minimal FlowFileRecord stub for the swap round-trip test: no content claim, no lineage identifiers, never penalized.
+    private static class TestFlowFile implements FlowFileRecord {
+        private static final AtomicLong idGenerator = new AtomicLong(0L); // shared by all instances, so every TestFlowFile receives a distinct id
+
+        private final long id = idGenerator.getAndIncrement();
+        private final long entryDate = System.currentTimeMillis();
+        private final long lastQueueDate = System.currentTimeMillis();
+        private final Map<String, String> attributes;
+        private final long size;
+
+        // NOTE: the attribute map is stored by reference, not copied; later changes made by the caller are visible through this FlowFile.
+        public TestFlowFile(final Map<String, String> attributes, final long size) {
+            this.attributes = attributes;
+            this.size = size;
+        }
+
+
+        @Override
+        public long getId() {
+            return id;
+        }
+
+        @Override
+        public long getEntryDate() {
+            return entryDate;
+        }
+
+        @Override
+        public long getLineageStartDate() {
+            return entryDate; // the stub reports its entry date as the lineage start date
+        }
+
+        @Override
+        public Long getLastQueueDate() {
+            return lastQueueDate;
+        }
+
+        @Override
+        public Set<String> getLineageIdentifiers() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public boolean isPenalized() {
+            return false;
+        }
+
+        @Override
+        public String getAttribute(String key) {
+            return attributes.get(key);
+        }
+
+        @Override
+        public long getSize() {
+            return size;
+        }
+
+        @Override
+        public Map<String, String> getAttributes() {
+            return Collections.unmodifiableMap(attributes); // read-only view over the backing map, not a snapshot
+        }
+
+        @Override
+        public int compareTo(final FlowFile o) {
+            return Long.compare(id, o.getId()); // orders FlowFiles by id
+        }
+
+        @Override
+        public long getPenaltyExpirationMillis() {
+            return -1L;
+        }
+
+        @Override
+        public ContentClaim getContentClaim() {
+            return null;
+        }
+
+        @Override
+        public long getContentClaimOffset() {
+            return 0;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 12f8e5e..1783708 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -134,7 +134,7 @@ public class TestStandardProcessSession {
         final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
 
         final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class);
-        flowFileQueue = new StandardFlowFileQueue("1", connection, processScheduler, swapManager, null, 10000);
+        flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000);
         when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
 
         Mockito.doAnswer(new Answer<Object>() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 8bf5553..0e3bcac 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -37,6 +37,11 @@ import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.WebApplicationException;
 
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.admin.service.UserService;
+import org.apache.nifi.authorization.DownloadAuthorization;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.connectable.Connectable;
@@ -47,9 +52,10 @@ import org.apache.nifi.controller.ContentAvailability;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.Counter;
 import org.apache.nifi.controller.FlowController;
-import org.apache.nifi.controller.FlowFileQueue;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.ContentNotFoundException;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
@@ -61,8 +67,8 @@ import org.apache.nifi.groups.ProcessGroupCounts;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
@@ -75,7 +81,9 @@ import org.apache.nifi.provenance.search.SearchTerm;
 import org.apache.nifi.provenance.search.SearchTerms;
 import org.apache.nifi.provenance.search.SearchableField;
 import org.apache.nifi.remote.RootGroupPort;
+import org.apache.nifi.reporting.BulletinQuery;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.search.SearchContext;
@@ -85,6 +93,7 @@ import org.apache.nifi.services.FlowService;
 import org.apache.nifi.user.NiFiUser;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.DownloadableContent;
 import org.apache.nifi.web.NiFiCoreException;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
@@ -104,15 +113,6 @@ import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
 import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
-import org.apache.nifi.web.DownloadableContent;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.admin.service.UserService;
-import org.apache.nifi.authorization.DownloadAuthorization;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.reporting.BulletinQuery;
-import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.web.security.user.NiFiUserUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -501,7 +501,7 @@ public class ControllerFacade {
      * Site-to-Site communications
      *
      * @return the socket port that the Cluster Manager is listening on for
-     * Site-to-Site communications
+     *         Site-to-Site communications
      */
     public Integer getClusterManagerRemoteSiteListeningPort() {
         return flowController.getClusterManagerRemoteSiteListeningPort();
@@ -512,7 +512,7 @@ public class ControllerFacade {
      * Manager are secure
      *
      * @return whether or not Site-to-Site communications with the Cluster
-     * Manager are secure
+     *         Manager are secure
      */
     public Boolean isClusterManagerRemoteSiteCommsSecure() {
         return flowController.isClusterManagerRemoteSiteCommsSecure();
@@ -523,7 +523,7 @@ public class ControllerFacade {
      * Site-to-Site communications
      *
      * @return the socket port that the local instance is listening on for
-     * Site-to-Site communications
+     *         Site-to-Site communications
      */
     public Integer getRemoteSiteListeningPort() {
         return flowController.getRemoteSiteListeningPort();
@@ -534,7 +534,7 @@ public class ControllerFacade {
      * instance are secure
      *
      * @return whether or not Site-to-Site communications with the local
-     * instance are secure
+     *         instance are secure
      */
     public Boolean isRemoteSiteCommsSecure() {
         return flowController.isRemoteSiteCommsSecure();