You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/03 16:36:32 UTC

[04/40] nifi git commit: NIFI-730: Implemented swapping in and out on-demand by the FlowFileQueue rather than in a background thread

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
new file mode 100644
index 0000000..66f32d8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestStandardFlowFileQueue {
+    private TestSwapManager swapManager = null;
+    private StandardFlowFileQueue queue = null;
+
+    @Before
+    public void setup() {
+        final Connection connection = Mockito.mock(Connection.class);
+        Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class));
+        Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+
+        final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
+        swapManager = new TestSwapManager();
+
+        final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
+        final ProvenanceEventRepository provRepo = Mockito.mock(ProvenanceEventRepository.class);
+        final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class);
+
+        queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000);
+        TestFlowFile.idGenerator.set(0L);
+    }
+
+
+    @Test
+    public void testSwapOutOccurs() {
+        for (int i = 0; i < 10000; i++) {
+            queue.put(new TestFlowFile());
+            assertEquals(0, swapManager.swapOutCalledCount);
+            assertEquals(i + 1, queue.size().getObjectCount());
+            assertEquals(i + 1, queue.size().getByteCount());
+        }
+
+        for (int i = 0; i < 9999; i++) {
+            queue.put(new TestFlowFile());
+            assertEquals(0, swapManager.swapOutCalledCount);
+            assertEquals(i + 10001, queue.size().getObjectCount());
+            assertEquals(i + 10001, queue.size().getByteCount());
+        }
+
+        queue.put(new TestFlowFile(1000));
+        assertEquals(1, swapManager.swapOutCalledCount);
+        assertEquals(20000, queue.size().getObjectCount());
+        assertEquals(20999, queue.size().getByteCount());
+
+        assertEquals(10000, queue.getActiveQueueSize().getObjectCount());
+    }
+
+    @Test
+    public void testLowestPrioritySwappedOutFirst() {
+        final List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
+        prioritizers.add(new FlowFileSizePrioritizer());
+        queue.setPriorities(prioritizers);
+
+        long maxSize = 20000;
+        for (int i = 1; i <= 20000; i++) {
+            queue.put(new TestFlowFile(maxSize - i));
+        }
+
+        assertEquals(1, swapManager.swapOutCalledCount);
+        assertEquals(20000, queue.size().getObjectCount());
+
+        assertEquals(10000, queue.getActiveQueueSize().getObjectCount());
+        final List<FlowFileRecord> flowFiles = queue.poll(Integer.MAX_VALUE, new HashSet<FlowFileRecord>());
+        assertEquals(10000, flowFiles.size());
+        for (int i = 0; i < 10000; i++) {
+            assertEquals(i, flowFiles.get(i).getSize());
+        }
+    }
+
+    @Test
+    public void testSwapIn() {
+        for (int i = 1; i <= 20000; i++) {
+            queue.put(new TestFlowFile());
+        }
+
+        assertEquals(1, swapManager.swappedOut.size());
+        queue.put(new TestFlowFile());
+        assertEquals(1, swapManager.swappedOut.size());
+
+        final Set<FlowFileRecord> exp = new HashSet<>();
+        for (int i = 0; i < 9999; i++) {
+            assertNotNull(queue.poll(exp));
+        }
+
+        assertEquals(0, swapManager.swapInCalledCount);
+        assertEquals(1, queue.getActiveQueueSize().getObjectCount());
+        assertNotNull(queue.poll(exp));
+
+        assertEquals(0, swapManager.swapInCalledCount);
+        assertEquals(0, queue.getActiveQueueSize().getObjectCount());
+
+        assertEquals(1, swapManager.swapOutCalledCount);
+
+        assertNotNull(queue.poll(exp)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top.
+        assertEquals(1, swapManager.swapInCalledCount);
+        assertEquals(9999, queue.getActiveQueueSize().getObjectCount());
+
+        assertTrue(swapManager.swappedOut.isEmpty());
+
+        queue.poll(exp);
+
+    }
+
+
+    private class TestSwapManager implements FlowFileSwapManager {
+        private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>();
+        int swapOutCalledCount = 0;
+        int swapInCalledCount = 0;
+
+
+        @Override
+        public void initialize(final SwapManagerInitializationContext initializationContext) {
+
+        }
+
+        @Override
+        public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException {
+            swapOutCalledCount++;
+            final String location = UUID.randomUUID().toString();
+            swappedOut.put(location, new ArrayList<FlowFileRecord>(flowFiles));
+            return location;
+        }
+
+        @Override
+        public List<FlowFileRecord> peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException {
+            return new ArrayList<FlowFileRecord>(swappedOut.get(swapLocation));
+        }
+
+        @Override
+        public List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
+            swapInCalledCount++;
+            return swappedOut.remove(swapLocation);
+        }
+
+        @Override
+        public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException {
+            return new ArrayList<String>(swappedOut.keySet());
+        }
+
+        @Override
+        public void dropSwappedFlowFiles(String swapLocation, final FlowFileQueue flowFileQueue, String user) {
+
+        }
+
+        @Override
+        public QueueSize getSwapSize(String swapLocation) throws IOException {
+            final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
+            if (flowFiles == null) {
+                return new QueueSize(0, 0L);
+            }
+
+            int count = 0;
+            long size = 0L;
+            for (final FlowFileRecord flowFile : flowFiles) {
+                count++;
+                size += flowFile.getSize();
+            }
+
+            return new QueueSize(count, size);
+        }
+
+        @Override
+        public Long getMaxRecordId(String swapLocation) throws IOException {
+            final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
+            if (flowFiles == null) {
+                return null;
+            }
+
+            Long max = null;
+            for (final FlowFileRecord flowFile : flowFiles) {
+                if (max == null || flowFile.getId() > max) {
+                    max = flowFile.getId();
+                }
+            }
+
+            return max;
+        }
+
+        @Override
+        public void purge() {
+            swappedOut.clear();
+        }
+    }
+
+
+    private static class TestFlowFile implements FlowFileRecord {
+        private static final AtomicLong idGenerator = new AtomicLong(0L);
+
+        private final long id = idGenerator.getAndIncrement();
+        private final long entryDate = System.currentTimeMillis();
+        private final Map<String, String> attributes;
+        private final long size;
+
+        public TestFlowFile() {
+            this(1L);
+        }
+
+        public TestFlowFile(final long size) {
+            this(new HashMap<String, String>(), size);
+        }
+
+        public TestFlowFile(final Map<String, String> attributes, final long size) {
+            this.attributes = attributes;
+            this.size = size;
+        }
+
+
+        @Override
+        public long getId() {
+            return id;
+        }
+
+        @Override
+        public long getEntryDate() {
+            return entryDate;
+        }
+
+        @Override
+        public long getLineageStartDate() {
+            return entryDate;
+        }
+
+        @Override
+        public Long getLastQueueDate() {
+            return null;
+        }
+
+        @Override
+        public Set<String> getLineageIdentifiers() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public boolean isPenalized() {
+            return false;
+        }
+
+        @Override
+        public String getAttribute(String key) {
+            return attributes.get(key);
+        }
+
+        @Override
+        public long getSize() {
+            return size;
+        }
+
+        @Override
+        public Map<String, String> getAttributes() {
+            return Collections.unmodifiableMap(attributes);
+        }
+
+        @Override
+        public int compareTo(final FlowFile o) {
+            return Long.compare(id, o.getId());
+        }
+
+        @Override
+        public long getPenaltyExpirationMillis() {
+            return 0;
+        }
+
+        @Override
+        public ContentClaim getContentClaim() {
+            return null;
+        }
+
+        @Override
+        public long getContentClaimOffset() {
+            return 0;
+        }
+    }
+
+    private static class FlowFileSizePrioritizer implements FlowFilePrioritizer {
+        @Override
+        public int compare(final FlowFile o1, final FlowFile o2) {
+            return Long.compare(o1.getSize(), o2.getSize());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index f0a6d8a..d43a3db 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -32,11 +32,14 @@ import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.StandardFlowFileQueue;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.util.NiFiProperties;
 
 /**
@@ -66,7 +69,8 @@ public final class StandardConnection implements Connection {
         destination = new AtomicReference<>(builder.destination);
         relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
         scheduler = builder.scheduler;
-        flowFileQueue = new StandardFlowFileQueue(id, this, scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold());
+        flowFileQueue = new StandardFlowFileQueue(id, this, builder.flowFileRepository, builder.provenanceRepository, builder.resourceClaimManager,
+            scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold());
         hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
     }
 
@@ -262,6 +266,9 @@ public final class StandardConnection implements Connection {
         private Collection<Relationship> relationships;
         private FlowFileSwapManager swapManager;
         private EventReporter eventReporter;
+        private FlowFileRepository flowFileRepository;
+        private ProvenanceEventRepository provenanceRepository;
+        private ResourceClaimManager resourceClaimManager;
 
         public Builder(final ProcessScheduler scheduler) {
             this.scheduler = scheduler;
@@ -318,6 +325,21 @@ public final class StandardConnection implements Connection {
             return this;
         }
 
+        public Builder flowFileRepository(final FlowFileRepository flowFileRepository) {
+            this.flowFileRepository = flowFileRepository;
+            return this;
+        }
+
+        public Builder provenanceRepository(final ProvenanceEventRepository provenanceRepository) {
+            this.provenanceRepository = provenanceRepository;
+            return this;
+        }
+
+        public Builder resourceClaimManager(final ResourceClaimManager resourceClaimManager) {
+            this.resourceClaimManager = resourceClaimManager;
+            return this;
+        }
+
         public StandardConnection build() {
             if (source == null) {
                 throw new IllegalStateException("Cannot build a Connection without a Source");
@@ -328,6 +350,15 @@ public final class StandardConnection implements Connection {
             if (swapManager == null) {
                 throw new IllegalStateException("Cannot build a Connection without a FlowFileSwapManager");
             }
+            if (flowFileRepository == null) {
+                throw new IllegalStateException("Cannot build a Connection without a FlowFile Repository");
+            }
+            if (provenanceRepository == null) {
+                throw new IllegalStateException("Cannot build a Connection without a Provenance Repository");
+            }
+            if (resourceClaimManager == null) {
+                throw new IllegalStateException("Cannot build a Connection without a Resource Claim Manager");
+            }
 
             if (relationships == null) {
                 relationships = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 7ab56ed..c4a86f2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -28,6 +28,7 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -79,7 +80,6 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     private EventReporter eventReporter;
     private ResourceClaimManager claimManager;
 
-
     public FileSystemSwapManager() {
         final NiFiProperties properties = NiFiProperties.getInstance();
         final Path flowFileRepoPath = properties.getFlowFileRepositoryPath();
@@ -111,6 +111,10 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         try (final FileOutputStream fos = new FileOutputStream(swapTempFile)) {
             serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
             fos.getFD().sync();
+        } catch (final IOException ioe) {
+            // we failed to write out the entire swap file. Delete the temporary file, if we can.
+            swapTempFile.delete();
+            throw ioe;
         }
 
         if (swapTempFile.renameTo(swapFile)) {
@@ -133,25 +137,6 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
             warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually");
         }
 
-        // TODO: When FlowFile Queue performs this operation, it needs to take the following error handling logic into account:
-
-        /*
-         * } catch (final EOFException eof) {
-         * error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Corrupt Swap File; will remove this Swap File: " + swapFile);
-         *
-         * if (!swapFile.delete()) {
-         * warn("Failed to remove corrupt Swap File " + swapFile + "; This file should be cleaned up manually");
-         * }
-         * } catch (final FileNotFoundException fnfe) {
-         * error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Could not find Swap File " + swapFile);
-         * } catch (final Exception e) {
-         * error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e, e);
-         *
-         * if (swapFile != null) {
-         * queue.add(swapFile);
-         * }
-         * }
-         */
         return swappedFlowFiles;
     }
 
@@ -165,7 +150,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         final List<FlowFileRecord> swappedFlowFiles;
         try (final InputStream fis = new FileInputStream(swapFile);
             final DataInputStream in = new DataInputStream(fis)) {
-            swappedFlowFiles = deserializeFlowFiles(in, flowFileQueue, swapLocation, claimManager);
+            swappedFlowFiles = deserializeFlowFiles(in, swapLocation, flowFileQueue, claimManager);
         }
 
         return swappedFlowFiles;
@@ -189,6 +174,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     }
 
     @Override
