You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2018/10/04 20:28:59 UTC

[03/14] nifi git commit: NIFI-5516: Implement Load-Balanced Connections Refactoring StandardFlowFileQueue to have an AbstractFlowFileQueue Refactored more into AbstractFlowFileQueue Added documentation, cleaned up code some Refactored FlowFileQueue so th

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
new file mode 100644
index 0000000..17e9237
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
@@ -0,0 +1,1345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.MockFlowFileRecord;
+import org.apache.nifi.controller.MockSwapManager;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
+import org.apache.nifi.controller.queue.NopConnectionEventListener;
+import org.apache.nifi.controller.queue.clustered.client.StandardLoadBalanceFlowFileCodec;
+import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient;
+import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientFactory;
+import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientRegistry;
+import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientTask;
+import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner;
+import org.apache.nifi.controller.queue.clustered.partition.QueuePartition;
+import org.apache.nifi.controller.queue.clustered.partition.RoundRobinPartitioner;
+import org.apache.nifi.controller.queue.clustered.server.ConnectionLoadBalanceServer;
+import org.apache.nifi.controller.queue.clustered.server.LoadBalanceAuthorizer;
+import org.apache.nifi.controller.queue.clustered.server.LoadBalanceProtocol;
+import org.apache.nifi.controller.queue.clustered.server.NotAuthorizedException;
+import org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol;
+import org.apache.nifi.controller.repository.ContentNotFoundException;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.controller.repository.RepositoryRecordType;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import javax.net.ssl.SSLContext;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class LoadBalancedQueueIT {
+    // Authorizer whose body is empty, i.e. it never rejects the supplied node identities
+    private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> {};
+    // Authorizer that unconditionally throws NotAuthorizedException
+    private final LoadBalanceAuthorizer NEVER_AUTHORIZED = nodeIds -> {
+        throw new NotAuthorizedException("Intentional Unit Test Failure - Not Authorized");
+    };
+
+    // Fixed collaborators/values handed to every SocketLoadBalancedFlowFileQueue built by the tests
+    private final MockSwapManager flowFileSwapManager = new MockSwapManager();
+    private final String queueId = "unit-test";
+    private final EventReporter eventReporter = EventReporter.NO_OP;
+    private final int swapThreshold = 10_000;
+
+    // Cluster-level state; (re)initialized in setup() or at the start of each test
+    private Set<NodeIdentifier> nodeIdentifiers; // copied out by the mocked clusterCoordinator.getNodeIdentifiers()
+    private ClusterCoordinator clusterCoordinator;
+    private NodeIdentifier localNodeId; // returned by the mocked clusterCoordinator.getLocalNodeIdentifier()
+    private ProcessScheduler processScheduler;
+    private ResourceClaimManager resourceClaimManager;
+    private LoadBalancedFlowFileQueue serverQueue; // mock returned by the mocked Connection on the server side
+    private FlowController flowController;
+
+    // Repositories used by the sending (client) side
+    private ProvenanceRepository clientProvRepo;
+    private ContentRepository clientContentRepo;
+    private List<RepositoryRecord> clientRepoRecords;
+    private FlowFileRepository clientFlowFileRepo;
+    private ConcurrentMap<ContentClaim, byte[]> clientClaimContents;
+
+    // Repositories used by the receiving (server) side
+    private ProvenanceRepository serverProvRepo;
+    private List<RepositoryRecord> serverRepoRecords;
+    private FlowFileRepository serverFlowFileRepo;
+    private ConcurrentMap<ContentClaim, byte[]> serverClaimContents;
+    private ContentRepository serverContentRepo;
+
+    // Built in setup() from the test keystore/truststore
+    private SSLContext sslContext;
+
+    // Listeners captured from calls to the mocked clusterCoordinator.registerEventListener(...)
+    private final Set<ClusterTopologyEventListener> clusterEventListeners = Collections.synchronizedSet(new HashSet<>());
+    // Value served by the mocked serverQueue.getLoadBalanceCompression(); reset in setup()
+    private final AtomicReference<LoadBalanceCompression> compressionReference = new AtomicReference<>();
+
+    /**
+     * Rebuilds all mocks, the in-memory client/server repositories and the SSL context before each test.
+     */
+    @Before
+    public void setup() throws IOException, UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
+        compressionReference.set(LoadBalanceCompression.DO_NOT_COMPRESS);
+        nodeIdentifiers = new HashSet<>();
+        clusterEventListeners.clear();
+
+        // Cluster coordinator: answers are evaluated on each call so tests can change the node set / local node later
+        clusterCoordinator = mock(ClusterCoordinator.class);
+        when(clusterCoordinator.getNodeIdentifiers()).thenAnswer(invocation -> new HashSet<>(nodeIdentifiers));
+        when(clusterCoordinator.getLocalNodeIdentifier()).thenAnswer(invocation -> localNodeId);
+
+        // Remember every topology listener that gets registered so tests can fire events at them
+        doAnswer(invocation -> {
+            final ClusterTopologyEventListener registered = invocation.getArgumentAt(0, ClusterTopologyEventListener.class);
+            clusterEventListeners.add(registered);
+            return null;
+        }).when(clusterCoordinator).registerEventListener(any(ClusterTopologyEventListener.class));
+
+        processScheduler = mock(ProcessScheduler.class);
+        clientProvRepo = mock(ProvenanceRepository.class);
+        resourceClaimManager = new StandardResourceClaimManager();
+
+        // Server-side connection whose queue is never full and reports the compression held in compressionReference
+        final Connection mockedConnection = mock(Connection.class);
+        when(mockedConnection.getIdentifier()).thenReturn(queueId);
+
+        serverQueue = mock(LoadBalancedFlowFileQueue.class);
+        when(serverQueue.isFull()).thenReturn(false);
+        when(mockedConnection.getFlowFileQueue()).thenReturn(serverQueue);
+        doAnswer(invocation -> compressionReference.get()).when(serverQueue).getLoadBalanceCompression();
+
+        // Any connection id resolves to the mocked connection
+        flowController = mock(FlowController.class);
+        when(flowController.getConnection(anyString())).thenReturn(mockedConnection);
+
+        // Server-side repositories
+        serverRepoRecords = Collections.synchronizedList(new ArrayList<>());
+        serverFlowFileRepo = createFlowFileRepository(serverRepoRecords);
+        serverClaimContents = new ConcurrentHashMap<>();
+        serverContentRepo = createContentRepository(serverClaimContents);
+        serverProvRepo = mock(ProvenanceRepository.class);
+
+        // Client-side repositories
+        clientClaimContents = new ConcurrentHashMap<>();
+        clientContentRepo = createContentRepository(clientClaimContents);
+        clientRepoRecords = Collections.synchronizedList(new ArrayList<>());
+        clientFlowFileRepo = createFlowFileRepository(clientRepoRecords);
+
+        // SSL context built from the test key material; the key password is the same as the keystore password
+        final String keystorePath = "src/test/resources/localhost-ks.jks";
+        final String keystorePassword = "OI7kMpWzzVNVx/JGhTL/0uO4+PWpGJ46uZ/pfepbkwI";
+        final String truststorePath = "src/test/resources/localhost-ts.jks";
+        final String truststorePassword = "wAOR0nQJ2EXvOP0JZ2EaqA/n7W69ILS4sWAHghmIWCc";
+        sslContext = SslContextFactory.createSslContext(
+                keystorePath, keystorePassword.toCharArray(), keystorePassword.toCharArray(), "JKS",
+                truststorePath, truststorePassword.toCharArray(), "JKS",
+                SslContextFactory.ClientAuth.REQUIRED, "TLS");
+    }
+
+
+    /**
+     * Creates a mocked ContentClaim backed by a mocked ResourceClaim. When {@code bytes} is non-null,
+     * the content is also registered for that claim in the client-side claim map.
+     *
+     * @param bytes the content to associate with the claim, or null to register nothing
+     * @return the mocked content claim
+     */
+    private ContentClaim createContentClaim(final byte[] bytes) {
+        final ResourceClaim mockedResourceClaim = mock(ResourceClaim.class);
+        when(mockedResourceClaim.getContainer()).thenReturn("container");
+        when(mockedResourceClaim.getSection()).thenReturn("section");
+        when(mockedResourceClaim.getId()).thenReturn("identifier");
+
+        final ContentClaim mockedClaim = mock(ContentClaim.class);
+        when(mockedClaim.getResourceClaim()).thenReturn(mockedResourceClaim);
+
+        if (bytes == null) {
+            return mockedClaim;
+        }
+
+        clientClaimContents.put(mockedClaim, bytes);
+        return mockedClaim;
+    }
+
+
+    private NioAsyncLoadBalanceClientFactory createClientFactory(final SSLContext sslContext) {
+        final FlowFileContentAccess flowFileContentAccess = flowFile -> clientContentRepo.read(flowFile.getContentClaim());
+        return new NioAsyncLoadBalanceClientFactory(sslContext, 30000, flowFileContentAccess, eventReporter, new StandardLoadBalanceFlowFileCodec());
+    }
+
+    /**
+     * Verifies that nodes joining the cluster while load balancing is already running start receiving
+     * FlowFiles. Five servers are started one at a time; each is announced to the captured cluster
+     * topology listeners and two FlowFiles are enqueued after each addition.
+     */
+    @Test(timeout = 20_000)
+    public void testNewNodeAdded() throws IOException, InterruptedException {
+        localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+        nodeIdentifiers.add(localNodeId);
+
+        // Protocol shared by every server created below; communication is plaintext (null SSLContext)
+        final int timeoutMillis = 1000;
+        final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+        final SSLContext sslContext = null;
+
+        final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+        clientRegistry.start();
+
+        // Every node is reported as CONNECTED
+        final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+        when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+        when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+        final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+        final Thread clientThread = new Thread(clientTask);
+
+        // Create the Load Balanced FlowFile Queue; at this point only the local node is known to the cluster
+        final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+                clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+
+        flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+        final int serverCount = 5;
+        final ConnectionLoadBalanceServer[] servers = new ConnectionLoadBalanceServer[serverCount];
+
+        try {
+            flowFileQueue.startLoadBalancing();
+            clientThread.start();
+
+            for (int i = 0; i < serverCount; i++) {
+                // Port 0 is passed in; the actual listening port is read back from the server
+                final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 8, loadBalanceProtocol, eventReporter, timeoutMillis);
+                servers[i] = server;
+                server.start();
+
+                final int loadBalancePort = server.getPort();
+
+                // Describe the newly started server as a cluster node and announce it to the registered listeners
+                final NodeIdentifier nodeId = new NodeIdentifier("unit-test-" + i, "localhost", 8090 + i, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+                nodeIdentifiers.add(nodeId);
+
+
+                clusterEventListeners.forEach(listener -> listener.onNodeAdded(nodeId));
+
+                // Enqueue two FlowFiles after each node addition
+                for (int j=0; j < 2; j++) {
+                    final Map<String, String> attributes = new HashMap<>();
+                    attributes.put("greeting", "hello");
+
+                    final MockFlowFileRecord flowFile = new MockFlowFileRecord(attributes, 0L);
+                    flowFileQueue.put(flowFile);
+                }
+            }
+
+            // Ten FlowFiles are enqueued in total (serverCount * 2), but a round-robin partitioner also hands
+            // FlowFiles to the local partition, so not all of them are sent to the servers.
+            // NOTE(review): assuming the local node occupies the first partition slot (the ordering is decided by
+            // SocketLoadBalancedFlowFileQueue, which is not visible here - confirm), the rotation over the growing
+            // partition set is 0,1 | 2,0 | 0,1 | 1,2 | 2,3, which keeps 3 FlowFiles local and transfers 7.
+            final int totalFlowFileCount = 7;
+
+            // Wait up to 10 seconds for the server's FlowFile Repository to be updated
+            final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+            while (serverRepoRecords.size() < totalFlowFileCount && System.currentTimeMillis() < endTime) {
+                Thread.sleep(10L);
+            }
+
+            assertFalse("Server's FlowFile Repo was never fully updated", serverRepoRecords.isEmpty());
+
+            assertEquals(totalFlowFileCount, serverRepoRecords.size());
+
+            for (final RepositoryRecord serverRecord : serverRepoRecords) {
+                final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
+                assertEquals("hello", serverFlowFile.getAttribute("greeting"));
+            }
+
+            // The client must record one update per transferred FlowFile; this wait is bounded only by the test timeout
+            while (clientRepoRecords.size() < totalFlowFileCount) {
+                Thread.sleep(10L);
+            }
+
+            assertEquals(totalFlowFileCount, clientRepoRecords.size());
+
+            for (final RepositoryRecord clientRecord : clientRepoRecords) {
+                assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
+            }
+        } finally {
+            clientTask.stop();
+
+            flowFileQueue.stopLoadBalancing();
+
+            clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+            // Only stop the servers that were actually created before a failure
+            Arrays.stream(servers).filter(Objects::nonNull).forEach(ConnectionLoadBalanceServer::stop);
+        }
+    }
+
+    /**
+     * Queues FlowFiles for a three-node cluster (the local node, one reachable server and one node whose
+     * host does not exist) and checks how many FlowFiles the reachable server ends up receiving.
+     */
+    @Test(timeout = 60_000)
+    public void testFailover() throws IOException, InterruptedException {
+        localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+        nodeIdentifiers.add(localNodeId);
+
+        // Start a single plaintext (null SSLContext) server; port 0 is passed and the real port is read back below
+        final int serverTimeoutMillis = 1000;
+        final LoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+        final SSLContext noSsl = null;
+
+        final ConnectionLoadBalanceServer loadBalanceServer = new ConnectionLoadBalanceServer("localhost", 0, noSsl, 2, protocol, eventReporter, serverTimeoutMillis);
+        loadBalanceServer.start();
+
+        try {
+            final int serverPort = loadBalanceServer.getPort();
+
+            // Node that points at the running server
+            final NodeIdentifier reachableNode = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", serverPort, null, null, null, false, null);
+            nodeIdentifiers.add(reachableNode);
+
+            // Node that points at a host name which does not exist
+            final NodeIdentifier unreachableNode = new NodeIdentifier("unit-test-invalid-host-does-not-exist", "invalid-host-does-not-exist", 8090, "invalid-host-does-not-exist", 8090,
+                    "invalid-host-does-not-exist", serverPort, null, null, null, false, null);
+            nodeIdentifiers.add(unreachableNode);
+
+
+            final NioAsyncLoadBalanceClientRegistry registry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(noSsl), 1);
+            registry.start();
+
+            // Every node is reported as CONNECTED
+            final NodeConnectionStatus connectedStatus = mock(NodeConnectionStatus.class);
+            when(connectedStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+            when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectedStatus);
+            final NioAsyncLoadBalanceClientTask balanceTask = new NioAsyncLoadBalanceClientTask(registry, clusterCoordinator, eventReporter);
+
+            final Thread balanceThread = new Thread(balanceTask);
+            balanceThread.setDaemon(true);
+
+            final SocketLoadBalancedFlowFileQueue queue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+                    clientContentRepo, resourceClaimManager, clusterCoordinator, registry, flowFileSwapManager, swapThreshold, eventReporter);
+            queue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+            try {
+                // Fill the queue before load balancing is started
+                final int numFlowFiles = 1200;
+                for (int index = 0; index < numFlowFiles; index++) {
+                    final ContentClaim claim = createContentClaim("hello".getBytes());
+
+                    final Map<String, String> flowFileAttributes = new HashMap<>();
+                    flowFileAttributes.put("uuid", UUID.randomUUID().toString());
+                    flowFileAttributes.put("greeting", "hello");
+
+                    final MockFlowFileRecord record = new MockFlowFileRecord(flowFileAttributes, 5L, claim);
+                    queue.put(record);
+                }
+
+                queue.startLoadBalancing();
+
+                balanceThread.start();
+
+                // Sending to the unreachable node should fail. When that happens, half of its FlowFiles should go to the
+                // local partition and the other half to the reachable node, so the server is expected to receive its own
+                // third of the FlowFiles plus half of another third, i.e. 1.5 * (numFlowFiles / 3).
+                final int perNodeShare = numFlowFiles / 3;
+                final int expectedOnServer = perNodeShare + perNodeShare / 2;
+
+                // Wait up to 30 seconds for the server's FlowFile Repository to be updated
+                final long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30L);
+                while (serverRepoRecords.size() < expectedOnServer && System.currentTimeMillis() < deadline) {
+                    Thread.sleep(10L);
+                }
+
+                assertFalse("Server's FlowFile Repo was never fully updated", serverRepoRecords.isEmpty());
+
+                assertEquals(expectedOnServer, serverRepoRecords.size());
+
+                for (final RepositoryRecord received : serverRepoRecords) {
+                    final FlowFileRecord receivedFlowFile = received.getCurrent();
+                    assertEquals("hello", receivedFlowFile.getAttribute("greeting"));
+
+                    // Only the trailing 5 bytes of the stored content are compared against "hello"
+                    final ContentClaim receivedClaim = receivedFlowFile.getContentClaim();
+                    final byte[] receivedContent = serverClaimContents.get(receivedClaim);
+                    final byte[] contentTail = Arrays.copyOfRange(receivedContent, receivedContent.length - 5, receivedContent.length);
+                    assertArrayEquals("hello".getBytes(), contentTail);
+                }
+
+                // The client is expected to record numFlowFiles / 2 updates, because half of the FlowFiles will have gone
+                // to the other node in the cluster while the other half stay in the local partition.
+                final int expectedClientRecords = numFlowFiles / 2;
+                while (clientRepoRecords.size() < expectedClientRecords) {
+                    Thread.sleep(10L);
+                }
+
+                assertEquals(expectedClientRecords, clientRepoRecords.size());
+
+                for (final RepositoryRecord sent : clientRepoRecords) {
+                    assertEquals(RepositoryRecordType.DELETE, sent.getType());
+                }
+            } finally {
+                queue.stopLoadBalancing();
+                registry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+            }
+        } finally {
+            loadBalanceServer.stop();
+        }
+    }
+
+
+    /**
+     * Puts two FlowFiles on a queue shared by the local node and one remote server and checks that the
+     * FlowFile with attributes and content arrives on the server intact and is recorded as a DELETE on the client.
+     */
+    @Test(timeout = 20_000)
+    public void testTransferToRemoteNode() throws IOException, InterruptedException {
+        localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+        nodeIdentifiers.add(localNodeId);
+
+        // Start a single plaintext (null SSLContext) server; port 0 is passed and the real port is read back below
+        final int serverTimeoutMillis = 30000;
+        final LoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+        final SSLContext noSsl = null;
+
+        final ConnectionLoadBalanceServer loadBalanceServer = new ConnectionLoadBalanceServer("localhost", 0, noSsl, 2, protocol, eventReporter, serverTimeoutMillis);
+        loadBalanceServer.start();
+
+        try {
+            final int serverPort = loadBalanceServer.getPort();
+
+            // Register the running server as the only remote node
+            final NodeIdentifier remoteNode = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", serverPort, null, null, null, false, null);
+            nodeIdentifiers.add(remoteNode);
+
+            final NioAsyncLoadBalanceClientRegistry registry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(noSsl), 1);
+            registry.start();
+
+            // Every node is reported as CONNECTED
+            final NodeConnectionStatus connectedStatus = mock(NodeConnectionStatus.class);
+            when(connectedStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+            when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectedStatus);
+            final NioAsyncLoadBalanceClientTask balanceTask = new NioAsyncLoadBalanceClientTask(registry, clusterCoordinator, eventReporter);
+
+            final Thread balanceThread = new Thread(balanceTask);
+            balanceThread.setDaemon(true);
+            balanceThread.start();
+
+            final SocketLoadBalancedFlowFileQueue queue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+                    clientContentRepo, resourceClaimManager, clusterCoordinator, registry, flowFileSwapManager, swapThreshold, eventReporter);
+            queue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+            try {
+                // First FlowFile: no attributes, no content claim
+                final MockFlowFileRecord plainFlowFile = new MockFlowFileRecord(0L);
+                queue.put(plainFlowFile);
+
+                // Second FlowFile: three attributes and 5 bytes of content
+                final Map<String, String> flowFileAttributes = new HashMap<>();
+                flowFileAttributes.put("integration", "test");
+                flowFileAttributes.put("unit-test", "false");
+                flowFileAttributes.put("integration-test", "true");
+
+                final ContentClaim claim = createContentClaim("hello".getBytes());
+                final MockFlowFileRecord richFlowFile = new MockFlowFileRecord(flowFileAttributes, 5L, claim);
+                queue.put(richFlowFile);
+
+                queue.startLoadBalancing();
+
+                // Wait up to 10 seconds for the server's FlowFile Repository to be updated
+                final long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+                while (serverRepoRecords.isEmpty() && System.currentTimeMillis() < deadline) {
+                    Thread.sleep(10L);
+                }
+
+                assertFalse("Server's FlowFile Repo was never updated", serverRepoRecords.isEmpty());
+
+                assertEquals(1, serverRepoRecords.size());
+
+                // The single record on the server must carry the attributes and content of the second FlowFile
+                final RepositoryRecord received = serverRepoRecords.iterator().next();
+                final FlowFileRecord receivedFlowFile = received.getCurrent();
+                assertEquals("test", receivedFlowFile.getAttribute("integration"));
+                assertEquals("false", receivedFlowFile.getAttribute("unit-test"));
+                assertEquals("true", receivedFlowFile.getAttribute("integration-test"));
+
+                final ContentClaim receivedClaim = receivedFlowFile.getContentClaim();
+                final byte[] receivedContent = serverClaimContents.get(receivedClaim);
+                assertArrayEquals("hello".getBytes(), receivedContent);
+
+                // Wait for the client-side repository update; bounded only by the test timeout
+                while (clientRepoRecords.isEmpty()) {
+                    Thread.sleep(10L);
+                }
+
+                assertEquals(1, clientRepoRecords.size());
+                final RepositoryRecord sent = clientRepoRecords.iterator().next();
+                assertEquals(RepositoryRecordType.DELETE, sent.getType());
+            } finally {
+                queue.stopLoadBalancing();
+                registry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+            }
+        } finally {
+            loadBalanceServer.stop();
+        }
+    }
+
+
+    /**
+     * Enqueues a FlowFile whose content has been removed from the client-side claim map and checks that
+     * the client repository records it with type CONTENTMISSING.
+     */
+    @Test(timeout = 20_000)
+    public void testContentNotFound() throws IOException, InterruptedException {
+        localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+        nodeIdentifiers.add(localNodeId);
+
+        // Start a single plaintext (null SSLContext) server; port 0 is passed and the real port is read back below
+        final int serverTimeoutMillis = 30000;
+        final LoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+        final SSLContext noSsl = null;
+
+        final ConnectionLoadBalanceServer loadBalanceServer = new ConnectionLoadBalanceServer("localhost", 0, noSsl, 2, protocol, eventReporter, serverTimeoutMillis);
+        loadBalanceServer.start();
+
+        try {
+            final int serverPort = loadBalanceServer.getPort();
+
+            // Register the running server as the only remote node
+            final NodeIdentifier remoteNode = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", serverPort, null, null, null, false, null);
+            nodeIdentifiers.add(remoteNode);
+
+            final NioAsyncLoadBalanceClientRegistry registry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(noSsl), 1);
+            registry.start();
+
+            // Every node is reported as CONNECTED
+            final NodeConnectionStatus connectedStatus = mock(NodeConnectionStatus.class);
+            when(connectedStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+            when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectedStatus);
+            final NioAsyncLoadBalanceClientTask balanceTask = new NioAsyncLoadBalanceClientTask(registry, clusterCoordinator, eventReporter);
+
+            final Thread balanceThread = new Thread(balanceTask);
+            balanceThread.setDaemon(true);
+            balanceThread.start();
+
+            final SocketLoadBalancedFlowFileQueue queue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+                clientContentRepo, resourceClaimManager, clusterCoordinator, registry, flowFileSwapManager, swapThreshold, eventReporter);
+            queue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+            try {
+                // First FlowFile: no attributes, no content claim
+                final MockFlowFileRecord plainFlowFile = new MockFlowFileRecord(0L);
+                queue.put(plainFlowFile);
+
+                final Map<String, String> flowFileAttributes = new HashMap<>();
+                flowFileAttributes.put("integration", "test");
+                flowFileAttributes.put("unit-test", "false");
+                flowFileAttributes.put("integration-test", "true");
+
+                // Second FlowFile: its claim is created and then its content is dropped from the client-side map
+                final ContentClaim missingClaim = createContentClaim("hello".getBytes());
+                this.clientClaimContents.remove(missingClaim); // cause ContentNotFoundException
+
+                final MockFlowFileRecord flowFileWithoutContent = new MockFlowFileRecord(flowFileAttributes, 5L, missingClaim);
+                queue.put(flowFileWithoutContent);
+
+                queue.startLoadBalancing();
+
+                // Wait for the client-side repository update; bounded only by the test timeout
+                while (clientRepoRecords.isEmpty()) {
+                    Thread.sleep(10L);
+                }
+
+                assertEquals(1, clientRepoRecords.size());
+                final RepositoryRecord recorded = clientRepoRecords.iterator().next();
+                assertEquals(RepositoryRecordType.CONTENTMISSING, recorded.getType());
+            } finally {
+                queue.stopLoadBalancing();
+                registry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+            }
+        } finally {
+            loadBalanceServer.stop();
+        }
+    }
+
+
+    /**
+     * Verifies a transfer to a remote node when the queue is configured with
+     * {@link LoadBalanceCompression#COMPRESS_ATTRIBUTES_ONLY}.
+     * <p>
+     * Two FlowFiles are placed on a queue that uses a {@link RoundRobinPartitioner}. The test expects exactly
+     * one record to reach the server's FlowFile Repository, carrying the attributes and the "hello" content of
+     * the second FlowFile, and exactly one DELETE record to be written to the client's FlowFile Repository.
+     *
+     * @throws IOException if any of the server, client or repository calls fails with an I/O error
+     * @throws InterruptedException if the test thread is interrupted while polling
+     */
+    @Test(timeout = 20_000)
+    public void testTransferToRemoteNodeAttributeCompression() throws IOException, InterruptedException {
+        localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+        nodeIdentifiers.add(localNodeId);
+        // NOTE(review): compressionReference is declared outside this method; presumably it is what the mocked
+        // server side reports as the connection's compression setting - confirm against the test fixture.
+        compressionReference.set(LoadBalanceCompression.COMPRESS_ATTRIBUTES_ONLY);
+
+        // Create the server
+        final int timeoutMillis = 30000;
+        final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+        // Deliberately shadows any class-level sslContext: this test runs without TLS.
+        final SSLContext sslContext = null;
+
+        // Port 0 is passed in; the actual port is read back from the server below.
+        final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
+        server.start();
+
+        try {
+            final int loadBalancePort = server.getPort();
+
+            // Create the Load Balanced FlowFile Queue
+            final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+            nodeIdentifiers.add(remoteNodeId);
+
+            final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+            clientRegistry.start();
+
+            // Every node is reported as CONNECTED to the client task.
+            final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+            when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+            when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+            final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+
+            // Daemon thread so that a lingering client task cannot keep the JVM alive.
+            final Thread clientThread = new Thread(clientTask);
+            clientThread.setDaemon(true);
+            clientThread.start();
+
+            final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+                clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+            flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+            flowFileQueue.setLoadBalanceCompression(LoadBalanceCompression.COMPRESS_ATTRIBUTES_ONLY);
+
+            try {
+                // First FlowFile: no attributes, no content claim.
+                final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
+                flowFileQueue.put(firstFlowFile);
+
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.put("integration", "test");
+                attributes.put("unit-test", "false");
+                attributes.put("integration-test", "true");
+
+                // Second FlowFile: the one whose attributes and content are checked on the server side.
+                final ContentClaim contentClaim = createContentClaim("hello".getBytes());
+                final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
+                flowFileQueue.put(secondFlowFile);
+
+                flowFileQueue.startLoadBalancing();
+
+                // Wait up to 10 seconds for the server's FlowFile Repository to be updated
+                final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+                while (serverRepoRecords.isEmpty() && System.currentTimeMillis() < endTime) {
+                    Thread.sleep(10L);
+                }
+
+                assertFalse("Server's FlowFile Repo was never updated", serverRepoRecords.isEmpty());
+
+                assertEquals(1, serverRepoRecords.size());
+
+                final RepositoryRecord serverRecord = serverRepoRecords.iterator().next();
+                final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
+                assertEquals("test", serverFlowFile.getAttribute("integration"));
+                assertEquals("false", serverFlowFile.getAttribute("unit-test"));
+                assertEquals("true", serverFlowFile.getAttribute("integration-test"));
+
+                final ContentClaim serverContentClaim = serverFlowFile.getContentClaim();
+                final byte[] serverFlowFileContent = serverClaimContents.get(serverContentClaim);
+                assertArrayEquals("hello".getBytes(), serverFlowFileContent);
+
+                // Wait up to 10 seconds for the client's FlowFile Repository to be updated, mirroring the bounded
+                // server-side wait above, so a missing update fails with a clear message instead of a bare timeout.
+                final long clientEndTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+                while (clientRepoRecords.size() == 0 && System.currentTimeMillis() < clientEndTime) {
+                    Thread.sleep(10L);
+                }
+
+                assertFalse("Client's FlowFile Repo was never updated", clientRepoRecords.size() == 0);
+
+                assertEquals(1, clientRepoRecords.size());
+                final RepositoryRecord clientRecord = clientRepoRecords.iterator().next();
+                assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
+            } finally {
+                flowFileQueue.stopLoadBalancing();
+                clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+            }
+        } finally {
+            server.stop();
+        }
+    }
+
+
+    /**
+     * Verifies a transfer to a remote node when the queue is configured with
+     * {@link LoadBalanceCompression#COMPRESS_ATTRIBUTES_AND_CONTENT}.
+     * <p>
+     * Two FlowFiles are placed on a queue that uses a {@link RoundRobinPartitioner}. The test expects exactly
+     * one record to reach the server's FlowFile Repository, carrying the attributes and the "hello" content of
+     * the second FlowFile, and exactly one DELETE record to be written to the client's FlowFile Repository.
+     *
+     * @throws IOException if any of the server, client or repository calls fails with an I/O error
+     * @throws InterruptedException if the test thread is interrupted while polling
+     */
+    @Test(timeout = 20_000)
+    public void testTransferToRemoteNodeContentCompression() throws IOException, InterruptedException {
+        localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+        nodeIdentifiers.add(localNodeId);
+        // NOTE(review): compressionReference is declared outside this method; presumably it is what the mocked
+        // server side reports as the connection's compression setting - confirm against the test fixture.
+        compressionReference.set(LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT);
+
+        // Create the server
+        final int timeoutMillis = 30000;
+        final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+        // Deliberately shadows any class-level sslContext: this test runs without TLS.
+        final SSLContext sslContext = null;
+
+        // Port 0 is passed in; the actual port is read back from the server below.
+        final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
+        server.start();
+
+        try {
+            final int loadBalancePort = server.getPort();
+
+            // Create the Load Balanced FlowFile Queue
+            final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+            nodeIdentifiers.add(remoteNodeId);
+
+            final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+            clientRegistry.start();
+
+            // Every node is reported as CONNECTED to the client task.
+            final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+            when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+            when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+            final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+
+            // Daemon thread so that a lingering client task cannot keep the JVM alive.
+            final Thread clientThread = new Thread(clientTask);
+            clientThread.setDaemon(true);
+            clientThread.start();
+
+            final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+                clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+            flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+            flowFileQueue.setLoadBalanceCompression(LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT);
+
+            try {
+                // First FlowFile: no attributes, no content claim.
+                final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
+                flowFileQueue.put(firstFlowFile);
+
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.put("integration", "test");
+                attributes.put("unit-test", "false");
+                attributes.put("integration-test", "true");
+
+                // Second FlowFile: the one whose attributes and content are checked on the server side.
+                final ContentClaim contentClaim = createContentClaim("hello".getBytes());
+                final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
+                flowFileQueue.put(secondFlowFile);
+
+                flowFileQueue.startLoadBalancing();
+
+                // Wait up to 10 seconds for the server's FlowFile Repository to be updated
+                final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+                while (serverRepoRecords.isEmpty() && System.currentTimeMillis() < endTime) {
+                    Thread.sleep(10L);
+                }
+
+                assertFalse("Server's FlowFile Repo was never updated", serverRepoRecords.isEmpty());
+
+                assertEquals(1, serverRepoRecords.size());
+
+                final RepositoryRecord serverRecord = serverRepoRecords.iterator().next();
+                final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
+                assertEquals("test", serverFlowFile.getAttribute("integration"));
+                assertEquals("false", serverFlowFile.getAttribute("unit-test"));
+                assertEquals("true", serverFlowFile.getAttribute("integration-test"));
+
+                final ContentClaim serverContentClaim = serverFlowFile.getContentClaim();
+                final byte[] serverFlowFileContent = serverClaimContents.get(serverContentClaim);
+                assertArrayEquals("hello".getBytes(), serverFlowFileContent);
+
+                // Wait up to 10 seconds for the client's FlowFile Repository to be updated, mirroring the bounded
+                // server-side wait above, so a missing update fails with a clear message instead of a bare timeout.
+                final long clientEndTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+                while (clientRepoRecords.size() == 0 && System.currentTimeMillis() < clientEndTime) {
+                    Thread.sleep(10L);
+                }
+
+                assertFalse("Client's FlowFile Repo was never updated", clientRepoRecords.size() == 0);
+
+                assertEquals(1, clientRepoRecords.size());
+                final RepositoryRecord clientRecord = clientRepoRecords.iterator().next();
+                assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
+            } finally {
+                flowFileQueue.stopLoadBalancing();
+                clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+            }
+        } finally {
+            server.stop();
+        }
+    }
+
+    /**
+     * Verifies a transfer to a remote node when both the server and the client factory are built with the
+     * {@code sslContext} that is in scope for this test class (declared outside this method).
+     * <p>
+     * Two FlowFiles are placed on a queue that uses a {@link RoundRobinPartitioner}. The test expects exactly
+     * one record to reach the server's FlowFile Repository, carrying the attributes and the "hello" content of
+     * the second FlowFile, and exactly one DELETE record to be written to the client's FlowFile Repository.
+     *
+     * @throws IOException if any of the server, client or repository calls fails with an I/O error
+     * @throws InterruptedException if the test thread is interrupted while polling
+     */
+    @Test(timeout = 20_000)
+    public void testWithSSLContext() throws IOException, InterruptedException, UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
+        localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+        nodeIdentifiers.add(localNodeId);
+
+        // Create the server
+        final int timeoutMillis = 30000;
+        final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+
+        // Port 0 is passed in; the actual port is read back from the server below.
+        final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
+        server.start();
+
+        try {
+            final int loadBalancePort = server.getPort();
+
+            // Create the Load Balanced FlowFile Queue
+            final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+            nodeIdentifiers.add(remoteNodeId);
+
+            final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+            clientRegistry.start();
+
+            // Every node is reported as CONNECTED to the client task.
+            final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+            when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+            when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+            final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+
+            // Daemon thread so that a lingering client task cannot keep the JVM alive.
+            final Thread clientThread = new Thread(clientTask);
+            clientThread.setDaemon(true);
+            clientThread.start();
+
+            final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+                    clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+            flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+            try {
+                // First FlowFile: no attributes, no content claim.
+                final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
+                flowFileQueue.put(firstFlowFile);
+
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.put("integration", "test");
+                attributes.put("unit-test", "false");
+                attributes.put("integration-test", "true");
+
+                // Second FlowFile: the one whose attributes and content are checked on the server side.
+                final ContentClaim contentClaim = createContentClaim("hello".getBytes());
+                final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
+                flowFileQueue.put(secondFlowFile);
+
+                flowFileQueue.startLoadBalancing();
+
+                // Wait up to 10 seconds for the server's FlowFile Repository to be updated
+                final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+                while (serverRepoRecords.isEmpty() && System.currentTimeMillis() < endTime) {
+                    Thread.sleep(10L);
+                }
+
+                assertFalse("Server's FlowFile Repo was never updated", serverRepoRecords.isEmpty());
+
+                assertEquals(1, serverRepoRecords.size());
+
+                final RepositoryRecord serverRecord = serverRepoRecords.iterator().next();
+                final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
+                assertEquals("test", serverFlowFile.getAttribute("integration"));
+                assertEquals("false", serverFlowFile.getAttribute("unit-test"));
+                assertEquals("true", serverFlowFile.getAttribute("integration-test"));
+
+                final ContentClaim serverContentClaim = serverFlowFile.getContentClaim();
+                final byte[] serverFlowFileContent = serverClaimContents.get(serverContentClaim);
+                assertArrayEquals("hello".getBytes(), serverFlowFileContent);
+
+                // Wait up to 10 seconds for the client's FlowFile Repository to be updated, mirroring the bounded
+                // server-side wait above, so a missing update fails with a clear message instead of a bare timeout.
+                final long clientEndTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+                while (clientRepoRecords.size() == 0 && System.currentTimeMillis() < clientEndTime) {
+                    Thread.sleep(10L);
+                }
+
+                assertFalse("Client's FlowFile Repo was never updated", clientRepoRecords.size() == 0);
+
+                assertEquals(1, clientRepoRecords.size());
+                final RepositoryRecord clientRecord = clientRepoRecords.iterator().next();
+                assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
+            } finally {
+                flowFileQueue.stopLoadBalancing();
+                clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+            }
+        } finally {
+            server.stop();
+        }
+    }
+
+
+    /**
+     * Verifies that the same client registry, client task and queue can be used for ten consecutive
+     * rounds of transfers.
+     * <p>
+     * Each round enqueues two FlowFiles, calls {@code startLoadBalancing()} and then expects the number of
+     * records in both the server's and the client's FlowFile Repository to have grown to the round number.
+     *
+     * @throws IOException if any of the server, client or repository calls fails with an I/O error
+     * @throws InterruptedException if the test thread is interrupted while polling
+     */
+    @Test(timeout = 60_000)
+    public void testReusingClient() throws IOException, InterruptedException, UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
+        localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+        nodeIdentifiers.add(localNodeId);
+
+        // Create the server
+        final int timeoutMillis = 30000;
+        final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+
+        // Port 0 is passed in; the actual port is read back from the server below.
+        final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
+        server.start();
+
+        try {
+            final int loadBalancePort = server.getPort();
+
+            // Create the Load Balanced FlowFile Queue
+            final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+            nodeIdentifiers.add(remoteNodeId);
+
+            final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+            clientRegistry.start();
+
+            // Every node is reported as CONNECTED to the client task.
+            final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+            when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+            when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+            final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+
+            // Daemon thread so that a lingering client task cannot keep the JVM alive.
+            final Thread clientThread = new Thread(clientTask);
+            clientThread.setDaemon(true);
+            clientThread.start();
+
+            final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+                    clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+            flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+            try {
+                // i doubles as the expected cumulative record count after the i-th round.
+                for (int i = 1; i <= 10; i++) {
+                    // First FlowFile of the round: no attributes, no content claim.
+                    final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
+                    flowFileQueue.put(firstFlowFile);
+
+                    final Map<String, String> attributes = new HashMap<>();
+                    attributes.put("integration", "test");
+                    attributes.put("unit-test", "false");
+                    attributes.put("integration-test", "true");
+
+                    final ContentClaim contentClaim = createContentClaim("hello".getBytes());
+                    final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
+                    flowFileQueue.put(secondFlowFile);
+
+                    flowFileQueue.startLoadBalancing();
+
+                    // Wait up to 10 seconds for the server's FlowFile Repository to be updated
+                    final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+                    while (serverRepoRecords.size() < i && System.currentTimeMillis() < endTime) {
+                        Thread.sleep(10L);
+                    }
+
+                    assertEquals(i, serverRepoRecords.size());
+
+                    // NOTE(review): iterator().next() yields the first element of the collection, so for i > 1
+                    // this likely re-checks an earlier record rather than the newest one - confirm whether every
+                    // record should be verified.
+                    final RepositoryRecord serverRecord = serverRepoRecords.iterator().next();
+                    final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
+                    assertEquals("test", serverFlowFile.getAttribute("integration"));
+                    assertEquals("false", serverFlowFile.getAttribute("unit-test"));
+                    assertEquals("true", serverFlowFile.getAttribute("integration-test"));
+
+                    final ContentClaim serverContentClaim = serverFlowFile.getContentClaim();
+                    final byte[] serverFlowFileContent = serverClaimContents.get(serverContentClaim);
+                    assertArrayEquals("hello".getBytes(), serverFlowFileContent);
+
+                    // Wait up to 10 seconds for the client's FlowFile Repository to be updated, mirroring the
+                    // bounded server-side wait above, so a missing update fails on the assertion below instead
+                    // of spinning until the JUnit timeout.
+                    final long clientEndTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+                    while (clientRepoRecords.size() < i && System.currentTimeMillis() < clientEndTime) {
+                        Thread.sleep(10L);
+                    }
+
+                    assertEquals(i, clientRepoRecords.size());
+                    final RepositoryRecord clientRecord = clientRepoRecords.iterator().next();
+                    assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
+                }
+            } finally {
+                flowFileQueue.stopLoadBalancing();
+                clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+            }
+        } finally {
+            server.stop();
+        }
+    }
+
+
+    /**
+     * Verifies that a FlowFile with a 1 MiB payload (every byte set to 'A') is transferred to the remote node
+     * intact.
+     * <p>
+     * Two FlowFiles are placed on a queue that uses a {@link RoundRobinPartitioner}. The test expects exactly
+     * one record to reach the server's FlowFile Repository, carrying the attributes of the second FlowFile and
+     * content equal to the payload, and exactly one DELETE record in the client's FlowFile Repository.
+     *
+     * @throws IOException if any of the server, client or repository calls fails with an I/O error
+     * @throws InterruptedException if the test thread is interrupted while polling
+     */
+    @Test(timeout = 20_000)
+    public void testLargePayload() throws IOException, InterruptedException, UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
+        localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+        nodeIdentifiers.add(localNodeId);
+
+        // Create the server
+        final int timeoutMillis = 30000;
+        final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+
+        // Port 0 is passed in; the actual port is read back from the server below.
+        final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
+        server.start();
+
+        try {
+            final int loadBalancePort = server.getPort();
+
+            // Create the Load Balanced FlowFile Queue
+            final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+            nodeIdentifiers.add(remoteNodeId);
+
+            final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+            clientRegistry.start();
+
+            // Every node is reported as CONNECTED to the client task.
+            final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+            when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+            when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+            final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+
+            // Daemon thread so that a lingering client task cannot keep the JVM alive.
+            final Thread clientThread = new Thread(clientTask);
+            clientThread.setDaemon(true);
+            clientThread.start();
+
+            final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+                    clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+            flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+            // 1 MiB of 'A' bytes.
+            final byte[] payload = new byte[1024 * 1024];
+            Arrays.fill(payload, (byte) 'A');
+
+            try {
+                // First FlowFile: no attributes, no content claim.
+                final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
+                flowFileQueue.put(firstFlowFile);
+
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.put("integration", "test");
+                attributes.put("unit-test", "false");
+                attributes.put("integration-test", "true");
+
+                // Second FlowFile: carries the large payload that is checked on the server side.
+                final ContentClaim contentClaim = createContentClaim(payload);
+                final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, payload.length, contentClaim);
+                flowFileQueue.put(secondFlowFile);
+
+                flowFileQueue.startLoadBalancing();
+
+                // Wait up to 10 seconds for the server's FlowFile Repository to be updated
+                final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+                while (serverRepoRecords.isEmpty() && System.currentTimeMillis() < endTime) {
+                    Thread.sleep(10L);
+                }
+
+                assertFalse("Server's FlowFile Repo was never updated", serverRepoRecords.isEmpty());
+
+                assertEquals(1, serverRepoRecords.size());
+
+                final RepositoryRecord serverRecord = serverRepoRecords.iterator().next();
+                final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
+                assertEquals("test", serverFlowFile.getAttribute("integration"));
+                assertEquals("false", serverFlowFile.getAttribute("unit-test"));
+                assertEquals("true", serverFlowFile.getAttribute("integration-test"));
+
+                final ContentClaim serverContentClaim = serverFlowFile.getContentClaim();
+                final byte[] serverFlowFileContent = serverClaimContents.get(serverContentClaim);
+                assertArrayEquals(payload, serverFlowFileContent);
+
+                // Wait up to 10 seconds for the client's FlowFile Repository to be updated, mirroring the bounded
+                // server-side wait above, so a missing update fails with a clear message instead of a bare timeout.
+                final long clientEndTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+                while (clientRepoRecords.size() == 0 && System.currentTimeMillis() < clientEndTime) {
+                    Thread.sleep(10L);
+                }
+
+                assertFalse("Client's FlowFile Repo was never updated", clientRepoRecords.size() == 0);
+
+                assertEquals(1, clientRepoRecords.size());
+                final RepositoryRecord clientRecord = clientRepoRecords.iterator().next();
+                assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
+            } finally {
+                flowFileQueue.stopLoadBalancing();
+                clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+            }
+        } finally {
+            server.stop();
+        }
+    }
+
+
+    @Test(timeout = 60_000)
+    public void testServerClosesUnexpectedly() throws IOException, InterruptedException {
+
+        doAnswer(new Answer<OutputStream>() {
+            int iterations = 0;
+
+            @Override
+            public OutputStream answer(final InvocationOnMock invocation) {
+                if (iterations++ < 5) {
+                    return new OutputStream() {
+                        @Override
+                        public void write(final int b) throws IOException {
+                            throw new IOException("Intentional unit test failure");
+                        }
+                    };
+                }
+
+                final ContentClaim contentClaim = invocation.getArgumentAt(0, ContentClaim.class);
+                final ByteArrayOutputStream baos = new ByteArrayOutputStream() {
+                    @Override
+                    public void close() throws IOException {
+                        super.close();
+                        serverClaimContents.put(contentClaim, toByteArray());
+                    }
+                };
+
+                return baos;
+            }
+        }).when(serverContentRepo).write(any(ContentClaim.class));
+
+        // Create the server
+        final int timeoutMillis = 30000;
+        final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+        final SSLContext sslContext = null;
+
+        final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
+        server.start();
+
+        try {
+            final int loadBalancePort = server.getPort();
+
+            // Create the Load Balanced FlowFile Queue
+            final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+            nodeIdentifiers.add(remoteNodeId);
+
+            final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+            clientRegistry.start();
+
+            final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+            when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+            when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+            final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+
+            final Thread clientThread = new Thread(clientTask);
+            clientThread.setDaemon(true);
+            clientThread.start();
+
+            final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+                    clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+            flowFileQueue.setFlowFilePartitioner(new FlowFilePartitioner() {
+                @Override
+                public QueuePartition getPartition(final FlowFileRecord flowFile, final QueuePartition[] partitions, final QueuePartition localPartition) {
+                    for (final QueuePartition partition : partitions) {
+                        if (partition != localPartition) {
+                            return partition;
+                        }
+                    }
+
+                    return null;
+                }
+
+                @Override
+                public boolean isRebalanceOnClusterResize() {
+                    return true;
+                }
+                @Override
+                public boolean isRebalanceOnFailure() {
+                    return true;
+                }
+            });
+
+            try {
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.put("integration", "test");
+                attributes.put("unit-test", "false");
+                attributes.put("integration-test", "true");
+
+                final ContentClaim contentClaim = createContentClaim("hello".getBytes());
+                final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
+                flowFileQueue.put(secondFlowFile);
+
+                flowFileQueue.startLoadBalancing();
+
+                // Wait up to 10 seconds for the server's FlowFile Repository to be updated
+                final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+                while (serverRepoRecords.isEmpty() && System.currentTimeMillis() < endTime) {
+                    Thread.sleep(10L);
+                }
+
+                assertFalse("Server's FlowFile Repo was never updated", serverRepoRecords.isEmpty());
+
+                assertEquals(1, serverRepoRecords.size());
+
+                final RepositoryRecord serverRecord = serverRepoRecords.iterator().next();
+                final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
+                assertEquals("test", serverFlowFile.getAttribute("integration"));
+                assertEquals("false", serverFlowFile.getAttribute("unit-test"));
+                assertEquals("true", serverFlowFile.getAttribute("integration-test"));
+
+                final ContentClaim serverContentClaim = serverFlowFile.getContentClaim();
+                final byte[] serverFlowFileContent = serverClaimContents.get(serverContentClaim);
+                assertArrayEquals("hello".getBytes(), serverFlowFileContent);
+
+                while (clientRepoRecords.size() == 0) {
+                    Thread.sleep(10L);
+                }
+
+                assertEquals(1, clientRepoRecords.size());
+                final RepositoryRecord clientRecord = clientRepoRecords.iterator().next();
+                assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
+            } finally {
+                flowFileQueue.stopLoadBalancing();
+                clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+            }
+        } finally {
+            server.stop();
+        }
+    }
+
+
+    /**
+     * Starts a load-balance server whose protocol is constructed with NEVER_AUTHORIZED, queues two
+     * FlowFiles on a load-balanced client queue, and checks that after a fixed wait neither the
+     * server-side nor the client-side FlowFile repository has recorded anything and both FlowFiles
+     * are still counted by the client queue.
+     */
+    @Test(timeout = 20_000)
+    public void testNotAuthorized() throws IOException, InterruptedException {
+        localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+        nodeIdentifiers.add(localNodeId);
+
+        // Receiving side: the protocol under test is the one built with NEVER_AUTHORIZED
+        final LoadBalanceProtocol rejectingProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, NEVER_AUTHORIZED);
+        final ConnectionLoadBalanceServer lbServer = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, rejectingProtocol, eventReporter, 30000);
+        lbServer.start();
+
+        try {
+            // Sending side: the remote node identifier carries the port reported by the running server
+            final NodeIdentifier remoteNode = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", lbServer.getPort(), null, null, null, false, null);
+            nodeIdentifiers.add(remoteNode);
+
+            final NioAsyncLoadBalanceClientRegistry registry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+            registry.start();
+
+            // The mocked cluster coordinator reports CONNECTED for any node it is asked about
+            final NodeConnectionStatus connectedStatus = mock(NodeConnectionStatus.class);
+            when(connectedStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+            when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectedStatus);
+
+            // Run the client task on its own daemon thread so it cannot keep the JVM alive
+            final Thread clientTaskThread = new Thread(new NioAsyncLoadBalanceClientTask(registry, clusterCoordinator, eventReporter));
+            clientTaskThread.setDaemon(true);
+            clientTaskThread.start();
+
+            final SocketLoadBalancedFlowFileQueue lbQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+                    clientContentRepo, resourceClaimManager, clusterCoordinator, registry, flowFileSwapManager, swapThreshold, eventReporter);
+            lbQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+            try {
+                lbQueue.put(new MockFlowFileRecord(0L));
+
+                final Map<String, String> attrs = new HashMap<>();
+                attrs.put("integration", "test");
+                attrs.put("unit-test", "false");
+                attrs.put("integration-test", "true");
+
+                final ContentClaim helloClaim = createContentClaim("hello".getBytes());
+                lbQueue.put(new MockFlowFileRecord(attrs, 5L, helloClaim));
+
+                lbQueue.startLoadBalancing();
+
+                // Fixed window during which an (unexpected) transfer would have time to be recorded
+                Thread.sleep(5000L);
+
+                assertTrue("Server's FlowFile Repo was updated", serverRepoRecords.isEmpty());
+                assertTrue(clientRepoRecords.isEmpty());
+
+                // Both FlowFiles must still be accounted for by the client queue
+                assertEquals(2, lbQueue.size().getObjectCount());
+            } finally {
+                lbQueue.stopLoadBalancing();
+                registry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+            }
+        } finally {
+            lbServer.stop();
+        }
+    }
+
+
+    /**
+     * Checks back-pressure from the destination: while {@code serverQueue.isFull()} answers
+     * {@code true}, a fixed wait must leave both FlowFile repositories untouched and both FlowFiles
+     * counted by the client queue. Once it answers {@code false}, the test waits for exactly one
+     * client repository record and then expects exactly one server repository record.
+     *
+     * <p>The final wait is an unbounded polling loop; the JUnit timeout on this method is what
+     * ends the test if the transfer never happens.</p>
+     */
+    @Test(timeout = 35_000)
+    public void testDestinationNodeQueueFull() throws IOException, InterruptedException {
+        localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+        nodeIdentifiers.add(localNodeId);
+
+        // The "full" state lives in an AtomicBoolean that is read by a single Answer, installed here
+        // before any thread of this test has been started. Flipping the flag later avoids re-stubbing
+        // the mock while other threads may be invoking it, which Mockito does not support.
+        final java.util.concurrent.atomic.AtomicBoolean destinationFull = new java.util.concurrent.atomic.AtomicBoolean(true);
+        when(serverQueue.isFull()).thenAnswer(invocation -> destinationFull.get());
+
+        // Create the server
+        final int timeoutMillis = 30000;
+        final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+
+        final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
+        server.start();
+
+        try {
+            final int loadBalancePort = server.getPort();
+
+            // Create the Load Balanced FlowFile Queue
+            final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+            nodeIdentifiers.add(remoteNodeId);
+
+            final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+            clientRegistry.start();
+
+            // The mocked cluster coordinator reports CONNECTED for any node it is asked about
+            final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+            when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+            when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+            final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+
+            final Thread clientThread = new Thread(clientTask);
+            clientThread.setDaemon(true);
+            clientThread.start();
+
+            final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+                clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+            flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+            try {
+                final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
+                flowFileQueue.put(firstFlowFile);
+
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.put("integration", "test");
+                attributes.put("unit-test", "false");
+                attributes.put("integration-test", "true");
+
+                final ContentClaim contentClaim = createContentClaim("hello".getBytes());
+                final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
+                flowFileQueue.put(secondFlowFile);
+
+                flowFileQueue.startLoadBalancing();
+
+                // Fixed window during which an (unexpected) transfer would have time to be recorded
+                Thread.sleep(5000L);
+
+                assertTrue("Server's FlowFile Repo was updated", serverRepoRecords.isEmpty());
+                assertTrue(clientRepoRecords.isEmpty());
+
+                assertEquals(2, flowFileQueue.size().getObjectCount());
+
+                // Enable data to be transferred
+                destinationFull.set(false);
+
+                // Poll until the client side has recorded exactly one repository record
+                while (clientRepoRecords.size() != 1) {
+                    Thread.sleep(10L);
+                }
+
+                assertEquals(1, serverRepoRecords.size());
+            } finally {
+                flowFileQueue.stopLoadBalancing();
+                clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+            }
+        } finally {
+            server.stop();
+        }
+    }
+
+    /**
+     * Creates a mocked {@link FlowFileRepository} whose {@code updateRepository(...)} call appends
+     * every element of its argument to {@code repoRecords} and does nothing else.
+     *
+     * @param repoRecords list that receives the records passed to {@code updateRepository(...)};
+     *                    it is mutated by whichever thread invokes the mock
+     * @return the configured mock
+     * @throws IOException declared only because stubbing {@code updateRepository(...)} requires it
+     */
+    private FlowFileRepository createFlowFileRepository(final List<RepositoryRecord> repoRecords) throws IOException {
+        final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class);
+        doAnswer(invocation -> {
+            // The argument can only be fetched via Collection.class, i.e. as a raw type. The cast is
+            // confined to this one declaration; assumes callers only ever pass RepositoryRecord elements.
+            @SuppressWarnings("unchecked")
+            final Collection<RepositoryRecord> records = invocation.getArgumentAt(0, Collection.class);
+            repoRecords.addAll(records);
+            return null;
+        }).when(flowFileRepo).updateRepository(anyCollection());
+
+        return flowFileRepo;
+    }
+
+
+    /**
+     * Creates a mocked {@link ContentRepository} that keeps content in the supplied map.
+     * <ul>
+     *   <li>{@code create(boolean)} returns the result of {@code createContentClaim(null)}.</li>
+     *   <li>{@code write(claim)} returns an in-memory stream that stores its bytes in
+     *       {@code claimContents} under {@code claim} when the stream is closed.</li>
+     *   <li>{@code read(claim)} returns an empty stream for a {@code null} claim, the stored bytes
+     *       for a known claim, and throws {@link ContentNotFoundException} otherwise.</li>
+     * </ul>
+     *
+     * @param claimContents backing store mapping each claim to the bytes written for it
+     * @return the configured mock
+     * @throws IOException declared only because the stubbed repository methods require it
+     */
+    private ContentRepository createContentRepository(final ConcurrentMap<ContentClaim, byte[]> claimContents) throws IOException {
+        final ContentRepository repo = mock(ContentRepository.class);
+
+        // create(...)
+        Mockito.doAnswer(new Answer<ContentClaim>() {
+            @Override
+            public ContentClaim answer(final InvocationOnMock invocation) {
+                return createContentClaim(null);
+            }
+        }).when(repo).create(Mockito.anyBoolean());
+
+        // write(...): content becomes visible in the map only once the returned stream is closed
+        Mockito.doAnswer(new Answer<OutputStream>() {
+            @Override
+            public OutputStream answer(final InvocationOnMock invocation) {
+                final ContentClaim claim = invocation.getArgumentAt(0, ContentClaim.class);
+
+                return new ByteArrayOutputStream() {
+                    @Override
+                    public void close() throws IOException {
+                        super.close();
+                        claimContents.put(claim, toByteArray());
+                    }
+                };
+            }
+        }).when(repo).write(any(ContentClaim.class));
+
+        // read(...)
+        Mockito.doAnswer(new Answer<InputStream>() {
+            @Override
+            public InputStream answer(final InvocationOnMock invocation) {
+                final ContentClaim claim = invocation.getArgumentAt(0, ContentClaim.class);
+
+                // A null claim reads as empty content; a non-null claim must have been written before
+                final byte[] stored = (claim == null) ? new byte[0] : claimContents.get(claim);
+                if (stored == null) {
+                    throw new ContentNotFoundException(claim);
+                }
+
+                return new ByteArrayInputStream(stored);
+            }
+        }).when(repo).read(any(ContentClaim.class));
+
+        return repo;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/MockTransferFailureDestination.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/MockTransferFailureDestination.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/MockTransferFailureDestination.java
new file mode 100644
index 0000000..dc5c1db
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/MockTransferFailureDestination.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered;
+
+import org.apache.nifi.controller.queue.FlowFileQueueContents;
+import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Test double for {@link TransferFailureDestination} that simply remembers what it was handed.
+ *
+ * <p>FlowFiles and swap locations are accumulated in plain {@link ArrayList}s, so instances are
+ * not safe for concurrent use. The getters expose the live internal lists rather than copies.</p>
+ */
+public class MockTransferFailureDestination implements TransferFailureDestination {
+    private final List<FlowFileRecord> flowFilesTransferred = new ArrayList<>();
+    private final List<String> swapFilesTransferred = new ArrayList<>();
+    private final boolean rebalanceOnFailure;
+
+    /**
+     * @param rebalanceOnFailure the fixed value that {@link #isRebalanceOnFailure(FlowFilePartitioner)} will return
+     */
+    public MockTransferFailureDestination(final boolean rebalanceOnFailure) {
+        this.rebalanceOnFailure = rebalanceOnFailure;
+    }
+
+    /**
+     * Records the given FlowFiles; the partitioner is ignored.
+     */
+    @Override
+    public void putAll(final Collection<FlowFileRecord> flowFiles, final FlowFilePartitioner partitionerUsed) {
+        flowFilesTransferred.addAll(flowFiles);
+    }
+
+    /**
+     * @return every FlowFile recorded so far by either {@code putAll} overload, in insertion order
+     */
+    public List<FlowFileRecord> getFlowFilesTransferred() {
+        return flowFilesTransferred;
+    }
+
+    /**
+     * Invokes {@code queueContents} with the fixed argument {@code "unit-test"} and records the
+     * active FlowFiles and swap locations of the result; the partitioner is ignored.
+     */
+    @Override
+    public void putAll(final Function<String, FlowFileQueueContents> queueContents, final FlowFilePartitioner partitionerUsed) {
+        final FlowFileQueueContents contents = queueContents.apply("unit-test");
+        flowFilesTransferred.addAll(contents.getActiveFlowFiles());
+        swapFilesTransferred.addAll(contents.getSwapLocations());
+    }
+
+    /**
+     * @return the value supplied to the constructor, regardless of the partitioner
+     */
+    @Override
+    public boolean isRebalanceOnFailure(final FlowFilePartitioner partitionerUsed) {
+        return rebalanceOnFailure;
+    }
+
+    /**
+     * @return every swap location recorded so far, in insertion order
+     */
+    public List<String> getSwapFilesTransferred() {
+        return swapFilesTransferred;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestContentRepositoryFlowFileAccess.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestContentRepositoryFlowFileAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestContentRepositoryFlowFileAccess.java
new file mode 100644
index 0000000..4d3609d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestContentRepositoryFlowFileAccess.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered;
+
+import org.apache.nifi.controller.repository.ContentNotFoundException;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link ContentRepositoryFlowFileAccess} against a mocked {@link ContentRepository}.
+ */
+public class TestContentRepositoryFlowFileAccess {
+
+    /**
+     * A FlowFile of size 5 backed by the 5 bytes "hello" must be readable in full, followed by end-of-stream.
+     */
+    @Test
+    public void testInputStreamFromContentRepo() throws IOException {
+        final ContentClaim claim = newContentClaim();
+
+        final ContentRepository repository = mock(ContentRepository.class);
+        when(repository.read(claim)).thenReturn(new ByteArrayInputStream("hello".getBytes()));
+
+        final FlowFileRecord flowFileRecord = mock(FlowFileRecord.class);
+        when(flowFileRecord.getContentClaim()).thenReturn(claim);
+        when(flowFileRecord.getSize()).thenReturn(5L);
+
+        final InputStream flowFileStream = new ContentRepositoryFlowFileAccess(repository).read(flowFileRecord);
+        verify(repository, times(1)).read(claim);
+
+        final byte[] contents = new byte[5];
+        StreamUtils.fillBuffer(flowFileStream, contents);
+        assertEquals(-1, flowFileStream.read());
+        assertArrayEquals("hello".getBytes(), contents);
+    }
+
+
+    /**
+     * A ContentNotFoundException raised by the repository must reach the caller and carry a FlowFile.
+     */
+    @Test
+    public void testContentNotFoundPropagated() throws IOException {
+        final ContentClaim claim = newContentClaim();
+
+        final ContentRepository repository = mock(ContentRepository.class);
+        when(repository.read(claim)).thenThrow(new ContentNotFoundException(claim));
+
+        final FlowFileRecord flowFileRecord = mock(FlowFileRecord.class);
+        when(flowFileRecord.getContentClaim()).thenReturn(claim);
+
+        final ContentRepositoryFlowFileAccess access = new ContentRepositoryFlowFileAccess(repository);
+
+        try {
+            access.read(flowFileRecord);
+            Assert.fail("Expected ContentNotFoundException but it did not happen");
+        } catch (final ContentNotFoundException thrown) {
+            // expected
+            thrown.getFlowFile().orElseThrow(() -> new AssertionError("Expected FlowFile to be provided"));
+        }
+    }
+
+    /**
+     * A FlowFile claiming size 100 but backed by only 5 bytes must fail with EOFException
+     * on the first read past the available data.
+     */
+    @Test
+    public void testEOFExceptionIfNotEnoughData() throws IOException {
+        final ContentClaim claim = newContentClaim();
+
+        final ContentRepository repository = mock(ContentRepository.class);
+        when(repository.read(claim)).thenReturn(new ByteArrayInputStream("hello".getBytes()));
+
+        final FlowFileRecord flowFileRecord = mock(FlowFileRecord.class);
+        when(flowFileRecord.getContentClaim()).thenReturn(claim);
+        when(flowFileRecord.getSize()).thenReturn(100L);
+
+        final InputStream flowFileStream = new ContentRepositoryFlowFileAccess(repository).read(flowFileRecord);
+        verify(repository, times(1)).read(claim);
+
+        // Consume the 5 bytes that actually exist
+        final byte[] contents = new byte[5];
+        StreamUtils.fillBuffer(flowFileStream, contents);
+
+        try {
+            flowFileStream.read();
+            Assert.fail("Expected EOFException because not enough bytes were in the InputStream for the FlowFile");
+        } catch (final EOFException eof) {
+            // expected
+        }
+    }
+
+    /**
+     * Builds the content claim shared by all tests in this class, using a fresh claim manager each time.
+     */
+    private static ContentClaim newContentClaim() {
+        final ResourceClaimManager claimManager = new StandardResourceClaimManager();
+        final ResourceClaim resourceClaim = new StandardResourceClaim(claimManager, "container", "section", "id", false);
+        return new StandardContentClaim(resourceClaim, 5L);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestNaiveLimitThreshold.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestNaiveLimitThreshold.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestNaiveLimitThreshold.java
new file mode 100644
index 0000000..e4f0c74
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestNaiveLimitThreshold.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class TestNaiveLimitThreshold {
+
+    @Test
+    public void testCount() {
+        final SimpleLimitThreshold threshold = new SimpleLimitThreshold(10, 100L);
+        for (int i = 0; i < 9; i++) {
+            threshold.adjust(1, 1L);
+            assertFalse(threshold.isThresholdMet());
+        }
+
+        threshold.adjust(1, 1L);
+        assertTrue(threshold.isThresholdMet());
+    }
+
+    @Test
+    public void testSize() {
+        final SimpleLimitThreshold threshold = new SimpleLimitThreshold(10, 100L);
+        for (int i = 0; i < 9; i++) {
+            threshold.adjust(0, 10L);
+            assertFalse(threshold.isThresholdMet());
+        }
+
+        threshold.adjust(1, 9L);
+        assertFalse(threshold.isThresholdMet());
+
+        threshold.adjust(-1, 1L);
+        assertTrue(threshold.isThresholdMet());
+
+        threshold.adjust(0, -1L);
+        assertFalse(threshold.isThresholdMet());
+
+        threshold.adjust(-10, 10000L);
+        assertTrue(threshold.isThresholdMet());
+    }
+
+}