You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/03 16:36:30 UTC

[02/40] nifi git commit: NIFI-730: Added methods for dropping queued flowfiles; refactored swap manager but have not yet started swapping flowfiles in or out from within the flowfile queue

NIFI-730: Added methods for dropping queued flowfiles; refactored swap manager but have not yet started swapping flowfiles in or out from within the flowfile queue


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b8c51dc3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b8c51dc3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b8c51dc3

Branch: refs/heads/NIFI-274
Commit: b8c51dc35d1a7fdbf3e6449bbe297db667a1176c
Parents: b4bfcc1
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Oct 11 10:27:07 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Oct 11 10:27:07 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/controller/FlowFileQueue.java   | 203 ------
 .../controller/queue/DropFlowFileState.java     |  40 ++
 .../controller/queue/DropFlowFileStatus.java    |  62 ++
 .../nifi/controller/queue/FlowFileQueue.java    | 256 ++++++++
 .../apache/nifi/controller/queue/QueueSize.java |  48 ++
 .../repository/FlowFileRepository.java          |   2 +-
 .../repository/FlowFileSwapManager.java         |  94 ++-
 .../controller/repository/QueueProvider.java    |   2 +-
 .../controller/repository/RepositoryRecord.java |   2 +-
 .../SwapManagerInitializationContext.java       |  41 ++
 .../apache/nifi/processor/ProcessSession.java   |   1 +
 .../org/apache/nifi/processor/QueueSize.java    |  48 --
 .../org/apache/nifi/connectable/Connection.java |   2 +-
 .../nifi/controller/StandardFlowFileQueue.java  | 108 +++-
 .../nifi/connectable/StandardConnection.java    |  26 +-
 .../nifi/controller/FileSystemSwapManager.java  | 626 ++++++++-----------
 .../apache/nifi/controller/FlowController.java  | 193 +++---
 .../repository/BatchingSessionFactory.java      |   2 +-
 .../repository/ConnectionSwapInfo.java          |  58 --
 .../repository/StandardProcessSession.java      |   4 +-
 .../repository/StandardRepositoryRecord.java    |   2 +-
 .../repository/VolatileFlowFileRepository.java  |   2 +-
 .../WriteAheadFlowFileRepository.java           |   2 +-
 .../controller/TestFileSystemSwapManager.java   |   6 +-
 .../repository/TestStandardProcessSession.java  |   9 +-
 .../TestWriteAheadFlowFileRepository.java       |   2 +-
 26 files changed, 1003 insertions(+), 838 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java