+    public void dropSwappedFlowFiles(final String swapLocation, final FlowFileQueue flowFileQueue, final String user) throws IOException {
+
+    }
+
+
+    @Override
     public List<String> recoverSwapLocations(final FlowFileQueue flowFileQueue) throws IOException {
         final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
             @Override
@@ -322,7 +313,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     }
 
 
-    public int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException {
+    public static int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException {
         if (toSwap == null || toSwap.isEmpty()) {
             return 0;
         }
@@ -396,8 +387,8 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         return toSwap.size();
     }
 
-    private void writeString(final String toWrite, final OutputStream out) throws IOException {
-        final byte[] bytes = toWrite.getBytes("UTF-8");
+    private static void writeString(final String toWrite, final OutputStream out) throws IOException {
+        final byte[] bytes = toWrite.getBytes(StandardCharsets.UTF_8);
         final int utflen = bytes.length;
 
         if (utflen < 65535) {
@@ -415,26 +406,29 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         }
     }
 
-    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final String swapLocation, final ResourceClaimManager claimManager) throws IOException {
+    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final String swapLocation, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException {
         final int swapEncodingVersion = in.readInt();
         if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
             throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is "
                 + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
         }
 
-        final String connectionId = in.readUTF();
+        final String connectionId = in.readUTF(); // Connection ID
         if (!connectionId.equals(queue.getIdentifier())) {
-            throw new IllegalArgumentException("Cannot restore contents from FlowFile Swap File " + swapLocation +
-                " because the file indicates that records belong to Connection with ID " + connectionId + " but attempted to swap those records into " + queue);
+            throw new IllegalArgumentException("Cannot deserialize FlowFiles from Swap File at location " + swapLocation +
+                " because those FlowFiles belong to Connection with ID " + connectionId + " and an attempt was made to swap them into a Connection with ID " + queue.getIdentifier());
         }
 
         final int numRecords = in.readInt();
         in.readLong(); // Content Size
+        if (swapEncodingVersion > 7) {
+            in.readLong(); // Max Record ID
+        }
 
         return deserializeFlowFiles(in, numRecords, swapEncodingVersion, false, claimManager);
     }
 
-    static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles,
+    private static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles,
         final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager) throws IOException {
         final List<FlowFileRecord> flowFiles = new ArrayList<>();
         for (int i = 0; i < numFlowFiles; i++) {
@@ -543,7 +537,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         }
         final byte[] bytes = new byte[numBytes];
         fillBuffer(in, bytes, numBytes);
-        return new String(bytes, "UTF-8");
+        return new String(bytes, StandardCharsets.UTF_8);
     }
 
     private static Integer readFieldLength(final InputStream in) throws IOException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 23746ce..20f2642 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -286,7 +286,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final NodeProtocolSender protocolSender;
 
     private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks");
