You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2018/10/04 20:29:08 UTC

[12/14] nifi git commit: NIFI-5516: Implement Load-Balanced Connections; Refactoring StandardFlowFileQueue to have an AbstractFlowFileQueue; Refactored more into AbstractFlowFileQueue; Added documentation, cleaned up code some; Refactored FlowFileQueue so th...

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index 1378d3b..fb06a15 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -16,24 +16,6 @@
  */
 package org.apache.nifi.cluster.coordination.node;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
 import org.apache.nifi.cluster.coordination.flow.FlowElection;
 import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
@@ -47,8 +29,12 @@ import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.services.FlowService;
+import org.apache.nifi.state.MockStateMap;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.revision.RevisionManager;
 import org.junit.Assert;
@@ -58,11 +44,33 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
 public class TestNodeClusterCoordinator {
 
     private NodeClusterCoordinator coordinator;
     private ClusterCoordinationProtocolSenderListener senderListener;
     private List<NodeConnectionStatus> nodeStatuses;
+    private StateManagerProvider stateManagerProvider;
 
     private NiFiProperties createProperties() {
         final Map<String,String> addProps = new HashMap<>();
@@ -76,12 +84,18 @@ public class TestNodeClusterCoordinator {
 
         senderListener = Mockito.mock(ClusterCoordinationProtocolSenderListener.class);
         nodeStatuses = Collections.synchronizedList(new ArrayList<>());
+        stateManagerProvider = Mockito.mock(StateManagerProvider.class);
+
+        final StateManager stateManager = Mockito.mock(StateManager.class);
+        when(stateManager.getState(any(Scope.class))).thenReturn(new MockStateMap(Collections.emptyMap(), 1));
+        when(stateManagerProvider.getStateManager(anyString())).thenReturn(stateManager);
+
 
         final EventReporter eventReporter = Mockito.mock(EventReporter.class);
         final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
-        Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
+        when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties(), null) {
+        coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, createProperties(), null, stateManagerProvider) {
             @Override
             void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
                 nodeStatuses.add(updatedStatus);
@@ -90,7 +104,7 @@ public class TestNodeClusterCoordinator {
 
         final FlowService flowService = Mockito.mock(FlowService.class);
         final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50], new HashSet<>());
-        Mockito.when(flowService.createDataFlow()).thenReturn(dataFlow);
+        when(flowService.createDataFlow()).thenReturn(dataFlow);
         coordinator.setFlowService(flowService);
     }
 
@@ -130,14 +144,14 @@ public class TestNodeClusterCoordinator {
     }
 
     @Test
-    public void testTryAgainIfNoFlowServiceSet() {
+    public void testTryAgainIfNoFlowServiceSet() throws IOException {
         final ClusterCoordinationProtocolSenderListener senderListener = Mockito.mock(ClusterCoordinationProtocolSenderListener.class);
         final EventReporter eventReporter = Mockito.mock(EventReporter.class);
         final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
-        Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
+        when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
         final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(),
-                null, revisionManager, createProperties(), null) {
+                null, revisionManager, createProperties(), null, stateManagerProvider) {
             @Override
             void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
             }
@@ -150,7 +164,7 @@ public class TestNodeClusterCoordinator {
 
         coordinator.setConnected(true);
 
-        final ProtocolMessage protocolResponse = coordinator.handle(requestMsg);
+        final ProtocolMessage protocolResponse = coordinator.handle(requestMsg, Collections.emptySet());
         assertNotNull(protocolResponse);
         assertTrue(protocolResponse instanceof ConnectionResponseMessage);
 
@@ -164,7 +178,7 @@ public class TestNodeClusterCoordinator {
         final ClusterCoordinationProtocolSenderListener senderListener = Mockito.mock(ClusterCoordinationProtocolSenderListener.class);
         final AtomicReference<ReconnectionRequestMessage> requestRef = new AtomicReference<>();
 
-        Mockito.when(senderListener.requestReconnection(Mockito.any(ReconnectionRequestMessage.class))).thenAnswer(new Answer<Object>() {
+        when(senderListener.requestReconnection(any(ReconnectionRequestMessage.class))).thenAnswer(new Answer<Object>() {
             @Override
             public Object answer(InvocationOnMock invocation) throws Throwable {
                 final ReconnectionRequestMessage msg = invocation.getArgumentAt(0, ReconnectionRequestMessage.class);
@@ -175,10 +189,10 @@ public class TestNodeClusterCoordinator {
 
         final EventReporter eventReporter = Mockito.mock(EventReporter.class);
         final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
-        Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
+        when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
         final NodeClusterCoordinator coordinator = new NodeClusterCoordinator(senderListener, eventReporter, null, new FirstVoteWinsFlowElection(),
-                null, revisionManager, createProperties(), null) {
+                null, revisionManager, createProperties(), null, stateManagerProvider) {
             @Override
             void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
             }
@@ -186,7 +200,7 @@ public class TestNodeClusterCoordinator {
 
         final FlowService flowService = Mockito.mock(FlowService.class);
         final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], new byte[50], new byte[50], new HashSet<>());
-        Mockito.when(flowService.createDataFlowFromController()).thenReturn(dataFlow);
+        when(flowService.createDataFlowFromController()).thenReturn(dataFlow);
         coordinator.setFlowService(flowService);
         coordinator.setConnected(true);
 
@@ -232,7 +246,7 @@ public class TestNodeClusterCoordinator {
     @Test(timeout = 5000)
     public void testStatusChangesReplicated() throws InterruptedException, IOException {
         final RevisionManager revisionManager = Mockito.mock(RevisionManager.class);
-        Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
+        when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
         // Create a connection request message and send to the coordinator
         final NodeIdentifier requestedNodeId = createNodeId(1);
@@ -397,7 +411,7 @@ public class TestNodeClusterCoordinator {
         final NodeStatusChangeMessage msg = new NodeStatusChangeMessage();
         msg.setNodeId(nodeId1);
         msg.setNodeConnectionStatus(oldStatus);
-        coordinator.handle(msg);
+        coordinator.handle(msg, Collections.emptySet());
 
         // Ensure that no status change message was send
         Thread.sleep(1000);
@@ -413,7 +427,7 @@ public class TestNodeClusterCoordinator {
         final ConnectionRequestMessage crm = new ConnectionRequestMessage();
         crm.setConnectionRequest(connectionRequest);
 
-        final ProtocolMessage response = coordinator.handle(crm);
+        final ProtocolMessage response = coordinator.handle(crm, Collections.emptySet());
         assertNotNull(response);
         assertTrue(response instanceof ConnectionResponseMessage);
         final ConnectionResponseMessage responseMessage = (ConnectionResponseMessage) response;
@@ -424,7 +438,7 @@ public class TestNodeClusterCoordinator {
         final ConnectionRequestMessage crm2 = new ConnectionRequestMessage();
         crm2.setConnectionRequest(conRequest2);
 
-        final ProtocolMessage conflictingResponse = coordinator.handle(crm2);
+        final ProtocolMessage conflictingResponse = coordinator.handle(crm2, Collections.emptySet());
         assertNotNull(conflictingResponse);
         assertTrue(conflictingResponse instanceof ConnectionResponseMessage);
         final ConnectionResponseMessage conflictingResponseMessage = (ConnectionResponseMessage) conflictingResponse;
@@ -446,7 +460,7 @@ public class TestNodeClusterCoordinator {
         final ConnectionRequest request = new ConnectionRequest(requestedNodeId, new StandardDataFlow(new byte[0], new byte[0], new byte[0], new HashSet<>()));
         final ConnectionRequestMessage requestMsg = new ConnectionRequestMessage();
         requestMsg.setConnectionRequest(request);
-        return coordinator.handle(requestMsg);
+        return coordinator.handle(requestMsg, Collections.emptySet());
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
index 45a2e42..3980865 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
@@ -219,7 +219,7 @@ public class ClusterConnectionIT {
         cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS);
         final Node coordinator = cluster.waitForClusterCoordinator(10, TimeUnit.SECONDS);
 
-        final NodeIdentifier node4NotReallyInCluster = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9283, "localhost", 9284, "localhost", 9285, null, false, null);
+        final NodeIdentifier node4NotReallyInCluster = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9283, "localhost", 9284, "localhost", 9286, "localhost", 9285, null, false, null);
 
         final Map<NodeIdentifier, NodeConnectionStatus> replacementStatuses = new HashMap<>();
         replacementStatuses.put(node1.getIdentifier(), new NodeConnectionStatus(node1.getIdentifier(), DisconnectionCode.USER_DISCONNECTED));

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
index e0d8a97..3133736 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
@@ -17,17 +17,6 @@
 
 package org.apache.nifi.cluster.integration;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.bundle.Bundle;
@@ -73,6 +62,18 @@ import org.apache.nifi.web.revision.RevisionManager;
 import org.junit.Assert;
 import org.mockito.Mockito;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 public class Node {
     private final NodeIdentifier nodeId;
     private final NiFiProperties nodeProperties;
@@ -133,7 +134,7 @@ public class Node {
 
 
     private static NodeIdentifier createNodeId() {
-        return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", createPort(), "localhost", createPort(), "localhost", null, null, false, null);
+        return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", createPort(), "localhost", createPort(), "localhost", createPort(), "localhost", null, null, false, null);
     }
 
     /**
@@ -296,8 +297,13 @@ public class Node {
         }
 
         final ClusterCoordinationProtocolSenderListener protocolSenderListener = new ClusterCoordinationProtocolSenderListener(createCoordinatorProtocolSender(), protocolListener);
-        return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, null,
-                revisionManager, nodeProperties, protocolSender);
+        try {
+            return new NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, flowElection, null,
+                    revisionManager, nodeProperties, protocolSender);
+        } catch (IOException e) {
+            Assert.fail(e.toString());
+            return null;
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy
index ffa3429..83d301b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ConnectionEntityMergerSpec.groovy
@@ -63,6 +63,6 @@ class ConnectionEntityMergerSpec extends Specification {
     }
 
     def createNodeIdentifier(int id) {
-        new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, 'stsaddr', id * 100, id * 1000, false, null)
+        new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, null, id * 10, 'stsaddr', id * 100, id * 1000, false, null)
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy
index bb1d595..3997bec 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/ControllerServiceEntityMergerSpec.groovy
@@ -147,6 +147,6 @@ class ControllerServiceEntityMergerSpec extends Specification {
     }
 
     def createNodeIdentifier(int id) {
-        new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, 'stsaddr', id * 100, id * 1000, false, null)
+        new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, null, id * 10, 'stsaddr', id * 100, id * 1000, false, null)
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy
index 028c864..0a485b2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/LabelEntityMergerSpec.groovy
@@ -55,6 +55,6 @@ class LabelEntityMergerSpec extends Specification {
     }
 
     def createNodeIdentifier(int id) {
-        new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, 'stsaddr', id * 100, id * 1000, false, null)
+        new NodeIdentifier("cluster-node-$id", 'addr', id, 'sktaddr', id * 10, null, id * 10, 'stsaddr', id * 100, id * 1000, false, null)
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
deleted file mode 100644
index 6f55e79..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.controller;
-
-import org.apache.nifi.controller.queue.DropFlowFileState;
-import org.apache.nifi.controller.queue.DropFlowFileStatus;
-import org.apache.nifi.controller.queue.QueueSize;
-
-public class DropFlowFileRequest implements DropFlowFileStatus {
-    private final String identifier;
-    private final long submissionTime = System.currentTimeMillis();
-
-    private volatile QueueSize originalSize;
-    private volatile QueueSize currentSize;
-    private volatile QueueSize droppedSize = new QueueSize(0, 0L);
-    private volatile long lastUpdated = System.currentTimeMillis();
-    private volatile String failureReason;
-
-    private DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK;
-
-
-    public DropFlowFileRequest(final String identifier) {
-        this.identifier = identifier;
-    }
-
-    @Override
-    public String getRequestIdentifier() {
-        return identifier;
-    }
-
-    @Override
-    public long getRequestSubmissionTime() {
-        return submissionTime;
-    }
-
-    @Override
-    public QueueSize getOriginalSize() {
-        return originalSize;
-    }
-
-    void setOriginalSize(final QueueSize originalSize) {
-        this.originalSize = originalSize;
-    }
-
-    @Override
-    public QueueSize getCurrentSize() {
-        return currentSize;
-    }
-
-    void setCurrentSize(final QueueSize queueSize) {
-        this.currentSize = queueSize;
-    }
-
-    @Override
-    public QueueSize getDroppedSize() {
-        return droppedSize;
-    }
-
-    void setDroppedSize(final QueueSize droppedSize) {
-        this.droppedSize = droppedSize;
-    }
-
-    @Override
-    public synchronized DropFlowFileState getState() {
-        return state;
-    }
-
-    @Override
-    public long getLastUpdated() {
-        return lastUpdated;
-    }
-
-    @Override
-    public String getFailureReason() {
-        return failureReason;
-    }
-
-    synchronized void setState(final DropFlowFileState state) {
-        setState(state, null);
-    }
-
-    synchronized void setState(final DropFlowFileState state, final String explanation) {
-        this.state = state;
-        this.failureReason = explanation;
-        this.lastUpdated = System.currentTimeMillis();
-    }
-
-    synchronized boolean cancel() {
-        if (this.state == DropFlowFileState.COMPLETE || this.state == DropFlowFileState.CANCELED) {
-            return false;
-        }
-
-        this.state = DropFlowFileState.CANCELED;
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java
new file mode 100644
index 0000000..69a0b92
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileRequest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+public class DropFlowFileRequest implements DropFlowFileStatus {
+    private final String identifier;
+    private final long submissionTime = System.currentTimeMillis();
+
+    private volatile QueueSize originalSize;
+    private volatile QueueSize currentSize;
+    private volatile QueueSize droppedSize = new QueueSize(0, 0L);
+    private volatile long lastUpdated = System.currentTimeMillis();
+    private volatile String failureReason;
+
+    private DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK;
+
+
+    public DropFlowFileRequest(final String identifier) {
+        this.identifier = identifier;
+    }
+
+    @Override
+    public String getRequestIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public long getRequestSubmissionTime() {
+        return submissionTime;
+    }
+
+    @Override
+    public QueueSize getOriginalSize() {
+        return originalSize;
+    }
+
+    public void setOriginalSize(final QueueSize originalSize) {
+        this.originalSize = originalSize;
+    }
+
+    @Override
+    public QueueSize getCurrentSize() {
+        return currentSize;
+    }
+
+    public void setCurrentSize(final QueueSize queueSize) {
+        this.currentSize = queueSize;
+    }
+
+    @Override
+    public QueueSize getDroppedSize() {
+        return droppedSize;
+    }
+
+    public void setDroppedSize(final QueueSize droppedSize) {
+        this.droppedSize = droppedSize;
+    }
+
+    @Override
+    public synchronized DropFlowFileState getState() {
+        return state;
+    }
+
+    @Override
+    public long getLastUpdated() {
+        return lastUpdated;
+    }
+
+    @Override
+    public String getFailureReason() {
+        return failureReason;
+    }
+
+    public synchronized void setState(final DropFlowFileState state) {
+        setState(state, null);
+    }
+
+    public synchronized void setState(final DropFlowFileState state, final String explanation) {
+        this.state = state;
+        this.failureReason = explanation;
+        this.lastUpdated = System.currentTimeMillis();
+    }
+
+    public synchronized boolean cancel() {
+        if (this.state == DropFlowFileState.COMPLETE || this.state == DropFlowFileState.CANCELED) {
+            return false;
+        }
+
+        this.state = DropFlowFileState.CANCELED;
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java
index 5aeb5c5..b63be53 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/ContentNotFoundException.java
@@ -18,6 +18,8 @@ package org.apache.nifi.controller.repository;
 
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 
+import java.util.Optional;
+
 /**
  *
  */
@@ -25,23 +27,37 @@ public class ContentNotFoundException extends RuntimeException {
 
     private static final long serialVersionUID = 19048239082L;
     private final transient ContentClaim claim;
+    private final transient FlowFileRecord flowFile;
 
     public ContentNotFoundException(final ContentClaim claim) {
         super("Could not find content for " + claim);
         this.claim = claim;
+        this.flowFile = null;
     }
 
     public ContentNotFoundException(final ContentClaim claim, final Throwable t) {
         super("Could not find content for " + claim, t);
         this.claim = claim;
+        this.flowFile = null;
     }
 
     public ContentNotFoundException(final ContentClaim claim, final String message) {
         super("Could not find content for " + claim + ": " + message);
         this.claim = claim;
+        this.flowFile = null;
+    }
+
+    public ContentNotFoundException(final FlowFileRecord flowFile, final ContentClaim claim, final String message) {
+        super("Could not find content for " + claim + ": " + message);
+        this.claim = claim;
+        this.flowFile = flowFile;
     }
 
     public ContentClaim getMissingClaim() {
         return claim;
     }
+
+    public Optional<FlowFileRecord> getFlowFile() {
+        return Optional.ofNullable(flowFile);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index 6172874..fe60585 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -29,18 +29,16 @@ import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.controller.ProcessScheduler;
-import org.apache.nifi.controller.StandardFlowFileQueue;
+import org.apache.nifi.controller.queue.ConnectionEventListener;
 import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueueFactory;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
 import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.FlowFileRepository;
-import org.apache.nifi.controller.repository.FlowFileSwapManager;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.scheduling.SchedulingStrategy;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -60,7 +58,7 @@ import java.util.stream.Collectors;
  * one or more relationships that map the source component to the destination
  * component.
  */
-public final class StandardConnection implements Connection {
+public final class StandardConnection implements Connection, ConnectionEventListener {
 
     private final String id;
     private final AtomicReference<ProcessGroup> processGroup;
@@ -69,13 +67,16 @@ public final class StandardConnection implements Connection {
     private final Connectable source;
     private final AtomicReference<Connectable> destination;
     private final AtomicReference<Collection<Relationship>> relationships;
-    private final StandardFlowFileQueue flowFileQueue;
     private final AtomicInteger labelIndex = new AtomicInteger(1);
     private final AtomicLong zIndex = new AtomicLong(0L);
     private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
     private final ProcessScheduler scheduler;
+    private final FlowFileQueueFactory flowFileQueueFactory;
+    private final boolean clustered;
     private final int hashCode;
 
+    private volatile FlowFileQueue flowFileQueue;
+
     private StandardConnection(final Builder builder) {
         id = builder.id;
         name = new AtomicReference<>(builder.name);
@@ -85,9 +86,10 @@ public final class StandardConnection implements Connection {
         destination = new AtomicReference<>(builder.destination);
         relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
         scheduler = builder.scheduler;
-        flowFileQueue = new StandardFlowFileQueue(id, this, builder.flowFileRepository, builder.provenanceRepository, builder.resourceClaimManager,
-                scheduler, builder.swapManager, builder.eventReporter, builder.queueSwapThreshold,
-                builder.defaultBackPressureObjectThreshold, builder.defaultBackPressureDataSizeThreshold);
+        flowFileQueueFactory = builder.flowFileQueueFactory;
+        clustered = builder.clustered;
+
+        flowFileQueue = flowFileQueueFactory.createFlowFileQueue(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE, null, this);
         hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
     }
 
@@ -148,6 +150,20 @@ public final class StandardConnection implements Connection {
     }
 
     @Override
+    public void triggerDestinationEvent() {
+        if (getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
+            scheduler.registerEvent(getDestination());
+        }
+    }
+
+    @Override
+    public void triggerSourceEvent() {
+        if (getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
+            scheduler.registerEvent(getSource());
+        }
+    }
+
+    @Override
     public Authorizable getSourceAuthorizable() {
         final Connectable sourceConnectable = getSource();
         final Authorizable sourceAuthorizable;
@@ -297,7 +313,7 @@ public final class StandardConnection implements Connection {
             throw new IllegalStateException("Cannot change destination of Connection because the current destination is running");
         }
 
-        if (getFlowFileQueue().getUnacknowledgedQueueSize().getObjectCount() > 0) {
+        if (getFlowFileQueue().isUnacknowledgedFlowFile()) {
             throw new IllegalStateException("Cannot change destination of Connection because FlowFiles from this Connection are currently held by " + previousDestination);
         }
 
@@ -354,7 +370,7 @@ public final class StandardConnection implements Connection {
 
     @Override
     public String toString() {
-        return "Connection[Source ID=" + id + ",Dest ID=" + getDestination().getIdentifier() + "]";
+        return "Connection[ID=" + getIdentifier() + ", Source ID=" + getSource().getIdentifier() + ", Dest ID=" + getDestination().getIdentifier() + "]";
     }
 
     /**
@@ -386,14 +402,8 @@ public final class StandardConnection implements Connection {
         private Connectable source;
         private Connectable destination;
         private Collection<Relationship> relationships;
-        private FlowFileSwapManager swapManager;
-        private EventReporter eventReporter;
-        private FlowFileRepository flowFileRepository;
-        private ProvenanceEventRepository provenanceRepository;
-        private ResourceClaimManager resourceClaimManager;
-        private int queueSwapThreshold;
-        private Long defaultBackPressureObjectThreshold;
-        private String defaultBackPressureDataSizeThreshold;
+        private FlowFileQueueFactory flowFileQueueFactory;
+        private boolean clustered = false;
 
         public Builder(final ProcessScheduler scheduler) {
             this.scheduler = scheduler;
@@ -440,43 +450,13 @@ public final class StandardConnection implements Connection {
             return this;
         }
 
-        public Builder swapManager(final FlowFileSwapManager swapManager) {
-            this.swapManager = swapManager;
-            return this;
-        }
-
-        public Builder eventReporter(final EventReporter eventReporter) {
-            this.eventReporter = eventReporter;
-            return this;
-        }
-
-        public Builder flowFileRepository(final FlowFileRepository flowFileRepository) {
-            this.flowFileRepository = flowFileRepository;
+        public Builder flowFileQueueFactory(final FlowFileQueueFactory flowFileQueueFactory) {
+            this.flowFileQueueFactory = flowFileQueueFactory;
             return this;
         }
 
-        public Builder provenanceRepository(final ProvenanceEventRepository provenanceRepository) {
-            this.provenanceRepository = provenanceRepository;
-            return this;
-        }
-
-        public Builder resourceClaimManager(final ResourceClaimManager resourceClaimManager) {
-            this.resourceClaimManager = resourceClaimManager;
-            return this;
-        }
-
-        public Builder queueSwapThreshold(final int queueSwapThreshold) {
-            this.queueSwapThreshold = queueSwapThreshold;
-            return this;
-        }
-
-        public Builder defaultBackPressureObjectThreshold(final long defaultBackPressureObjectThreshold) {
-            this.defaultBackPressureObjectThreshold = defaultBackPressureObjectThreshold;
-            return this;
-        }
-
-        public Builder defaultBackPressureDataSizeThreshold(final String defaultBackPressureDataSizeThreshold) {
-            this.defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThreshold;
+        public Builder clustered(final boolean clustered) {
+            this.clustered = clustered;
             return this;
         }
 
@@ -487,17 +467,8 @@ public final class StandardConnection implements Connection {
             if (destination == null) {
                 throw new IllegalStateException("Cannot build a Connection without a Destination");
             }
-            if (swapManager == null) {
-                throw new IllegalStateException("Cannot build a Connection without a FlowFileSwapManager");
-            }
-            if (flowFileRepository == null) {
-                throw new IllegalStateException("Cannot build a Connection without a FlowFile Repository");
-            }
-            if (provenanceRepository == null) {
-                throw new IllegalStateException("Cannot build a Connection without a Provenance Repository");
-            }
-            if (resourceClaimManager == null) {
-                throw new IllegalStateException("Cannot build a Connection without a Resource Claim Manager");
+            if (flowFileQueueFactory == null) {
+                throw new IllegalStateException("Cannot build a Connection without a FlowFileQueueFactory");
             }
 
             if (relationships == null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 208bbce..5f8f925 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -16,6 +16,26 @@
  */
 package org.apache.nifi.controller;
 
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.SwapContents;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.swap.SchemaSwapDeserializer;
+import org.apache.nifi.controller.swap.SchemaSwapSerializer;
+import org.apache.nifi.controller.swap.SimpleSwapDeserializer;
+import org.apache.nifi.controller.swap.SwapDeserializer;
+import org.apache.nifi.controller.swap.SwapSerializer;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -29,34 +49,19 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 import java.util.regex.Pattern;
-
-import org.apache.nifi.controller.queue.FlowFileQueue;
-import org.apache.nifi.controller.repository.FlowFileRecord;
-import org.apache.nifi.controller.repository.FlowFileRepository;
-import org.apache.nifi.controller.repository.FlowFileSwapManager;
-import org.apache.nifi.controller.repository.SwapContents;
-import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
-import org.apache.nifi.controller.repository.SwapSummary;
-import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.swap.SchemaSwapDeserializer;
-import org.apache.nifi.controller.swap.SchemaSwapSerializer;
-import org.apache.nifi.controller.swap.SimpleSwapDeserializer;
-import org.apache.nifi.controller.swap.SwapDeserializer;
-import org.apache.nifi.controller.swap.SwapSerializer;
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * <p>
@@ -66,9 +71,8 @@ import org.slf4j.LoggerFactory;
  */
 public class FileSystemSwapManager implements FlowFileSwapManager {
 
-    public static final int MINIMUM_SWAP_COUNT = 10000;
-    private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
-    private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part");
+    private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+?(\\..*?)?\\.swap");
+    private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+?(\\..*?)?\\.swap\\.part");
 
     public static final int SWAP_ENCODING_VERSION = 10;
     public static final String EVENT_CATEGORY = "Swap FlowFiles";
@@ -106,13 +110,18 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         this.flowFileRepository = initializationContext.getFlowFileRepository();
     }
 
+
     @Override
-    public String swapOut(final List<FlowFileRecord> toSwap, final FlowFileQueue flowFileQueue) throws IOException {
+    public String swapOut(final List<FlowFileRecord> toSwap, final FlowFileQueue flowFileQueue, final String partitionName) throws IOException {
         if (toSwap == null || toSwap.isEmpty()) {
             return null;
         }
 
-        final File swapFile = new File(storageDirectory, System.currentTimeMillis() + "-" + flowFileQueue.getIdentifier() + "-" + UUID.randomUUID().toString() + ".swap");
+        final String swapFilePrefix = System.currentTimeMillis() + "-" + flowFileQueue.getIdentifier() + "-" + UUID.randomUUID().toString();
+        final String swapFileBaseName = partitionName == null ? swapFilePrefix : swapFilePrefix + "." + partitionName;
+        final String swapFileName = swapFileBaseName + ".swap";
+
+        final File swapFile = new File(storageDirectory, swapFileName);
         final File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part");
         final String swapLocation = swapFile.getAbsolutePath();
 
@@ -185,8 +194,55 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         }
     }
 
+    private String getOwnerQueueIdentifier(final File swapFile) {
+        final String[] splits = swapFile.getName().split("-");
+        if (splits.length > 6) {
+            final String queueIdentifier = splits[1] + "-" + splits[2] + "-" + splits[3] + "-" + splits[4] + "-" + splits[5];
+            return queueIdentifier;
+        }
+
+        return null;
+    }
+
+    private String getOwnerPartition(final File swapFile) {
+        final String filename = swapFile.getName();
+        final int indexOfDot = filename.indexOf(".");
+        if (indexOfDot < 1) {
+            return null;
+        }
+
+        final int lastIndexOfDot = filename.lastIndexOf(".");
+        if (lastIndexOfDot == indexOfDot) {
+            return null;
+        }
+
+        return filename.substring(indexOfDot + 1, lastIndexOfDot);
+    }
+
+    @Override
+    public Set<String> getSwappedPartitionNames(final FlowFileQueue queue) {
+        final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(final File dir, final String name) {
+                return SWAP_FILE_PATTERN.matcher(name).matches();
+            }
+        });
+
+        if (swapFiles == null) {
+            return Collections.emptySet();
+        }
+
+        final String queueId = queue.getIdentifier();
+
+        return Stream.of(swapFiles)
+            .filter(swapFile -> queueId.equals(getOwnerQueueIdentifier(swapFile)))
+            .map(this::getOwnerPartition)
+            .filter(Objects::nonNull)
+            .collect(Collectors.toSet());
+    }
+
     @Override
-    public List<String> recoverSwapLocations(final FlowFileQueue flowFileQueue) throws IOException {
+    public List<String> recoverSwapLocations(final FlowFileQueue flowFileQueue, final String partitionName) throws IOException {
         final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
             @Override
             public boolean accept(final File dir, final String name) {
@@ -212,15 +268,21 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
             }
 
             // split the filename by dashes. The old filenaming scheme was "<timestamp>-<randomuuid>.swap" but the new naming scheme is
-            // "<timestamp>-<queue identifier>-<random uuid>.swap". If we have two dashes, then we can just check if the queue ID is equal
-            // to the id of the queue given and if not we can just move on.
-            final String[] splits = swapFile.getName().split("-");
-            if (splits.length > 6) {
-                final String queueIdentifier = splits[1] + "-" + splits[2] + "-" + splits[3] + "-" + splits[4] + "-" + splits[5];
-                if (queueIdentifier.equals(flowFileQueue.getIdentifier())) {
-                    swapLocations.add(swapFile.getAbsolutePath());
+            // "<timestamp>-<queue identifier>-<random uuid>[.<partition name>].swap" (the partition name segment is optional).
+            final String ownerQueueId = getOwnerQueueIdentifier(swapFile);
+            if (ownerQueueId != null) {
+                if (!ownerQueueId.equals(flowFileQueue.getIdentifier())) {
+                    continue;
+                }
+
+                if (partitionName != null) {
+                    final String ownerPartition = getOwnerPartition(swapFile);
+                    if (!partitionName.equals(ownerPartition)) {
+                        continue;
+                    }
                 }
 
+                swapLocations.add(swapFile.getAbsolutePath());
                 continue;
             }
 
@@ -357,4 +419,28 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
             }
         }
     }
+
+    @Override
+    public String changePartitionName(final String swapLocation, final String newPartitionName) throws IOException {
+        final File existingFile = new File(swapLocation);
+        if (!existingFile.exists()) {
+            throw new FileNotFoundException("Could not change name of partition for swap location " + swapLocation + " because no swap file exists at that location");
+        }
+
+        final String existingFilename = existingFile.getName();
+
+        final String newFilename;
+        final int dotIndex = existingFilename.indexOf(".");
+        if (dotIndex < 0) {
+            newFilename = existingFilename + "." + newPartitionName + ".swap";
+        } else {
+            newFilename = existingFilename.substring(0, dotIndex) + "." + newPartitionName + ".swap";
+        }
+
+        final File newFile = new File(existingFile.getParentFile(), newFilename);
+        // Use Files.move and convert to Paths instead of using File.renameTo so that we get an IOException on failure that describes why we failed.
+        Files.move(existingFile.toPath(), newFile.toPath());
+
+        return newFile.getAbsolutePath();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 21c61e9..ebd809c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -35,6 +35,8 @@ import org.apache.nifi.authorization.resource.DataAuthorizable;
 import org.apache.nifi.authorization.resource.ProvenanceDataAuthorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.authorization.util.IdentityMapping;
+import org.apache.nifi.authorization.util.IdentityMappingUtil;
 import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
@@ -76,8 +78,23 @@ import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.label.StandardLabel;
 import org.apache.nifi.controller.leader.election.LeaderElectionManager;
 import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
+import org.apache.nifi.controller.queue.ConnectionEventListener;
 import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueueFactory;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
 import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.queue.StandardFlowFileQueue;
+import org.apache.nifi.controller.queue.clustered.ContentRepositoryFlowFileAccess;
+import org.apache.nifi.controller.queue.clustered.SocketLoadBalancedFlowFileQueue;
+import org.apache.nifi.controller.queue.clustered.client.StandardLoadBalanceFlowFileCodec;
+import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientFactory;
+import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientRegistry;
+import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientTask;
+import org.apache.nifi.controller.queue.clustered.server.ClusterLoadBalanceAuthorizer;
+import org.apache.nifi.controller.queue.clustered.server.ConnectionLoadBalanceServer;
+import org.apache.nifi.controller.queue.clustered.server.LoadBalanceAuthorizer;
+import org.apache.nifi.controller.queue.clustered.server.LoadBalanceProtocol;
+import org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol;
 import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
 import org.apache.nifi.controller.reporting.ReportingTaskProvider;
 import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
@@ -243,6 +260,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -324,6 +342,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final VariableRegistry variableRegistry;
     private final ConcurrentMap<String, ControllerServiceNode> rootControllerServices = new ConcurrentHashMap<>();
 
+    private final ConnectionLoadBalanceServer loadBalanceServer;
+    private final NioAsyncLoadBalanceClientRegistry loadBalanceClientRegistry;
+    private final FlowEngine loadBalanceClientThreadPool;
+    private final Set<NioAsyncLoadBalanceClientTask> loadBalanceClientTasks = new HashSet<>();
+
     private final ConcurrentMap<String, ProcessorNode> allProcessors = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, ProcessGroup> allProcessGroups = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, Connection> allConnections = new ConcurrentHashMap<>();
@@ -673,8 +696,40 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
             leaderElectionManager.start();
             heartbeatMonitor.start();
+
+            final InetSocketAddress loadBalanceAddress = nifiProperties.getClusterLoadBalanceAddress();
+            // Set up the Load Balancing Server
+            final EventReporter eventReporter = createEventReporter(bulletinRepository);
+            final List<IdentityMapping> identityMappings = IdentityMappingUtil.getIdentityMappings(nifiProperties);
+            final LoadBalanceAuthorizer authorizeConnection = new ClusterLoadBalanceAuthorizer(clusterCoordinator, eventReporter);
+            final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepository, provenanceRepository, this, authorizeConnection);
+
+            final int numThreads = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_MAX_THREAD_COUNT, NiFiProperties.DEFAULT_LOAD_BALANCE_MAX_THREAD_COUNT);
+            final String timeoutPeriod = nifiProperties.getProperty(NiFiProperties.LOAD_BALANCE_COMMS_TIMEOUT, NiFiProperties.DEFAULT_LOAD_BALANCE_COMMS_TIMEOUT);
+            final int timeoutMillis = (int) FormatUtils.getTimeDuration(timeoutPeriod, TimeUnit.MILLISECONDS);
+
+            loadBalanceServer = new ConnectionLoadBalanceServer(loadBalanceAddress.getHostName(), loadBalanceAddress.getPort(), sslContext,
+                    numThreads, loadBalanceProtocol, eventReporter, timeoutMillis);
+
+
+            final int connectionsPerNode = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_CONNECTIONS_PER_NODE, NiFiProperties.DEFAULT_LOAD_BALANCE_CONNECTIONS_PER_NODE);
+            final NioAsyncLoadBalanceClientFactory asyncClientFactory = new NioAsyncLoadBalanceClientFactory(sslContext, timeoutMillis, new ContentRepositoryFlowFileAccess(contentRepository),
+                eventReporter, new StandardLoadBalanceFlowFileCodec());
+            loadBalanceClientRegistry = new NioAsyncLoadBalanceClientRegistry(asyncClientFactory, connectionsPerNode);
+
+            final int loadBalanceClientThreadCount = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_MAX_THREAD_COUNT, NiFiProperties.DEFAULT_LOAD_BALANCE_MAX_THREAD_COUNT);
+            loadBalanceClientThreadPool = new FlowEngine(loadBalanceClientThreadCount, "Load-Balanced Client", true);
+
+            for (int i=0; i < loadBalanceClientThreadCount; i++) {
+                final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(loadBalanceClientRegistry, clusterCoordinator, eventReporter);
+                loadBalanceClientTasks.add(clientTask);
+                loadBalanceClientThreadPool.submit(clientTask);
+            }
         } else {
+            loadBalanceClientRegistry = null;
             heartbeater = null;
+            loadBalanceServer = null;
+            loadBalanceClientThreadPool = null;
         }
     }
 
@@ -775,6 +830,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 listener.start();
             }
 
+            if (loadBalanceServer != null) {
+                loadBalanceServer.start();
+            }
+
             notifyComponentsConfigurationRestored();
 
             timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
@@ -940,11 +999,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 startConnectablesAfterInitialization.clear();
                 startRemoteGroupPortsAfterInitialization.clear();
             }
+
+            for (final Connection connection : getRootGroup().findAllConnections()) {
+                connection.getFlowFileQueue().startLoadBalancing();
+            }
         } finally {
             writeLock.unlock("onFlowInitialized");
         }
     }
 
+    public boolean isStartAfterInitialization(final Connectable component) {
+        return startConnectablesAfterInitialization.contains(component) || startRemoteGroupPortsAfterInitialization.contains(component);
+    }
+
     private ContentRepository createContentRepository(final NiFiProperties properties) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
         final String implementationClassName = properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, DEFAULT_CONTENT_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
@@ -1040,20 +1107,35 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             swapManager.initialize(initializationContext);
         }
 
-        return builder.id(requireNonNull(id).intern())
+        final FlowFileQueueFactory flowFileQueueFactory = new FlowFileQueueFactory() {
+            @Override
+            public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener eventListener) {
+                // NOTE(review): loadBalanceStrategy and partitioningAttribute are not read when choosing or building the queue.
+                if (clusterCoordinator == null) { // no cluster coordinator: build a StandardFlowFileQueue, passing the default back-pressure thresholds to its constructor
+                    return new StandardFlowFileQueue(id, eventListener, flowFileRepository, provenanceRepository, resourceClaimManager, processScheduler, swapManager,
+                            eventReporter, nifiProperties.getQueueSwapThreshold(), nifiProperties.getDefaultBackPressureObjectThreshold(), nifiProperties.getDefaultBackPressureDataSizeThreshold());
+                }
+
+                // A cluster coordinator is present: build a SocketLoadBalancedFlowFileQueue instead.
+                final FlowFileQueue clusteredQueue = new SocketLoadBalancedFlowFileQueue(id, eventListener, processScheduler, flowFileRepository, provenanceRepository, contentRepository, resourceClaimManager,
+                        clusterCoordinator, loadBalanceClientRegistry, swapManager, nifiProperties.getQueueSwapThreshold(), eventReporter);
+
+                // The constructor call above is not given the back-pressure defaults, so they are applied through the setters.
+                clusteredQueue.setBackPressureObjectThreshold(nifiProperties.getDefaultBackPressureObjectThreshold());
+                clusteredQueue.setBackPressureDataSizeThreshold(nifiProperties.getDefaultBackPressureDataSizeThreshold());
+                return clusteredQueue;
+            }
+        };
+
+        final Connection connection = builder.id(requireNonNull(id).intern())
                 .name(name == null ? null : name.intern())
                 .relationships(relationships)
                 .source(requireNonNull(source))
                 .destination(destination)
-                .swapManager(swapManager)
-                .queueSwapThreshold(nifiProperties.getQueueSwapThreshold())
-                .defaultBackPressureObjectThreshold(nifiProperties.getDefaultBackPressureObjectThreshold())
-                .defaultBackPressureDataSizeThreshold(nifiProperties.getDefaultBackPressureDataSizeThreshold())
-                .eventReporter(eventReporter)
-                .resourceClaimManager(resourceClaimManager)
-                .flowFileRepository(flowFileRepository)
-                .provenanceRepository(provenanceRepository)
+                .flowFileQueueFactory(flowFileQueueFactory)
                 .build();
+
+        return connection;
     }
 
     /**
@@ -1561,6 +1643,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 zooKeeperStateServer.shutdown();
             }
 
+            if (loadBalanceClientThreadPool != null) {
+                loadBalanceClientThreadPool.shutdownNow();
+            }
+            loadBalanceClientTasks.forEach(NioAsyncLoadBalanceClientTask::stop);
+
             // Trigger any processors' methods marked with @OnShutdown to be called
             getRootGroup().shutdown();
 
@@ -1606,6 +1693,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 listener.stop();
             }
 
+            if (loadBalanceServer != null) {
+                loadBalanceServer.stop();
+            }
+
+            if (loadBalanceClientRegistry != null) {
+                loadBalanceClientRegistry.stop();
+            }
+
             if (processScheduler != null) {
                 processScheduler.shutdown();
             }
@@ -2226,6 +2321,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                     queue.setPriorities(newPrioritizers);
                 }
 
+                final String loadBalanceStrategyName = connectionDTO.getLoadBalanceStrategy();
+                if (loadBalanceStrategyName != null) {
+                    final LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.valueOf(loadBalanceStrategyName);
+                    final String partitioningAttribute = connectionDTO.getLoadBalancePartitionAttribute();
+                    queue.setLoadBalanceStrategy(loadBalanceStrategy, partitioningAttribute);
+                }
+
                 connection.setProcessGroup(group);
                 group.addConnection(connection);
             }
@@ -2737,6 +2839,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
     public void onConnectionAdded(final Connection connection) {
         allConnections.put(connection.getIdentifier(), connection);
+
+        if (isInitialized()) { // load balancing is started here only when isInitialized() is already true for this controller
+            connection.getFlowFileQueue().startLoadBalancing();
+        }
     }
 
     public void onConnectionRemoved(final Connection connection) {
@@ -3494,6 +3600,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
     }
 
+    public void stopTransmitting(final RemoteGroupPort remoteGroupPort) { // runs entirely under writeLock
+        writeLock.lock();
+        try {
+            if (!initialized.get()) {
+                startRemoteGroupPortsAfterInitialization.remove(remoteGroupPort); // not initialized: only remove the port from startRemoteGroupPortsAfterInitialization
+                return; // the finally block below still releases the lock
+            }
+            remoteGroupPort.getRemoteProcessGroup().stopTransmitting(remoteGroupPort); // initialized: delegate to the port's remote process group
+        } finally {
+            writeLock.unlock("stopTransmitting");
+        }
+    }
+
     public void stopProcessor(final String parentGroupId, final String processorId) {
         final ProcessGroup group = lookupGroup(parentGroupId);
         final ProcessorNode node = group.getProcessor(processorId);
@@ -4344,10 +4463,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                     leaderElectionManager.start();
                     stateManagerProvider.enableClusterProvider();
 
+                    loadBalanceClientRegistry.start();
+
                     heartbeat();
                 } else {
                     stateManagerProvider.disableClusterProvider();
-
                     setPrimary(false);
                 }
 
@@ -4369,6 +4489,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
     }
 
+
+
     /**
      * @return true if this instance is the primary node in the cluster; false
      * otherwise