deleted file mode 100644
index e1baeb7..0000000
--- a/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.flowfile.FlowFilePrioritizer;
-import org.apache.nifi.processor.FlowFileFilter;
-import org.apache.nifi.processor.QueueSize;
-
-public interface FlowFileQueue {
-
-    /**
-     * @return the unique identifier for this FlowFileQueue
-     */
-    String getIdentifier();
-
-    /**
-     * @return list of processing priorities for this queue
-     */
-    List<FlowFilePrioritizer> getPriorities();
-
-    /**
-     * @return the minimum number of FlowFiles that must be present in order for
-     * FlowFiles to begin being swapped out of the queue
-     */
-    int getSwapThreshold();
-
-    /**
-     * Resets the comparator used by this queue to maintain order.
-     *
-     * @param newPriorities the ordered list of prioritizers to use to determine
-     * order within this queue.
-     * @throws NullPointerException if arg is null
-     */
-    void setPriorities(List<FlowFilePrioritizer> newPriorities);
-
-    /**
-     * Establishes this queue's preferred maximum work load.
-     *
-     * @param maxQueueSize the maximum number of flow files this processor
-     * recommends having in its work queue at any one time
-     */
-    void setBackPressureObjectThreshold(long maxQueueSize);
-
-    /**
-     * @return maximum number of flow files that should be queued up at any one
-     * time
-     */
-    long getBackPressureObjectThreshold();
-
-    /**
-     * @param maxDataSize Establishes this queue's preferred maximum data size.
-     */
-    void setBackPressureDataSizeThreshold(String maxDataSize);
-
-    /**
-     * @return maximum data size that should be queued up at any one time
-     */
-    String getBackPressureDataSizeThreshold();
-
-    QueueSize size();
-
-    /**
-     * @return total size in bytes of the queue flow file's content
-     */
-    long contentSize();
-
-    /**
-     * @return true if no items queue; false otherwise
-     */
-    boolean isEmpty();
-
-    /**
-     * @return true if the active queue is empty; false otherwise. The Active
-     * queue contains those FlowFiles that can be processed immediately and does
-     * not include those FlowFiles that have been swapped out or are currently
-     * being processed
-     */
-    boolean isActiveQueueEmpty();
-
-    QueueSize getActiveQueueSize();
-
-    /**
-     * Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile
-     * is considered to be unacknowledged if it has been pulled from the queue by some component
-     * but the session that pulled the FlowFile has not yet been committed or rolled back.
-     *
-     * @return a QueueSize that represents all FlowFiles that are 'unacknowledged'.
-     */
-    QueueSize getUnacknowledgedQueueSize();
-
-    void acknowledge(FlowFileRecord flowFile);
-
-    void acknowledge(Collection<FlowFileRecord> flowFiles);
-
-    /**
-     * @return true if maximum queue size has been reached or exceeded; false
-     * otherwise
-     */
-    boolean isFull();
-
-    /**
-     * places the given file into the queue
-     *
-     * @param file to place into queue
-     */
-    void put(FlowFileRecord file);
-
-    /**
-     * places the given files into the queue
-     *
-     * @param files to place into queue
-     */
-    void putAll(Collection<FlowFileRecord> files);
-
-    /**
-     * Removes all records from the internal swap queue and returns them.
-     *
-     * @return all removed records from internal swap queue
-     */
-    List<FlowFileRecord> pollSwappableRecords();
-
-    /**
-     * Restores the records from swap space into this queue, adding the records
-     * that have expired to the given set instead of enqueuing them.
-     *
-     * @param records that were swapped in
-     */
-    void putSwappedRecords(Collection<FlowFileRecord> records);
-
-    /**
-     * Updates the internal counters of how much data is queued, based on
-     * swapped data that is being restored.
-     *
-     * @param numRecords count of records swapped in
-     * @param contentSize total size of records being swapped in
-     */
-    void incrementSwapCount(int numRecords, long contentSize);
-
-    /**
-     * @return the number of FlowFiles that are enqueued and not swapped
-     */
-    int unswappedSize();
-
-    int getSwapRecordCount();
-
-    int getSwapQueueSize();
-
-    /**
-     * @param expiredRecords expired records
-     * @return the next flow file on the queue; null if empty
-     */
-    FlowFileRecord poll(Set<FlowFileRecord> expiredRecords);
-
-    /**
-     * @param maxResults limits how many results can be polled
-     * @param expiredRecords for expired records
-     * @return the next flow files on the queue up to the max results; null if
-     * empty
-     */
-    List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords);
-
-    /**
-     * Drains flow files from the given source queue into the given destination
-     * list.
-     *
-     * @param sourceQueue queue to drain from
-     * @param destination Collection to drain to
-     * @param maxResults max number to drain
-     * @param expiredRecords for expired records
-     * @return size (bytes) of flow files drained from queue
-     */
-    long drainQueue(Queue<FlowFileRecord> sourceQueue, List<FlowFileRecord> destination, int maxResults, Set<FlowFileRecord> expiredRecords);
-
-    List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords);
-
-    String getFlowFileExpiration();
-
-    int getFlowFileExpiration(TimeUnit timeUnit);
-
-    void setFlowFileExpiration(String flowExpirationPeriod);
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
new file mode 100644
index 0000000..3f16d00
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileState.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+/**
+ * Represents the state that a Drop FlowFile request is in
+ */
+public enum DropFlowFileState {
+
+    WAITING_FOR_LOCK("Waiting for Destination Component to complete its action"),
+    DROPPING_ACTIVE_FLOWFILES("Dropping FlowFiles from queue"),
+    COMPLETE("Completed Successfully"),
+    FAILURE("Failed");
+    
+    private final String description;
+    
+    private DropFlowFileState(final String description) {
+        this.description = description;
+    }
+    
+    @Override
+    public String toString() {
+        return description;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
new file mode 100644
index 0000000..b216608
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+/**
+ * Represents the status of a Drop FlowFile Request that has been issued to
+ * a {@link FlowFileQueue}. When a queue is requested to drop its FlowFiles,
+ * that process may be rather lengthy in the case of a poorly behaving
+ * FlowFileRepository or if the destination Processor is polling from the
+ * queue using a filter that is misbehaving. As a result, the dropping of
+ * FlowFiles is performed asynchronously.
+ *
+ * This status object provides information about how far along in the process
+ * we currently are and information about the success or failure of the
+ * operation.
+ */
+public interface DropFlowFileStatus {
+
+    /**
+     * @return the identifier of the request to drop FlowFiles from the queue
+     */
+    String getRequestIdentifier();
+
+    /**
+     * @return the date/time (in milliseconds since epoch) at which the request to
+     *         drop the FlowFiles from a queue was submitted
+     */
+    long getRequestSubmissionTime();
+
+    /**
+     * @return the size of the queue when the drop request was issued or <code>null</code> if
+     *         it is not yet known, which can happen if the {@link DropFlowFileState} is
+     *         {@link DropFlowFileState#WAITING_FOR_LOCK}.
+     */
+    QueueSize getOriginalSize();
+
+    /**
+     * @return the current size of the queue or <code>null</code> if it is not yet known
+     */
+    QueueSize getCurrentSize();
+
+    /**
+     * @return the current state of the operation
+     */
+    DropFlowFileState getState();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
new file mode 100644
index 0000000..31f17e0
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.queue;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.FlowFileFilter;
+
+public interface FlowFileQueue {
+
+    /**
+     * @return the unique identifier for this FlowFileQueue
+     */
+    String getIdentifier();
+
+    /**
+     * @return list of processing priorities for this queue
+     */
+    List<FlowFilePrioritizer> getPriorities();
+
+    /**
+     * Reads any Swap Files that belong to this queue and increments counts so that the size
+     * of the queue will reflect the size of all FlowFiles regardless of whether or not they are
+     * swapped out. This will be called only during NiFi startup as an initialization step. This
+     * method is then responsible for returning the largest ID of any FlowFile that is swapped
+     * out, or <code>null</code> if no FlowFiles are swapped out for this queue.
+     *
+     * @return the largest ID of any FlowFile that is swapped out for this queue, or <code>null</code> if
+     *         no FlowFiles are swapped out for this queue.
+     */
+    Long recoverSwappedFlowFiles();
+
+    /**
+     * Destroys any Swap Files that exist for this queue without updating the FlowFile Repository
+     * or Provenance Repository. This is done only on startup in the case of non-persistent
+     * repositories. In the case of non-persistent repositories, we may still have Swap Files because
+     * we may still need to overflow the FlowFiles from heap onto disk, even though we don't want to keep
+     * the FlowFiles on restart.
+     */
+    void purgeSwapFiles();
+
+    /**
+     * @return the minimum number of FlowFiles that must be present in order for
+     *         FlowFiles to begin being swapped out of the queue
+     */
+    // TODO: REMOVE THIS.
+    int getSwapThreshold();
+
+    /**
+     * Resets the comparator used by this queue to maintain order.
+     *
+     * @param newPriorities the ordered list of prioritizers to use to determine
+     *            order within this queue.
+     * @throws NullPointerException if arg is null
+     */
+    void setPriorities(List<FlowFilePrioritizer> newPriorities);
+
+    /**
+     * Establishes this queue's preferred maximum work load.
+     *
+     * @param maxQueueSize the maximum number of flow files this processor
+     *            recommends having in its work queue at any one time
+     */
+    void setBackPressureObjectThreshold(long maxQueueSize);
+
+    /**
+     * @return maximum number of flow files that should be queued up at any one
+     *         time
+     */
+    long getBackPressureObjectThreshold();
+
+    /**
+     * @param maxDataSize Establishes this queue's preferred maximum data size.
+     */
+    void setBackPressureDataSizeThreshold(String maxDataSize);
+
+    /**
+     * @return maximum data size that should be queued up at any one time
+     */
+    String getBackPressureDataSizeThreshold();
+
+    QueueSize size();
+
+    /**
+     * @return true if no items are queued; false otherwise
+     */
+    boolean isEmpty();
+
+    /**
+     * @return true if the active queue is empty; false otherwise. The Active
+     *         queue contains those FlowFiles that can be processed immediately and does
+     *         not include those FlowFiles that have been swapped out or are currently
+     *         being processed
+     */
+    // TODO: REMOVE?
+    boolean isActiveQueueEmpty();
+
+    // TODO: REMOVE?
+    QueueSize getActiveQueueSize();
+
+    /**
+     * Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile
+     * is considered to be unacknowledged if it has been pulled from the queue by some component
+     * but the session that pulled the FlowFile has not yet been committed or rolled back.
+     *
+     * @return a QueueSize that represents all FlowFiles that are 'unacknowledged'.
+     */
+    QueueSize getUnacknowledgedQueueSize();
+
+    void acknowledge(FlowFileRecord flowFile);
+
+    void acknowledge(Collection<FlowFileRecord> flowFiles);
+
+    /**
+     * @return true if maximum queue size has been reached or exceeded; false
+     *         otherwise
+     */
+    boolean isFull();
+
+    /**
+     * places the given file into the queue
+     *
+     * @param file to place into queue
+     */
+    void put(FlowFileRecord file);
+
+    /**
+     * places the given files into the queue
+     *
+     * @param files to place into queue
+     */
+    void putAll(Collection<FlowFileRecord> files);
+
+    /**
+     * Removes all records from the internal swap queue and returns them.
+     *
+     * @return all removed records from internal swap queue
+     */
+    // TODO: REMOVE THIS?
+    List<FlowFileRecord> pollSwappableRecords();
+
+    /**
+     * Restores the records from swap space into this queue, adding the records
+     * that have expired to the given set instead of enqueuing them.
+     *
+     * @param records that were swapped in
+     */
+    // TODO: REMOVE THIS?
+    void putSwappedRecords(Collection<FlowFileRecord> records);
+
+    /**
+     * Updates the internal counters of how much data is queued, based on
+     * swapped data that is being restored.
+     *
+     * @param numRecords count of records swapped in
+     * @param contentSize total size of records being swapped in
+     */
+    // TODO: REMOVE THIS?
+    void incrementSwapCount(int numRecords, long contentSize);
+
+    /**
+     * @return the number of FlowFiles that are enqueued and not swapped
+     */
+    // TODO: REMOVE THIS?
+    int unswappedSize();
+
+    // TODO: REMOVE THIS?
+    int getSwapRecordCount();
+
+    // TODO: REMOVE THIS?
+    int getSwapQueueSize();
+
+    /**
+     * @param expiredRecords expired records
+     * @return the next flow file on the queue; null if empty
+     */
+    FlowFileRecord poll(Set<FlowFileRecord> expiredRecords);
+
+    /**
+     * @param maxResults limits how many results can be polled
+     * @param expiredRecords for expired records
+     * @return the next flow files on the queue up to the max results; null if
+     *         empty
+     */
+    List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords);
+
+    /**
+     * Drains flow files from the given source queue into the given destination
+     * list.
+     *
+     * @param sourceQueue queue to drain from
+     * @param destination Collection to drain to
+     * @param maxResults max number to drain
+     * @param expiredRecords for expired records
+     * @return size (bytes) of flow files drained from queue
+     */
+    long drainQueue(Queue<FlowFileRecord> sourceQueue, List<FlowFileRecord> destination, int maxResults, Set<FlowFileRecord> expiredRecords);
+
+    List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords);
+
+    String getFlowFileExpiration();
+
+    int getFlowFileExpiration(TimeUnit timeUnit);
+
+    void setFlowFileExpiration(String flowExpirationPeriod);
+
+    /**
+     * Initiates a request to drop all FlowFiles in this queue. This method returns
+     * a DropFlowFileStatus that can be used to determine the current state of the request.
+     * Additionally, the DropFlowFileStatus provides a request identifier that can then be
+     * passed to the {@link #getDropFlowFileStatus(String)} and {@link #cancelDropFlowFileRequest(String)}
+     * methods in order to obtain the status later or cancel a request
+     *
+     * @return the status of the drop request.
+     */
+    DropFlowFileStatus dropFlowFiles();
+
+    /**
+     * Returns the current status of a Drop FlowFile Request that was initiated via the
+     * {@link #dropFlowFiles()} method that has the given identifier
+     *
+     * @param requestIdentifier the identifier of the Drop FlowFile Request
+     * @return the status for the request with the given identifier, or <code>null</code> if no
+     *         request status exists with that identifier
+     */
+    DropFlowFileStatus getDropFlowFileStatus(String requestIdentifier);
+
+    /**
+     * Cancels the request to drop FlowFiles that has the given identifier
+     *
+     * @param requestIdentifier the identifier of the Drop FlowFile Request
+     * @return <code>true</code> if the request was canceled, <code>false</code> if the request has
+     *         already completed or is not known
+     */
+    boolean cancelDropFlowFileRequest(String requestIdentifier);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
new file mode 100644
index 0000000..42d8416
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.queue;
+
+/**
+ *
+ */
+public class QueueSize {
+
+    private final int objectCount;
+    private final long totalSizeBytes;
+
+    public QueueSize(final int numberObjects, final long totalSizeBytes) {
+        if (numberObjects < 0 || totalSizeBytes < 0) {
+            throw new IllegalArgumentException();
+        }
+        objectCount = numberObjects;
+        this.totalSizeBytes = totalSizeBytes;
+    }
+
+    /**
+     * @return number of objects present on the queue
+     */
+    public int getObjectCount() {
+        return objectCount;
+    }
+
+    /**
+     * @return total size in bytes of the content for the data on the queue
+     */
+    public long getByteCount() {
+        return totalSizeBytes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
index 58fc6b3..906cbe2 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 
 /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
index 2e5be11..57e9186 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
@@ -16,8 +16,11 @@
  */
 package org.apache.nifi.controller.repository;
 
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.events.EventReporter;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
 
 /**
  * Defines a mechanism by which FlowFiles can be move into external storage or
@@ -26,38 +29,81 @@ import org.apache.nifi.events.EventReporter;
 public interface FlowFileSwapManager {
 
     /**
-     * Starts the Manager's background threads to start swapping FlowFiles in
-     * and out of memory
+     * Initializes the Swap Manager, providing a {@link SwapManagerInitializationContext} so that the
+     * Swap Manager has access to all of the components necessary to perform its functions
      *
-     * @param flowFileRepository the FlowFileRepository that must be notified of
-     * any swapping in or out of FlowFiles
-     * @param queueProvider the provider of FlowFileQueue's so that FlowFiles
-     * can be obtained and restored
-     * @param claimManager the ContentClaimManager to use for interacting with
-     * Content Claims
-     * @param reporter the EventReporter that can be used for notifying users of
-     * important events
+     * @param initializationContext the context that provides the swap manager with access to the
+     *            resources that it needs to perform its functions
      */
-    void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ResourceClaimManager claimManager, EventReporter reporter);
+    void initialize(SwapManagerInitializationContext initializationContext);
 
     /**
-     * Shuts down the manager
+     * Swaps out the given FlowFiles that belong to the given queue.
+     *
+     * @param flowFiles the FlowFiles to swap out to external storage
+     * @param flowFileQueue the queue that the FlowFiles belong to
+     * @return the location of the externally stored swap file
+     *
+     * @throws IOException if unable to swap the FlowFiles out
      */
-    void shutdown();
+    String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException;
 
     /**
-     * Removes all Swap information, permanently destroying any FlowFiles that
-     * have been swapped out
+     * Recovers the FlowFiles from the swap file that lives at the given location. This action
+     * provides a view of the FlowFiles but does not actively swap them in, meaning that the swap file
+     * at the given location remains in that location and the FlowFile Repository is not updated.
+     *
+     * @param swapLocation the location of the swap file
+     * @param flowFileQueue the queue that the FlowFiles belong to
+     * @return the FlowFiles that live at the given swap location
+     *
+     * @throws IOException if unable to recover the FlowFiles from the given location
      */
-    void purge();
+    List<FlowFileRecord> peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException;
 
     /**
-     * Notifies FlowFile queues of the number of FlowFiles and content size of
-     * all FlowFiles that are currently swapped out
+     * Recovers the FlowFiles from the swap file that lives at the given location and belongs
+     * to the given FlowFile Queue. The FlowFile Repository is then updated
+     * and the swap file is permanently removed from the external storage
+     *
+     * @param swapLocation the location of the swap file
+     * @param flowFileQueue the queue to which the FlowFiles belong
+     *
+     * @return the FlowFiles that are stored in the given location
+     *
+     * @throws IOException if unable to recover the FlowFiles from the given location or update the
+     *             FlowFileRepository
+     */
+    List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException;
+
+    /**
+     * Determines swap files that exist for the given FlowFileQueue
+     *
+     * @param flowFileQueue the queue for which the FlowFiles should be recovered
      *
-     * @param connectionProvider provider
-     * @param claimManager manager
-     * @return how many flowfiles have been recovered
+     * @return all swap locations that have been identified for the given queue, in the order that they should
+     *         be swapped back in
      */
-    long recoverSwappedFlowFiles(QueueProvider connectionProvider, ResourceClaimManager claimManager);
+    List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException;
+
+    /**
+     * Determines how many FlowFiles and the size of the FlowFiles that are swapped out at the given location
+     *
+     * @param swapLocation the location of the swap file
+     * @return the QueueSize representing the number of FlowFiles and total size of the FlowFiles that are swapped out
+     */
+    QueueSize getSwapSize(String swapLocation) throws IOException;
+
+    /**
+     * Returns the maximum record id of the FlowFiles stored at the given swap location
+     *
+     * @param swapLocation the swap location to read id's from
+     * @return the max record id of any FlowFile in the swap location, or null if no record ID's can be found
+     */
+    Long getMaxRecordId(String swapLocation) throws IOException;
+
+    /**
+     * Purge all known Swap Files without updating FlowFileRepository or Provenance Repository
+     */
+    void purge();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java
index fcb516d..95d9f2e 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java
@@ -18,7 +18,7 @@ package org.apache.nifi.controller.repository;
 
 import java.util.Collection;
 
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 
 /**
  * Provides a collection of <code>FlowFileQueue</code>s that represents all

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
index 40d44a8..09202c0 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.controller.repository;
 
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 
 /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java
new file mode 100644
index 0000000..564d5ec
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/SwapManagerInitializationContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.events.EventReporter;
+
+public interface SwapManagerInitializationContext {
+
+    /**
+     * @return the {@link FlowFileRepository} that should be updated when FlowFiles are swapped in and out
+     */
+    FlowFileRepository getFlowFileRepository();
+
+
+    /**
+     * @return the {@link ResourceClaimManager} that is necessary to provide to the FlowFileRepository when
+     *         performing swapping actions
+     */
+    ResourceClaimManager getResourceClaimManager();
+
+    /**
+     * @return an {@link EventReporter} that can be used to report events to users
+     */
+    EventReporter getEventReporter();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
index ed46d68..ebd56a9 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.FlowFileHandlingException;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java b/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java
deleted file mode 100644
index c3c2ccc..0000000
--- a/nifi-api/src/main/java/org/apache/nifi/processor/QueueSize.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor;
-
-/**
- *
- */
-public class QueueSize {
-
-    private final int objectCount;
-    private final long totalSizeBytes;
-
-    public QueueSize(final int numberObjects, final long totalSizeBytes) {
-        if (numberObjects < 0 || totalSizeBytes < 0) {
-            throw new IllegalArgumentException();
-        }
-        objectCount = numberObjects;
-        this.totalSizeBytes = totalSizeBytes;
-    }
-
-    /**
-     * @return number of objects present on the queue
-     */
-    public int getObjectCount() {
-        return objectCount;
-    }
-
-    /**
-     * @return total size in bytes of the content for the data on the queue
-     */
-    public long getByteCount() {
-        return totalSizeBytes;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java
index 0a0089d..2e66905 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java
@@ -20,7 +20,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.FlowFileFilter;

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index f47ea2f..df356fd 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.controller;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -34,21 +35,25 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult;
-import org.apache.nifi.processor.QueueSize;
+import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.concurrency.TimedLock;
 import org.apache.nifi.util.timebuffer.LongEntityAccess;
 import org.apache.nifi.util.timebuffer.TimedBuffer;
 import org.apache.nifi.util.timebuffer.TimestampedLong;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,12 +89,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     private boolean swapMode = false;
     private long maximumQueueObjectCount;
 
+    private final EventReporter eventReporter;
     private final AtomicLong flowFileExpirationMillis;
     private final Connection connection;
     private final AtomicReference<String> flowFileExpirationPeriod;
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
     private final List<FlowFilePrioritizer> priorities;
     private final int swapThreshold;
+    private final FlowFileSwapManager swapManager;
     private final TimedLock readLock;
     private final TimedLock writeLock;
     private final String identifier;
@@ -101,7 +108,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK!
     private final ProcessScheduler scheduler;
 
-    public StandardFlowFileQueue(final String identifier, final Connection connection, final ProcessScheduler scheduler, final int swapThreshold) {
+    public StandardFlowFileQueue(final String identifier, final Connection connection, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter,
+        final int swapThreshold) {
         activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
         priorities = new ArrayList<>();
         maximumQueueObjectCount = 0L;
@@ -110,6 +118,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         flowFileExpirationMillis = new AtomicLong(0);
         flowFileExpirationPeriod = new AtomicReference<>("0 mins");
         swapQueue = new ArrayList<>();
+        this.eventReporter = eventReporter;
+        this.swapManager = swapManager;
 
         this.identifier = identifier;
         this.swapThreshold = swapThreshold;
@@ -233,21 +243,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     }
 
     @Override
-    public long contentSize() {
-        readLock.lock();
-        try {
-            final PreFetch prefetch = preFetchRef.get();
-            if (prefetch == null) {
-                return activeQueueContentSize + swappedContentSize + unacknowledgedSizeRef.get().getObjectCount();
-            } else {
-                return activeQueueContentSize + swappedContentSize + unacknowledgedSizeRef.get().getObjectCount() + prefetch.size().getByteCount();
-            }
-        } finally {
-            readLock.unlock("getContentSize");
-        }
-    }
-
-    @Override
     public boolean isEmpty() {
         readLock.lock();
         try {
@@ -945,11 +940,88 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         this.flowFileExpirationMillis.set(millis);
     }
 
+
+    @Override
+    public void purgeSwapFiles() {
+        swapManager.purge();
+    }
+
+    @Override
+    public Long recoverSwappedFlowFiles() {
+        int swapFlowFileCount = 0;
+        long swapByteCount = 0L;
+        Long maxId = null;
+
+        writeLock.lock();
+        try {
+            final List<String> swapLocations;
+            try {
+                swapLocations = swapManager.recoverSwapLocations(this);
+            } catch (final IOException ioe) {
+                logger.error("Failed to determine whether or not any Swap Files exist for FlowFile Queue {}", getIdentifier());
+                logger.error("", ioe);
+                if (eventReporter != null) {
+                    eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine whether or not any Swap Files exist for FlowFile Queue " +
+                        getIdentifier() + "; see logs for more details");
+                }
+                return null;
+            }
+
+            for (final String swapLocation : swapLocations) {
+                try {
+                    final QueueSize queueSize = swapManager.getSwapSize(swapLocation);
+                    final Long maxSwapRecordId = swapManager.getMaxRecordId(swapLocation);
+                    if (maxSwapRecordId != null) {
+                        if (maxId == null || maxSwapRecordId > maxId) {
+                            maxId = maxSwapRecordId;
+                        }
+                    }
+
+                    swapFlowFileCount += queueSize.getObjectCount();
+                    swapByteCount += queueSize.getByteCount();
+                } catch (final IOException ioe) {
+                    logger.error("Failed to recover FlowFiles from Swap File {}; the file appears to be corrupt", swapLocation, ioe.toString());
+                    logger.error("", ioe);
+                    if (eventReporter != null) {
+                        eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to recover FlowFiles from Swap File " + swapLocation +
+                            "; the file appears to be corrupt. See logs for more details");
+                    }
+                }
+            }
+
+            this.swappedRecordCount = swapFlowFileCount;
+            this.swappedContentSize = swapByteCount;
+        } finally {
+            writeLock.unlock("Recover Swap Files");
+        }
+
+        return maxId;
+    }
+
+
     @Override
     public String toString() {
         return "FlowFileQueue[id=" + identifier + "]";
     }
 
+    @Override
+    public DropFlowFileStatus dropFlowFiles() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public boolean cancelDropFlowFileRequest(String requestIdentifier) {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public DropFlowFileStatus getDropFlowFileStatus(String requestIdentifier) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
     /**
      * Lock the queue so that other threads are unable to interact with the
      * queue

http://git-wip-us.apache.org/repos/asf/nifi/blob/b8c51dc3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index ad556e2..f0a6d8a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -26,18 +26,19 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.nifi.controller.FlowFileQueue;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.StandardFlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.util.NiFiProperties;
 
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
 /**
  * Models a connection between connectable components. A connection may contain one or more relationships that map the source component to the destination component.
  */
@@ -65,7 +66,7 @@ public final class StandardConnection implements Connection {
         destination = new AtomicReference<>(builder.destination);
         relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
         scheduler = builder.scheduler;
-        flowFileQueue = new StandardFlowFileQueue(id, this, scheduler, NiFiProperties.getInstance().getQueueSwapThreshold());
+        flowFileQueue = new StandardFlowFileQueue(id, this, scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold());
         hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
     }
 
@@ -259,6 +260,8 @@ public final class StandardConnection implements Connection {
         private Connectable source;
         private Connectable destination;
         private Collection<Relationship> relationships;
+        private FlowFileSwapManager swapManager;
+        private EventReporter eventReporter;
 
         public Builder(final ProcessScheduler scheduler) {
             this.scheduler = scheduler;
@@ -305,6 +308,16 @@ public final class StandardConnection implements Connection {
             return this;
         }
 
+        public Builder swapManager(final FlowFileSwapManager swapManager) {
+            this.swapManager = swapManager;
+            return this;
+        }
+
+        public Builder eventReporter(final EventReporter eventReporter) {
+            this.eventReporter = eventReporter;
+            return this;
+        }
+
         public StandardConnection build() {
             if (source == null) {
                 throw new IllegalStateException("Cannot build a Connection without a Source");
@@ -312,6 +325,9 @@ public final class StandardConnection implements Connection {
             if (destination == null) {
                 throw new IllegalStateException("Cannot build a Connection without a Destination");
             }
+            if (swapManager == null) {
+                throw new IllegalStateException("Cannot build a Connection without a FlowFileSwapManager");
+            }
 
             if (relationships == null) {
                 relationships = new ArrayList<>();