-    private final ResourceClaimManager contentClaimManager = new StandardResourceClaimManager();
+    private final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
 
     // guarded by rwLock
     /**
@@ -393,7 +393,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
         eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process"));
 
-        final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, contentClaimManager);
+        final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, resourceClaimManager);
         flowFileRepository = flowFileRepo;
         flowFileEventRepository = flowFileEventRepo;
         counterRepositoryRef = new AtomicReference<CounterRepository>(new StandardCounterRepository());
@@ -668,7 +668,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         try {
             final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(implementationClassName, ContentRepository.class);
             synchronized (contentRepo) {
-                contentRepo.initialize(contentClaimManager);
+                contentRepo.initialize(resourceClaimManager);
             }
             return contentRepo;
         } catch (final Exception e) {
@@ -728,11 +728,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         // Create and initialize a FlowFileSwapManager for this connection
         final FlowFileSwapManager swapManager = createSwapManager(properties);
         final EventReporter eventReporter = createEventReporter(getBulletinRepository());
+
         try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
             final SwapManagerInitializationContext initializationContext = new SwapManagerInitializationContext() {
                 @Override
                 public ResourceClaimManager getResourceClaimManager() {
-                    return getResourceClaimManager();
+                    return resourceClaimManager;
                 }
 
                 @Override
@@ -756,6 +757,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             .destination(destination)
             .swapManager(swapManager)
             .eventReporter(eventReporter)
+            .resourceClaimManager(resourceClaimManager)
+            .flowFileRepository(flowFileRepository)
+            .provenanceRepository(provenanceEventRepository)
             .build();
     }
 
@@ -3188,7 +3192,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 throw new IllegalArgumentException("Input Content Claim not specified");
             }
 
-            final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(),
+            final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(),
                 provEvent.getPreviousContentClaimIdentifier(), false);
             claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset());
             offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset();
@@ -3198,7 +3202,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 throw new IllegalArgumentException("Output Content Claim not specified");
             }
 
-            final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(),
+            final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(),
                 provEvent.getContentClaimIdentifier(), false);
 
             claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset());
@@ -3247,7 +3251,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
 
         try {
-            final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false);
+            final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false);
             final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset());
 
             if (!contentRepository.isAccessible(contentClaim)) {
@@ -3327,17 +3331,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
 
         // Create the ContentClaim
-        final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
+        final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
             event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
 
         // Increment Claimant Count, since we will now be referencing the Content Claim
-        contentClaimManager.incrementClaimantCount(resourceClaim);
+        resourceClaimManager.incrementClaimantCount(resourceClaim);
         final long claimOffset = event.getPreviousContentClaimOffset() == null ? 0L : event.getPreviousContentClaimOffset().longValue();
         final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, claimOffset);
         contentClaim.setLength(event.getPreviousFileSize() == null ? -1L : event.getPreviousFileSize());
 
         if (!contentRepository.isAccessible(contentClaim)) {
-            contentClaimManager.decrementClaimantCount(resourceClaim);
+            resourceClaimManager.decrementClaimantCount(resourceClaim);
             throw new IllegalStateException("Cannot replay data from Provenance Event because the data is no longer available in the Content Repository");
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index a32a485..cfbb770 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -81,9 +81,11 @@ import org.slf4j.LoggerFactory;
  * <p>
  * Provides a ProcessSession that ensures all accesses, changes and transfers
  * occur in an atomic manner for all FlowFiles including their contents and
- * attributes</p>
+ * attributes
+ * </p>
  * <p>
- * NOT THREAD SAFE</p>
+ * NOT THREAD SAFE
+ * </p>
  * <p/>
  */
 public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
@@ -104,7 +106,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     private final Map<String, Long> globalCounters = new HashMap<>();
     private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new HashMap<>();
     private final ProcessContext context;
-    private final Set<FlowFile> recursionSet = new HashSet<>();//set used to track what is currently being operated on to prevent logic failures if recursive calls occurring
+    private final Set<FlowFile> recursionSet = new HashSet<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring
     private final Set<Path> deleteOnCommit = new HashSet<>();
     private final long sessionId;
     private final String connectableDescription;
@@ -114,7 +116,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
     private final StandardProvenanceReporter provenanceReporter;
 
-    private int removedCount = 0;    // number of flowfiles removed in this session
+    private int removedCount = 0; // number of flowfiles removed in this session
     private long removedBytes = 0L; // size of all flowfiles removed in this session
     private final LongHolder bytesRead = new LongHolder(0L);
     private final LongHolder bytesWritten = new LongHolder(0L);
@@ -169,7 +171,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
 
         this.provenanceReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(), componentType,
-                context.getProvenanceRepository(), this);
+            context.getProvenanceRepository(), this);
         this.sessionId = idGenerator.getAndIncrement();
         this.connectableDescription = description;
 
@@ -196,7 +198,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         // Processor-reported events.
         List<ProvenanceEventRecord> autoTerminatedEvents = null;
 
-        //validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
+        // validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
         final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>();
         for (final StandardRepositoryRecord record : records.values()) {
             if (record.isMarkedForDelete()) {
@@ -235,11 +237,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     }
                 }
             } else {
-                final Connection finalDestination = destinations.remove(destinations.size() - 1); //remove last element
+                final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element
                 record.setDestination(finalDestination.getFlowFileQueue());
                 incrementConnectionInputCounts(finalDestination, record);
 
-                for (final Connection destination : destinations) { //iterate over remaining destinations and "clone" as needed
+                for (final Connection destination : destinations) { // iterate over remaining destinations and "clone" as needed
                     incrementConnectionInputCounts(destination, record);
                     final FlowFileRecord currRec = record.getCurrent();
                     final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
@@ -256,7 +258,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     if (claim != null) {
                         context.getContentRepository().incrementClaimaintCount(claim);
                     }
-                    newRecord.setWorking(clone, Collections.<String, String>emptyMap());
+                    newRecord.setWorking(clone, Collections.<String, String> emptyMap());
 
                     newRecord.setDestination(destination.getFlowFileQueue());
                     newRecord.setTransferRelationship(record.getTransferRelationship());
@@ -322,9 +324,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
                     final Connectable connectable = context.getConnectable();
                     final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
-                    LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
+                    LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[] {flowFile, terminator, flowFileLife});
                 } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
-                    //records which have been updated - remove original if exists
+                    // records which have been updated - remove original if exists
                     removeContent(record.getOriginalClaim());
                 }
             }
@@ -356,7 +358,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>();
             for (final StandardRepositoryRecord record : checkpoint.records.values()) {
                 if (record.isMarkedForAbort() || record.isMarkedForDelete()) {
-                    continue; //these don't need to be transferred
+                    continue; // these don't need to be transferred
                 }
                 // record.getCurrent() will return null if this record was created in this session --
                 // in this case, we just ignore it, and it will be cleaned up by clearing the records map.
@@ -390,7 +392,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             if (LOG.isInfoEnabled()) {
                 final String sessionSummary = summarizeEvents(checkpoint);
                 if (!sessionSummary.isEmpty()) {
-                    LOG.info("{} for {}, committed the following events: {}", new Object[]{this, connectableDescription, sessionSummary});
+                    LOG.info("{} for {}, committed the following events: {}", new Object[] {this, connectableDescription, sessionSummary});
                 }
             }
 
@@ -611,9 +613,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 boolean creationEventRegistered = false;
                 if (registeredTypes != null) {
                     if (registeredTypes.contains(ProvenanceEventType.CREATE)
-                            || registeredTypes.contains(ProvenanceEventType.FORK)
-                            || registeredTypes.contains(ProvenanceEventType.JOIN)
-                            || registeredTypes.contains(ProvenanceEventType.RECEIVE)) {
+                        || registeredTypes.contains(ProvenanceEventType.FORK)
+                        || registeredTypes.contains(ProvenanceEventType.JOIN)
+                        || registeredTypes.contains(ProvenanceEventType.RECEIVE)) {
                         creationEventRegistered = true;
                     }
                 }
@@ -747,7 +749,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     private StandardProvenanceEventRecord enrich(
-            final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) {
+        final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) {
         final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent);
         final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
         if (eventFlowFile != null) {
@@ -1039,7 +1041,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         final StringBuilder sb = new StringBuilder(512);
         if (!LOG.isDebugEnabled() && (largestTransferSetSize > VERBOSE_LOG_THRESHOLD
-                || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) {
+            || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) {
             if (numCreated > 0) {
                 sb.append("created ").append(numCreated).append(" FlowFiles, ");
             }
@@ -1097,7 +1099,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
     private void formatNanos(final long nanos, final StringBuilder sb) {
         final long seconds = nanos > 1000000000L ? nanos / 1000000000L : 0L;
-        long millis = nanos > 1000000L ? nanos / 1000000L : 0L;;
+        long millis = nanos > 1000000L ? nanos / 1000000L : 0L;
+        ;
         final long nanosLeft = nanos % 1000000L;
 
         if (seconds > 0) {
@@ -1272,7 +1275,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         int flowFileCount = 0;
         long byteCount = 0L;
         for (final Connection conn : context.getPollableConnections()) {
-            final QueueSize queueSize = conn.getFlowFileQueue().getActiveQueueSize();
+            final QueueSize queueSize = conn.getFlowFileQueue().size();
             flowFileCount += queueSize.getObjectCount();
             byteCount += queueSize.getByteCount();
         }
@@ -1287,8 +1290,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         attrs.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
 
         final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
-                .addAttributes(attrs)
-                .build();
+            .addAttributes(attrs)
+            .build();
         final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
         record.setWorking(fFile, attrs);
         records.put(fFile, record);
@@ -1324,7 +1327,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             context.getContentRepository().incrementClaimaintCount(claim);
         }
         final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
-        record.setWorking(clone, Collections.<String, String>emptyMap());
+        record.setWorking(clone, Collections.<String, String> emptyMap());
         records.put(clone, record);
 
         if (offset == 0L && size == example.getSize()) {
@@ -1637,7 +1640,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             return;
         }
 
-        LOG.info("{} {} FlowFiles have expired and will be removed", new Object[]{this, flowFiles.size()});
+        LOG.info("{} {} FlowFiles have expired and will be removed", new Object[] {this, flowFiles.size()});
         final List<RepositoryRecord> expiredRecords = new ArrayList<>(flowFiles.size());
 
         final String processorType;
@@ -1650,7 +1653,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
 
         final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(),
-                processorType, context.getProvenanceRepository(), this);
+            processorType, context.getProvenanceRepository(), this);
 
         final Map<String, FlowFileRecord> recordIdMap = new HashMap<>();
         for (final FlowFileRecord flowFile : flowFiles) {
@@ -1664,7 +1667,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
             final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
             final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
-            LOG.info("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
+            LOG.info("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[] {flowFile, terminator, flowFileLife});
         }
 
         try {
@@ -1696,7 +1699,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                                     record.getContentClaimOffset() + claim.getOffset(), record.getSize());
                             }
 
-                            enriched.setAttributes(record.getAttributes(), Collections.<String, String>emptyMap());
+                            enriched.setAttributes(record.getAttributes(), Collections.<String, String> emptyMap());
                             return enriched.build();
                         }
 
@@ -1780,9 +1783,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
 
         try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset());
-                final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
-                final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
-                final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
+            final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
+            final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
+            final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
 
             // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
             // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
@@ -1853,7 +1856,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         try {
             try (final OutputStream rawOut = contentRepo.write(newClaim);
-                    final OutputStream out = new BufferedOutputStream(rawOut)) {
+                final OutputStream out = new BufferedOutputStream(rawOut)) {
 
                 if (header != null && header.length > 0) {
                     out.write(header);
@@ -2070,10 +2073,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         // the original claim if the record is "working" but the content has not been modified
         // (e.g., in the case of attributes only were updated)
         // In other words:
-        //  If we modify the attributes of a FlowFile, and then we call record.getWorkingClaim(), this will
-        //  return the same claim as record.getOriginalClaim(). So we cannot just remove the working claim because
-        //  that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do
-        //  because we will do that later, in the session.commit() and that would result in removing the original claim twice.
+        // If we modify the attributes of a FlowFile, and then we call record.getWorkingClaim(), this will
+        // return the same claim as record.getOriginalClaim(). So we cannot just remove the working claim because
+        // that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do
+        // because we will do that later, in the session.commit() and that would result in removing the original claim twice.
         if (contentModified) {
             // In this case, it's ok to go ahead and destroy the content because we know that the working claim is going to be
             // updated and the given working claim is referenced only by FlowFiles in this session (because it's the Working Claim).
@@ -2196,7 +2199,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     @Override
     public FlowFile importFrom(final Path source, final boolean keepSourceFile, final FlowFile destination) {
         validateRecordState(destination);
-        //TODO: find a better solution. With Windows 7 and Java 7 (very early update, at least), Files.isWritable(source.getParent()) returns false, even when it should be true.
+        // TODO: find a better solution. With Windows 7 and Java 7 (very early update, at least), Files.isWritable(source.getParent()) returns false, even when it should be true.
         if (!keepSourceFile && !Files.isWritable(source.getParent()) && !source.getParent().toFile().canWrite()) {
             // If we do NOT want to keep the file, ensure that we can delete it, or else error.
             throw new FlowFileAccessException("Cannot write to path " + source.getParent().toFile().getAbsolutePath() + " so cannot delete file; will not import.");
@@ -2228,9 +2231,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         removeTemporaryClaim(record);
 
         final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent())
-                .contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize)
-                .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName())
-                .build();
+            .contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize)
+            .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName())
+            .build();
         record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName());
         if (!keepSourceFile) {
             deleteOnCommit.add(source);
@@ -2370,7 +2373,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
      *
      * @param flowFile the FlowFile to check
      * @return <code>true</code> if the FlowFile is known in this session,
-     * <code>false</code> otherwise.
+     *         <code>false</code> otherwise.
      */
     boolean isFlowFileKnown(final FlowFile flowFile) {
         return records.containsKey(flowFile);
@@ -2392,8 +2395,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             final String key = entry.getKey();
             final String value = entry.getValue();
             if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key)
-                    || CoreAttributes.DISCARD_REASON.key().equals(key)
-                    || CoreAttributes.UUID.key().equals(key)) {
+                || CoreAttributes.DISCARD_REASON.key().equals(key)
+                || CoreAttributes.UUID.key().equals(key)) {
                 continue;
             }
             newAttributes.put(key, value);
@@ -2441,10 +2444,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
 
         final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
-                .addAttributes(newAttributes)
-                .lineageIdentifiers(lineageIdentifiers)
-                .lineageStartDate(lineageStartDate)
-                .build();
+            .addAttributes(newAttributes)
+            .lineageIdentifiers(lineageIdentifiers)
+            .lineageStartDate(lineageStartDate)
+            .build();
 
         final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
         record.setWorking(fFile, newAttributes);
@@ -2465,7 +2468,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
      */
     private static Map<String, String> intersectAttributes(final Collection<FlowFile> flowFileList) {
         final Map<String, String> result = new HashMap<>();
-        //trivial cases
+        // trivial cases
         if (flowFileList == null || flowFileList.isEmpty()) {
             return result;
         } else if (flowFileList.size() == 1) {
@@ -2478,8 +2481,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
          */
         final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes();
 
-        outer:
-        for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
+        outer: for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
             final String key = mapEntry.getKey();
             final String value = mapEntry.getValue();
             for (final FlowFile flowFile : flowFileList) {
@@ -2539,7 +2541,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         private final Set<String> removedFlowFiles = new HashSet<>();
         private final Set<String> createdFlowFiles = new HashSet<>();
 
-        private int removedCount = 0;    // number of flowfiles removed in this session
+        private int removedCount = 0; // number of flowfiles removed in this session
         private long removedBytes = 0L; // size of all flowfiles removed in this session
         private long bytesRead = 0L;
         private long bytesWritten = 0L;

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
index c4d040b..3c4fcdb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
@@ -26,7 +26,7 @@ public class Connectables {
 
     public static boolean flowFilesQueued(final Connectable connectable) {
         for (final Connection conn : connectable.getIncomingConnections()) {
-            if (!conn.getFlowFileQueue().isActiveQueueEmpty()) {
+            if (!conn.getFlowFileQueue().isEmpty()) {
                 return true;
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 6eeddc5..f7191c5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -22,16 +22,26 @@ import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.flowfile.FlowFile;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -47,7 +57,7 @@ public class TestFileSystemSwapManager {
             final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
             Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
 
-            final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, flowFileQueue, "/src/test/resources/old-swap-file.swap", new NopResourceClaimManager());
+            final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, "/src/test/resources/old-swap-file.swap", flowFileQueue, new NopResourceClaimManager());
             assertEquals(10000, records.size());
 
             for (final FlowFileRecord record : records) {
@@ -57,6 +67,53 @@ public class TestFileSystemSwapManager {
         }
     }
 
+    @Test
+    public void testRoundTripSerializeDeserialize() throws IOException {
+        final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
+        final Map<String, String> attrs = new HashMap<>();
+        for (int i = 0; i < 10000; i++) {
+            attrs.put("i", String.valueOf(i));
+            final FlowFileRecord ff = new TestFlowFile(attrs, i);
+            toSwap.add(ff);
+        }
+
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+
+        final String swapLocation = "target/testRoundTrip.swap";
+        final File swapFile = new File(swapLocation);
+        Files.deleteIfExists(swapFile.toPath());
+
+        try (final FileOutputStream fos = new FileOutputStream(swapFile)) {
+            FileSystemSwapManager.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
+        }
+
+        final List<FlowFileRecord> swappedIn;
+        try (final FileInputStream fis = new FileInputStream(swapFile);
+            final DataInputStream dis = new DataInputStream(fis)) {
+            swappedIn = FileSystemSwapManager.deserializeFlowFiles(dis, swapLocation, flowFileQueue, Mockito.mock(ResourceClaimManager.class));
+        }
+
+        assertEquals(toSwap.size(), swappedIn.size());
+        for (int i = 0; i < toSwap.size(); i++) {
+            final FlowFileRecord pre = toSwap.get(i);
+            final FlowFileRecord post = swappedIn.get(i);
+
+            assertEquals(pre.getSize(), post.getSize());
+            assertEquals(pre.getAttributes(), post.getAttributes());
+            assertEquals(pre.getSize(), post.getSize());
+            assertEquals(pre.getId(), post.getId());
+            assertEquals(pre.getContentClaim(), post.getContentClaim());
+            assertEquals(pre.getContentClaimOffset(), post.getContentClaimOffset());
+            assertEquals(pre.getEntryDate(), post.getEntryDate());
+            assertEquals(pre.getLastQueueDate(), post.getLastQueueDate());
+            assertEquals(pre.getLineageIdentifiers(), post.getLineageIdentifiers());
+            assertEquals(pre.getLineageStartDate(), post.getLineageStartDate());
+            assertEquals(pre.getPenaltyExpirationMillis(), post.getPenaltyExpirationMillis());
+        }
+    }
+
+
     public class NopResourceClaimManager implements ResourceClaimManager {
 
         @Override
@@ -100,4 +157,87 @@ public class TestFileSystemSwapManager {
         public void purge() {
         }
     }
+
+
+    private static class TestFlowFile implements FlowFileRecord {
+        private static final AtomicLong idGenerator = new AtomicLong(0L);
+
+        private final long id = idGenerator.getAndIncrement();
+        private final long entryDate = System.currentTimeMillis();
+        private final long lastQueueDate = System.currentTimeMillis();
+        private final Map<String, String> attributes;
+        private final long size;
+
+
+        public TestFlowFile(final Map<String, String> attributes, final long size) {
+            this.attributes = attributes;
+            this.size = size;
+        }
+
+
+        @Override
+        public long getId() {
+            return id;
+        }
+
+        @Override
+        public long getEntryDate() {
+            return entryDate;
+        }
+
+        @Override
+        public long getLineageStartDate() {
+            return entryDate;
+        }
+
+        @Override
+        public Long getLastQueueDate() {
+            return lastQueueDate;
+        }
+
+        @Override
+        public Set<String> getLineageIdentifiers() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public boolean isPenalized() {
+            return false;
+        }
+
+        @Override
+        public String getAttribute(String key) {
+            return attributes.get(key);
+        }
+
+        @Override
+        public long getSize() {
+            return size;
+        }
+
+        @Override
+        public Map<String, String> getAttributes() {
+            return Collections.unmodifiableMap(attributes);
+        }
+
+        @Override
+        public int compareTo(final FlowFile o) {
+            return Long.compare(id, o.getId());
+        }
+
+        @Override
+        public long getPenaltyExpirationMillis() {
+            return -1L;
+        }
+
+        @Override
+        public ContentClaim getContentClaim() {
+            return null;
+        }
+
+        @Override
+        public long getContentClaimOffset() {
+            return 0;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 12f8e5e..1783708 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -134,7 +134,7 @@ public class TestStandardProcessSession {
         final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
 
         final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class);
-        flowFileQueue = new StandardFlowFileQueue("1", connection, processScheduler, swapManager, null, 10000);
+        flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000);
         when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
 
         Mockito.doAnswer(new Answer<Object>() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 8bf5553..0e3bcac 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -37,6 +37,11 @@ import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.WebApplicationException;
 
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.admin.service.UserService;
+import org.apache.nifi.authorization.DownloadAuthorization;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.connectable.Connectable;
@@ -47,9 +52,10 @@ import org.apache.nifi.controller.ContentAvailability;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.Counter;
 import org.apache.nifi.controller.FlowController;
-import org.apache.nifi.controller.FlowFileQueue;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.ContentNotFoundException;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
@@ -61,8 +67,8 @@ import org.apache.nifi.groups.ProcessGroupCounts;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.QueueSize;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
@@ -75,7 +81,9 @@ import org.apache.nifi.provenance.search.SearchTerm;
 import org.apache.nifi.provenance.search.SearchTerms;
 import org.apache.nifi.provenance.search.SearchableField;
 import org.apache.nifi.remote.RootGroupPort;
+import org.apache.nifi.reporting.BulletinQuery;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.search.SearchContext;
@@ -85,6 +93,7 @@ import org.apache.nifi.services.FlowService;
 import org.apache.nifi.user.NiFiUser;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.DownloadableContent;
 import org.apache.nifi.web.NiFiCoreException;
 import org.apache.nifi.web.ResourceNotFoundException;
 import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
@@ -104,15 +113,6 @@ import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
 import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
-import org.apache.nifi.web.DownloadableContent;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.admin.service.UserService;
-import org.apache.nifi.authorization.DownloadAuthorization;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.reporting.BulletinQuery;
-import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.web.security.user.NiFiUserUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -501,7 +501,7 @@ public class ControllerFacade {
      * Site-to-Site communications
      *
      * @return the socket port that the Cluster Manager is listening on for
-     * Site-to-Site communications
+     *         Site-to-Site communications
      */
     public Integer getClusterManagerRemoteSiteListeningPort() {
         return flowController.getClusterManagerRemoteSiteListeningPort();
@@ -512,7 +512,7 @@ public class ControllerFacade {
      * Manager are secure
      *
      * @return whether or not Site-to-Site communications with the Cluster
-     * Manager are secure
+     *         Manager are secure
      */
     public Boolean isClusterManagerRemoteSiteCommsSecure() {
         return flowController.isClusterManagerRemoteSiteCommsSecure();
@@ -523,7 +523,7 @@ public class ControllerFacade {
      * Site-to-Site communications
      *
      * @return the socket port that the local instance is listening on for
-     * Site-to-Site communications
+     *         Site-to-Site communications
      */
     public Integer getRemoteSiteListeningPort() {
         return flowController.getRemoteSiteListeningPort();
@@ -534,7 +534,7 @@ public class ControllerFacade {
      * instance are secure
      *
      * @return whether or not Site-to-Site communications with the local
-     * instance are secure
+     *         instance are secure
      */
     public Boolean isRemoteSiteCommsSecure() {
         return flowController.isRemoteSiteCommsSecure();