You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2018/10/04 20:28:59 UTC
[03/14] nifi git commit: NIFI-5516: Implement Load-Balanced
Connections Refactoring StandardFlowFileQueue to have an
AbstractFlowFileQueue Refactored more into AbstractFlowFileQueue Added
documentation, cleaned up code some Refactored FlowFileQueue so th
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
new file mode 100644
index 0000000..17e9237
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
@@ -0,0 +1,1345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.MockFlowFileRecord;
+import org.apache.nifi.controller.MockSwapManager;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
+import org.apache.nifi.controller.queue.NopConnectionEventListener;
+import org.apache.nifi.controller.queue.clustered.client.StandardLoadBalanceFlowFileCodec;
+import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient;
+import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientFactory;
+import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientRegistry;
+import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientTask;
+import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner;
+import org.apache.nifi.controller.queue.clustered.partition.QueuePartition;
+import org.apache.nifi.controller.queue.clustered.partition.RoundRobinPartitioner;
+import org.apache.nifi.controller.queue.clustered.server.ConnectionLoadBalanceServer;
+import org.apache.nifi.controller.queue.clustered.server.LoadBalanceAuthorizer;
+import org.apache.nifi.controller.queue.clustered.server.LoadBalanceProtocol;
+import org.apache.nifi.controller.queue.clustered.server.NotAuthorizedException;
+import org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol;
+import org.apache.nifi.controller.repository.ContentNotFoundException;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.controller.repository.RepositoryRecordType;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import javax.net.ssl.SSLContext;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class LoadBalancedQueueIT {
+ private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> {};
+ private final LoadBalanceAuthorizer NEVER_AUTHORIZED = nodeIds -> {
+ throw new NotAuthorizedException("Intentional Unit Test Failure - Not Authorized");
+ };
+
+ private final MockSwapManager flowFileSwapManager = new MockSwapManager();
+ private final String queueId = "unit-test";
+ private final EventReporter eventReporter = EventReporter.NO_OP;
+ private final int swapThreshold = 10_000;
+
+ private Set<NodeIdentifier> nodeIdentifiers;
+ private ClusterCoordinator clusterCoordinator;
+ private NodeIdentifier localNodeId;
+ private ProcessScheduler processScheduler;
+ private ResourceClaimManager resourceClaimManager;
+ private LoadBalancedFlowFileQueue serverQueue;
+ private FlowController flowController;
+
+ private ProvenanceRepository clientProvRepo;
+ private ContentRepository clientContentRepo;
+ private List<RepositoryRecord> clientRepoRecords;
+ private FlowFileRepository clientFlowFileRepo;
+ private ConcurrentMap<ContentClaim, byte[]> clientClaimContents;
+
+ private ProvenanceRepository serverProvRepo;
+ private List<RepositoryRecord> serverRepoRecords;
+ private FlowFileRepository serverFlowFileRepo;
+ private ConcurrentMap<ContentClaim, byte[]> serverClaimContents;
+ private ContentRepository serverContentRepo;
+
+ private SSLContext sslContext;
+
+ private final Set<ClusterTopologyEventListener> clusterEventListeners = Collections.synchronizedSet(new HashSet<>());
+ private final AtomicReference<LoadBalanceCompression> compressionReference = new AtomicReference<>();
+
+ @Before
+ public void setup() throws IOException, UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
+ compressionReference.set(LoadBalanceCompression.DO_NOT_COMPRESS);
+
+ nodeIdentifiers = new HashSet<>();
+
+ clusterCoordinator = mock(ClusterCoordinator.class);
+ when(clusterCoordinator.getNodeIdentifiers()).thenAnswer(invocation -> new HashSet<>(nodeIdentifiers));
+ when(clusterCoordinator.getLocalNodeIdentifier()).thenAnswer(invocation -> localNodeId);
+
+ clusterEventListeners.clear();
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(final InvocationOnMock invocation) {
+ clusterEventListeners.add(invocation.getArgumentAt(0, ClusterTopologyEventListener.class));
+ return null;
+ }
+ }).when(clusterCoordinator).registerEventListener(any(ClusterTopologyEventListener.class));
+
+ processScheduler = mock(ProcessScheduler.class);
+ clientProvRepo = mock(ProvenanceRepository.class);
+ resourceClaimManager = new StandardResourceClaimManager();
+ final Connection connection = mock(Connection.class);
+ when(connection.getIdentifier()).thenReturn(queueId);
+
+ serverQueue = mock(LoadBalancedFlowFileQueue.class);
+ when(serverQueue.isFull()).thenReturn(false);
+ when(connection.getFlowFileQueue()).thenReturn(serverQueue);
+ doAnswer(invocation -> compressionReference.get()).when(serverQueue).getLoadBalanceCompression();
+
+ flowController = mock(FlowController.class);
+ when(flowController.getConnection(anyString())).thenReturn(connection);
+
+ // Create repos for the server
+ serverRepoRecords = Collections.synchronizedList(new ArrayList<>());
+ serverFlowFileRepo = createFlowFileRepository(serverRepoRecords);
+
+ serverClaimContents = new ConcurrentHashMap<>();
+ serverContentRepo = createContentRepository(serverClaimContents);
+ serverProvRepo = mock(ProvenanceRepository.class);
+
+ clientClaimContents = new ConcurrentHashMap<>();
+ clientContentRepo = createContentRepository(clientClaimContents);
+ clientRepoRecords = Collections.synchronizedList(new ArrayList<>());
+ clientFlowFileRepo = createFlowFileRepository(clientRepoRecords);
+
+ final String keystore = "src/test/resources/localhost-ks.jks";
+ final String keystorePass = "OI7kMpWzzVNVx/JGhTL/0uO4+PWpGJ46uZ/pfepbkwI";
+ final String keyPass = keystorePass;
+ final String truststore = "src/test/resources/localhost-ts.jks";
+ final String truststorePass = "wAOR0nQJ2EXvOP0JZ2EaqA/n7W69ILS4sWAHghmIWCc";
+ sslContext = SslContextFactory.createSslContext(keystore, keystorePass.toCharArray(), keyPass.toCharArray(), "JKS",
+ truststore, truststorePass.toCharArray(), "JKS",
+ SslContextFactory.ClientAuth.REQUIRED, "TLS");
+ }
+
+
+ private ContentClaim createContentClaim(final byte[] bytes) {
+ final ResourceClaim resourceClaim = mock(ResourceClaim.class);
+ when(resourceClaim.getContainer()).thenReturn("container");
+ when(resourceClaim.getSection()).thenReturn("section");
+ when(resourceClaim.getId()).thenReturn("identifier");
+
+ final ContentClaim contentClaim = mock(ContentClaim.class);
+ when(contentClaim.getResourceClaim()).thenReturn(resourceClaim);
+
+ if (bytes != null) {
+ clientClaimContents.put(contentClaim, bytes);
+ }
+
+ return contentClaim;
+ }
+
+
+ private NioAsyncLoadBalanceClientFactory createClientFactory(final SSLContext sslContext) {
+ final FlowFileContentAccess flowFileContentAccess = flowFile -> clientContentRepo.read(flowFile.getContentClaim());
+ return new NioAsyncLoadBalanceClientFactory(sslContext, 30000, flowFileContentAccess, eventReporter, new StandardLoadBalanceFlowFileCodec());
+ }
+
+ @Test(timeout = 20_000)
+ public void testNewNodeAdded() throws IOException, InterruptedException {
+ localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+ nodeIdentifiers.add(localNodeId);
+
+ // Create the server
+ final int timeoutMillis = 1000;
+ final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+ final SSLContext sslContext = null;
+
+ final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+ clientRegistry.start();
+
+ final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+ when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+ when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+ final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+ final Thread clientThread = new Thread(clientTask);
+
+ final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+ clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+
+ flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+ final int serverCount = 5;
+ final ConnectionLoadBalanceServer[] servers = new ConnectionLoadBalanceServer[serverCount];
+
+ try {
+ flowFileQueue.startLoadBalancing();
+ clientThread.start();
+
+ for (int i = 0; i < serverCount; i++) {
+ final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 8, loadBalanceProtocol, eventReporter, timeoutMillis);
+ servers[i] = server;
+ server.start();
+
+ final int loadBalancePort = server.getPort();
+
+ // Create the Load Balanced FlowFile Queue
+ final NodeIdentifier nodeId = new NodeIdentifier("unit-test-" + i, "localhost", 8090 + i, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+ nodeIdentifiers.add(nodeId);
+
+
+ clusterEventListeners.forEach(listener -> listener.onNodeAdded(nodeId));
+
+ for (int j=0; j < 2; j++) {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("greeting", "hello");
+
+ final MockFlowFileRecord flowFile = new MockFlowFileRecord(attributes, 0L);
+ flowFileQueue.put(flowFile);
+ }
+ }
+
+ final int totalFlowFileCount = 6;
+
+ // Wait up to 10 seconds for the server's FlowFile Repository to be updated
+ final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+ while (serverRepoRecords.size() < totalFlowFileCount && System.currentTimeMillis() < endTime) {
+ Thread.sleep(10L);
+ }
+
+ assertFalse("Server's FlowFile Repo was never fully updated", serverRepoRecords.isEmpty());
+
+ assertEquals(totalFlowFileCount, serverRepoRecords.size());
+
+ for (final RepositoryRecord serverRecord : serverRepoRecords) {
+ final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
+ assertEquals("hello", serverFlowFile.getAttribute("greeting"));
+ }
+
+ while (clientRepoRecords.size() < totalFlowFileCount) {
+ Thread.sleep(10L);
+ }
+
+ assertEquals(totalFlowFileCount, clientRepoRecords.size());
+
+ for (final RepositoryRecord clientRecord : clientRepoRecords) {
+ assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
+ }
+ } finally {
+ clientTask.stop();
+
+ flowFileQueue.stopLoadBalancing();
+
+ clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+ Arrays.stream(servers).filter(Objects::nonNull).forEach(ConnectionLoadBalanceServer::stop);
+ }
+ }
+
+ @Test(timeout = 60_000)
+ public void testFailover() throws IOException, InterruptedException {
+ localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+ nodeIdentifiers.add(localNodeId);
+
+ // Create the server
+ final int timeoutMillis = 1000;
+ final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+ final SSLContext sslContext = null;
+
+ final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
+ server.start();
+
+ try {
+ final int loadBalancePort = server.getPort();
+
+ // Create the Load Balanced FlowFile Queue
+ final NodeIdentifier availableNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+ nodeIdentifiers.add(availableNodeId);
+
+ // Add a Node Identifier pointing to a non-existent server
+ final NodeIdentifier inaccessibleNodeId = new NodeIdentifier("unit-test-invalid-host-does-not-exist", "invalid-host-does-not-exist", 8090, "invalid-host-does-not-exist", 8090,
+ "invalid-host-does-not-exist", loadBalancePort, null, null, null, false, null);
+ nodeIdentifiers.add(inaccessibleNodeId);
+
+
+ final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+ clientRegistry.start();
+
+ final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+ when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+ when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+ final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+
+ final Thread clientThread = new Thread(clientTask);
+ clientThread.setDaemon(true);
+
+ final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+ clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+ flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+ try {
+ final int numFlowFiles = 1200;
+ for (int i = 0; i < numFlowFiles; i++) {
+ final ContentClaim contentClaim = createContentClaim("hello".getBytes());
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("uuid", UUID.randomUUID().toString());
+ attributes.put("greeting", "hello");
+
+ final MockFlowFileRecord flowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
+ flowFileQueue.put(flowFile);
+ }
+
+ flowFileQueue.startLoadBalancing();
+
+ clientThread.start();
+
+ // Sending to one partition should fail. When that happens, half of the FlowFiles should go to the local partition,
+ // the other half to the other node. So the total number of FlowFiles expected is ((numFlowFiles per node) / 3 * 1.5)
+ final int flowFilesPerNode = numFlowFiles / 3;
+ final int expectedFlowFileReceiveCount = flowFilesPerNode + flowFilesPerNode / 2;
+
+ // Wait up to 10 seconds for the server's FlowFile Repository to be updated
+ final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30L);
+ while (serverRepoRecords.size() < expectedFlowFileReceiveCount && System.currentTimeMillis() < endTime) {
+ Thread.sleep(10L);
+ }
+
+ assertFalse("Server's FlowFile Repo was never fully updated", serverRepoRecords.isEmpty());
+
+ assertEquals(expectedFlowFileReceiveCount, serverRepoRecords.size());
+
+ for (final RepositoryRecord serverRecord : serverRepoRecords) {
+ final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
+ assertEquals("hello", serverFlowFile.getAttribute("greeting"));
+
+ final ContentClaim serverContentClaim = serverFlowFile.getContentClaim();
+ final byte[] serverFlowFileContent = serverClaimContents.get(serverContentClaim);
+ assertArrayEquals("hello".getBytes(), Arrays.copyOfRange(serverFlowFileContent, serverFlowFileContent.length - 5, serverFlowFileContent.length));
+ }
+
+ // We expect the client records to be numFlowFiles / 2 because half of the FlowFile will have gone to the other node
+ // in the cluster and half would still be in the local partition.
+ while (clientRepoRecords.size() < numFlowFiles / 2) {
+ Thread.sleep(10L);
+ }
+
+ assertEquals(numFlowFiles / 2, clientRepoRecords.size());
+
+ for (final RepositoryRecord clientRecord : clientRepoRecords) {
+ assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
+ }
+ } finally {
+ flowFileQueue.stopLoadBalancing();
+ clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+ }
+ } finally {
+ server.stop();
+ }
+ }
+
+
+ @Test(timeout = 20_000)
+ public void testTransferToRemoteNode() throws IOException, InterruptedException {
+ localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+ nodeIdentifiers.add(localNodeId);
+
+ // Create the server
+ final int timeoutMillis = 30000;
+ final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+ final SSLContext sslContext = null;
+
+ final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
+ server.start();
+
+ try {
+ final int loadBalancePort = server.getPort();
+
+ // Create the Load Balanced FlowFile Queue
+ final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+ nodeIdentifiers.add(remoteNodeId);
+
+ final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+ clientRegistry.start();
+
+ final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+ when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+ when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+ final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+
+ final Thread clientThread = new Thread(clientTask);
+ clientThread.setDaemon(true);
+ clientThread.start();
+
+ final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+ clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+ flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+ try {
+ final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
+ flowFileQueue.put(firstFlowFile);
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("integration", "test");
+ attributes.put("unit-test", "false");
+ attributes.put("integration-test", "true");
+
+ final ContentClaim contentClaim = createContentClaim("hello".getBytes());
+ final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
+ flowFileQueue.put(secondFlowFile);
+
+ flowFileQueue.startLoadBalancing();
+
+ // Wait up to 10 seconds for the server's FlowFile Repository to be updated
+ final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+ while (serverRepoRecords.isEmpty() && System.currentTimeMillis() < endTime) {
+ Thread.sleep(10L);
+ }
+
+ assertFalse("Server's FlowFile Repo was never updated", serverRepoRecords.isEmpty());
+
+ assertEquals(1, serverRepoRecords.size());
+
+ final RepositoryRecord serverRecord = serverRepoRecords.iterator().next();
+ final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
+ assertEquals("test", serverFlowFile.getAttribute("integration"));
+ assertEquals("false", serverFlowFile.getAttribute("unit-test"));
+ assertEquals("true", serverFlowFile.getAttribute("integration-test"));
+
+ final ContentClaim serverContentClaim = serverFlowFile.getContentClaim();
+ final byte[] serverFlowFileContent = serverClaimContents.get(serverContentClaim);
+ assertArrayEquals("hello".getBytes(), serverFlowFileContent);
+
+ while (clientRepoRecords.size() == 0) {
+ Thread.sleep(10L);
+ }
+
+ assertEquals(1, clientRepoRecords.size());
+ final RepositoryRecord clientRecord = clientRepoRecords.iterator().next();
+ assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
+ } finally {
+ flowFileQueue.stopLoadBalancing();
+ clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+ }
+ } finally {
+ server.stop();
+ }
+ }
+
+
+ @Test(timeout = 20_000)
+ public void testContentNotFound() throws IOException, InterruptedException {
+ localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+ nodeIdentifiers.add(localNodeId);
+
+ // Create the server
+ final int timeoutMillis = 30000;
+ final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+ final SSLContext sslContext = null;
+
+ final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
+ server.start();
+
+ try {
+ final int loadBalancePort = server.getPort();
+
+ // Create the Load Balanced FlowFile Queue
+ final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+ nodeIdentifiers.add(remoteNodeId);
+
+ final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+ clientRegistry.start();
+
+ final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+ when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+ when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+ final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+
+ final Thread clientThread = new Thread(clientTask);
+ clientThread.setDaemon(true);
+ clientThread.start();
+
+ final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+ clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+ flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+ try {
+ final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
+ flowFileQueue.put(firstFlowFile);
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("integration", "test");
+ attributes.put("unit-test", "false");
+ attributes.put("integration-test", "true");
+
+ final ContentClaim contentClaim = createContentClaim("hello".getBytes());
+ this.clientClaimContents.remove(contentClaim); // cause ContentNotFoundException
+
+ final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
+ flowFileQueue.put(secondFlowFile);
+
+ flowFileQueue.startLoadBalancing();
+
+ while (clientRepoRecords.size() == 0) {
+ Thread.sleep(10L);
+ }
+
+ assertEquals(1, clientRepoRecords.size());
+ final RepositoryRecord clientRecord = clientRepoRecords.iterator().next();
+ assertEquals(RepositoryRecordType.CONTENTMISSING, clientRecord.getType());
+ } finally {
+ flowFileQueue.stopLoadBalancing();
+ clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+ }
+ } finally {
+ server.stop();
+ }
+ }
+
+
+ @Test(timeout = 20_000)
+ public void testTransferToRemoteNodeAttributeCompression() throws IOException, InterruptedException {
+ localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+ nodeIdentifiers.add(localNodeId);
+ compressionReference.set(LoadBalanceCompression.COMPRESS_ATTRIBUTES_ONLY);
+
+ // Create the server
+ final int timeoutMillis = 30000;
+ final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+ final SSLContext sslContext = null;
+
+ final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
+ server.start();
+
+ try {
+ final int loadBalancePort = server.getPort();
+
+ // Create the Load Balanced FlowFile Queue
+ final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+ nodeIdentifiers.add(remoteNodeId);
+
+ final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+ clientRegistry.start();
+
+ final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+ when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+ when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+ final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+
+ final Thread clientThread = new Thread(clientTask);
+ clientThread.setDaemon(true);
+ clientThread.start();
+
+ final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+ clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+ flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+ flowFileQueue.setLoadBalanceCompression(LoadBalanceCompression.COMPRESS_ATTRIBUTES_ONLY);
+
+ try {
+ final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
+ flowFileQueue.put(firstFlowFile);
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("integration", "test");
+ attributes.put("unit-test", "false");
+ attributes.put("integration-test", "true");
+
+ final ContentClaim contentClaim = createContentClaim("hello".getBytes());
+ final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
+ flowFileQueue.put(secondFlowFile);
+
+ flowFileQueue.startLoadBalancing();
+
+ // Wait up to 10 seconds for the server's FlowFile Repository to be updated
+ final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+ while (serverRepoRecords.isEmpty() && System.currentTimeMillis() < endTime) {
+ Thread.sleep(10L);
+ }
+
+ assertFalse("Server's FlowFile Repo was never updated", serverRepoRecords.isEmpty());
+
+ assertEquals(1, serverRepoRecords.size());
+
+ final RepositoryRecord serverRecord = serverRepoRecords.iterator().next();
+ final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
+ assertEquals("test", serverFlowFile.getAttribute("integration"));
+ assertEquals("false", serverFlowFile.getAttribute("unit-test"));
+ assertEquals("true", serverFlowFile.getAttribute("integration-test"));
+
+ final ContentClaim serverContentClaim = serverFlowFile.getContentClaim();
+ final byte[] serverFlowFileContent = serverClaimContents.get(serverContentClaim);
+ assertArrayEquals("hello".getBytes(), serverFlowFileContent);
+
+ while (clientRepoRecords.size() == 0) {
+ Thread.sleep(10L);
+ }
+
+ assertEquals(1, clientRepoRecords.size());
+ final RepositoryRecord clientRecord = clientRepoRecords.iterator().next();
+ assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
+ } finally {
+ flowFileQueue.stopLoadBalancing();
+ clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+ }
+ } finally {
+ server.stop();
+ }
+ }
+
+
+ @Test(timeout = 20_000)
+ public void testTransferToRemoteNodeContentCompression() throws IOException, InterruptedException {
+ localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+ nodeIdentifiers.add(localNodeId);
+ compressionReference.set(LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT);
+
+ // Create the server
+ final int timeoutMillis = 30000;
+ final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+ final SSLContext sslContext = null;
+
+ final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
+ server.start();
+
+ try {
+ final int loadBalancePort = server.getPort();
+
+ // Create the Load Balanced FlowFile Queue
+ final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+ nodeIdentifiers.add(remoteNodeId);
+
+ final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+ clientRegistry.start();
+
+ final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+ when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+ when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+ final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+
+ final Thread clientThread = new Thread(clientTask);
+ clientThread.setDaemon(true);
+ clientThread.start();
+
+ final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+ clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+ flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+ flowFileQueue.setLoadBalanceCompression(LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT);
+
+ try {
+ final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
+ flowFileQueue.put(firstFlowFile);
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("integration", "test");
+ attributes.put("unit-test", "false");
+ attributes.put("integration-test", "true");
+
+ final ContentClaim contentClaim = createContentClaim("hello".getBytes());
+ final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
+ flowFileQueue.put(secondFlowFile);
+
+ flowFileQueue.startLoadBalancing();
+
+ // Wait up to 10 seconds for the server's FlowFile Repository to be updated
+ final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+ while (serverRepoRecords.isEmpty() && System.currentTimeMillis() < endTime) {
+ Thread.sleep(10L);
+ }
+
+ assertFalse("Server's FlowFile Repo was never updated", serverRepoRecords.isEmpty());
+
+ assertEquals(1, serverRepoRecords.size());
+
+ final RepositoryRecord serverRecord = serverRepoRecords.iterator().next();
+ final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
+ assertEquals("test", serverFlowFile.getAttribute("integration"));
+ assertEquals("false", serverFlowFile.getAttribute("unit-test"));
+ assertEquals("true", serverFlowFile.getAttribute("integration-test"));
+
+ final ContentClaim serverContentClaim = serverFlowFile.getContentClaim();
+ final byte[] serverFlowFileContent = serverClaimContents.get(serverContentClaim);
+ assertArrayEquals("hello".getBytes(), serverFlowFileContent);
+
+ while (clientRepoRecords.size() == 0) {
+ Thread.sleep(10L);
+ }
+
+ assertEquals(1, clientRepoRecords.size());
+ final RepositoryRecord clientRecord = clientRepoRecords.iterator().next();
+ assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
+ } finally {
+ flowFileQueue.stopLoadBalancing();
+ clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+ }
+ } finally {
+ server.stop();
+ }
+ }
+
+ @Test(timeout = 20_000)
+ public void testWithSSLContext() throws IOException, InterruptedException, UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
+ localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+ nodeIdentifiers.add(localNodeId);
+
+ // Create the server
+ final int timeoutMillis = 30000;
+ final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+
+
+ final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
+ server.start();
+
+ try {
+ final int loadBalancePort = server.getPort();
+
+ // Create the Load Balanced FlowFile Queue
+ final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+ nodeIdentifiers.add(remoteNodeId);
+
+ final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+ clientRegistry.start();
+
+ final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+ when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+ when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+ final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+
+ final Thread clientThread = new Thread(clientTask);
+ clientThread.setDaemon(true);
+ clientThread.start();
+
+ final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+ clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+ flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+ try {
+ final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
+ flowFileQueue.put(firstFlowFile);
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("integration", "test");
+ attributes.put("unit-test", "false");
+ attributes.put("integration-test", "true");
+
+ final ContentClaim contentClaim = createContentClaim("hello".getBytes());
+ final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
+ flowFileQueue.put(secondFlowFile);
+
+ flowFileQueue.startLoadBalancing();
+
+ // Wait up to 10 seconds for the server's FlowFile Repository to be updated
+ final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+ while (serverRepoRecords.isEmpty() && System.currentTimeMillis() < endTime) {
+ Thread.sleep(10L);
+ }
+
+ assertFalse("Server's FlowFile Repo was never updated", serverRepoRecords.isEmpty());
+
+ assertEquals(1, serverRepoRecords.size());
+
+ final RepositoryRecord serverRecord = serverRepoRecords.iterator().next();
+ final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
+ assertEquals("test", serverFlowFile.getAttribute("integration"));
+ assertEquals("false", serverFlowFile.getAttribute("unit-test"));
+ assertEquals("true", serverFlowFile.getAttribute("integration-test"));
+
+ final ContentClaim serverContentClaim = serverFlowFile.getContentClaim();
+ final byte[] serverFlowFileContent = serverClaimContents.get(serverContentClaim);
+ assertArrayEquals("hello".getBytes(), serverFlowFileContent);
+
+ while (clientRepoRecords.size() == 0) {
+ Thread.sleep(10L);
+ }
+
+ assertEquals(1, clientRepoRecords.size());
+ final RepositoryRecord clientRecord = clientRepoRecords.iterator().next();
+ assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
+ } finally {
+ flowFileQueue.stopLoadBalancing();
+ clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+ }
+ } finally {
+ server.stop();
+ }
+ }
+
+
+ @Test(timeout = 60_000)
+ public void testReusingClient() throws IOException, InterruptedException, UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
+ localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+ nodeIdentifiers.add(localNodeId);
+
+ // Create the server
+ final int timeoutMillis = 30000;
+ final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+
+ final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
+ server.start();
+
+ try {
+ final int loadBalancePort = server.getPort();
+
+ // Create the Load Balanced FlowFile Queue
+ final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+ nodeIdentifiers.add(remoteNodeId);
+
+ final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+ clientRegistry.start();
+
+ final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+ when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+ when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+ final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+
+ final Thread clientThread = new Thread(clientTask);
+ clientThread.setDaemon(true);
+ clientThread.start();
+
+ final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+ clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+ flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+ try {
+ for (int i = 1; i <= 10; i++) {
+ final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
+ flowFileQueue.put(firstFlowFile);
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("integration", "test");
+ attributes.put("unit-test", "false");
+ attributes.put("integration-test", "true");
+
+ final ContentClaim contentClaim = createContentClaim("hello".getBytes());
+ final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
+ flowFileQueue.put(secondFlowFile);
+
+ flowFileQueue.startLoadBalancing();
+
+ // Wait up to 10 seconds for the server's FlowFile Repository to be updated
+ final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+ while (serverRepoRecords.size() < i && System.currentTimeMillis() < endTime) {
+ Thread.sleep(10L);
+ }
+
+ assertEquals(i, serverRepoRecords.size());
+
+ final RepositoryRecord serverRecord = serverRepoRecords.iterator().next();
+ final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
+ assertEquals("test", serverFlowFile.getAttribute("integration"));
+ assertEquals("false", serverFlowFile.getAttribute("unit-test"));
+ assertEquals("true", serverFlowFile.getAttribute("integration-test"));
+
+ final ContentClaim serverContentClaim = serverFlowFile.getContentClaim();
+ final byte[] serverFlowFileContent = serverClaimContents.get(serverContentClaim);
+ assertArrayEquals("hello".getBytes(), serverFlowFileContent);
+
+ while (clientRepoRecords.size() < i) {
+ Thread.sleep(10L);
+ }
+
+ assertEquals(i, clientRepoRecords.size());
+ final RepositoryRecord clientRecord = clientRepoRecords.iterator().next();
+ assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
+ }
+ } finally {
+ flowFileQueue.stopLoadBalancing();
+ clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+ }
+ } finally {
+ server.stop();
+ }
+ }
+
+
+ @Test(timeout = 20_000)
+ public void testLargePayload() throws IOException, InterruptedException, UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
+ localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+ nodeIdentifiers.add(localNodeId);
+
+ // Create the server
+ final int timeoutMillis = 30000;
+ final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+
+ final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
+ server.start();
+
+ try {
+ final int loadBalancePort = server.getPort();
+
+ // Create the Load Balanced FlowFile Queue
+ final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+ nodeIdentifiers.add(remoteNodeId);
+
+ final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+ clientRegistry.start();
+
+ final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+ when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+ when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+ final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+
+ final Thread clientThread = new Thread(clientTask);
+ clientThread.setDaemon(true);
+ clientThread.start();
+
+ final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+ clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+ flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+ final byte[] payload = new byte[1024 * 1024];
+ Arrays.fill(payload, (byte) 'A');
+
+ try {
+ final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
+ flowFileQueue.put(firstFlowFile);
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("integration", "test");
+ attributes.put("unit-test", "false");
+ attributes.put("integration-test", "true");
+
+ final ContentClaim contentClaim = createContentClaim(payload);
+ final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, payload.length, contentClaim);
+ flowFileQueue.put(secondFlowFile);
+
+ flowFileQueue.startLoadBalancing();
+
+ // Wait up to 10 seconds for the server's FlowFile Repository to be updated
+ final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+ while (serverRepoRecords.isEmpty() && System.currentTimeMillis() < endTime) {
+ Thread.sleep(10L);
+ }
+
+ assertFalse("Server's FlowFile Repo was never updated", serverRepoRecords.isEmpty());
+
+ assertEquals(1, serverRepoRecords.size());
+
+ final RepositoryRecord serverRecord = serverRepoRecords.iterator().next();
+ final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
+ assertEquals("test", serverFlowFile.getAttribute("integration"));
+ assertEquals("false", serverFlowFile.getAttribute("unit-test"));
+ assertEquals("true", serverFlowFile.getAttribute("integration-test"));
+
+ final ContentClaim serverContentClaim = serverFlowFile.getContentClaim();
+ final byte[] serverFlowFileContent = serverClaimContents.get(serverContentClaim);
+ assertArrayEquals(payload, serverFlowFileContent);
+
+ while (clientRepoRecords.size() == 0) {
+ Thread.sleep(10L);
+ }
+
+ assertEquals(1, clientRepoRecords.size());
+ final RepositoryRecord clientRecord = clientRepoRecords.iterator().next();
+ assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
+ } finally {
+ flowFileQueue.stopLoadBalancing();
+ clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+ }
+ } finally {
+ server.stop();
+ }
+ }
+
+
+ @Test(timeout = 60_000)
+ public void testServerClosesUnexpectedly() throws IOException, InterruptedException {
+
+ doAnswer(new Answer<OutputStream>() {
+ int iterations = 0;
+
+ @Override
+ public OutputStream answer(final InvocationOnMock invocation) {
+ if (iterations++ < 5) {
+ return new OutputStream() {
+ @Override
+ public void write(final int b) throws IOException {
+ throw new IOException("Intentional unit test failure");
+ }
+ };
+ }
+
+ final ContentClaim contentClaim = invocation.getArgumentAt(0, ContentClaim.class);
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream() {
+ @Override
+ public void close() throws IOException {
+ super.close();
+ serverClaimContents.put(contentClaim, toByteArray());
+ }
+ };
+
+ return baos;
+ }
+ }).when(serverContentRepo).write(any(ContentClaim.class));
+
+ // Create the server
+ final int timeoutMillis = 30000;
+ final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+ final SSLContext sslContext = null;
+
+ final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
+ server.start();
+
+ try {
+ final int loadBalancePort = server.getPort();
+
+ // Create the Load Balanced FlowFile Queue
+ final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+ nodeIdentifiers.add(remoteNodeId);
+
+ final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+ clientRegistry.start();
+
+ final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+ when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+ when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+ final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+
+ final Thread clientThread = new Thread(clientTask);
+ clientThread.setDaemon(true);
+ clientThread.start();
+
+ final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+ clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+ flowFileQueue.setFlowFilePartitioner(new FlowFilePartitioner() {
+ @Override
+ public QueuePartition getPartition(final FlowFileRecord flowFile, final QueuePartition[] partitions, final QueuePartition localPartition) {
+ for (final QueuePartition partition : partitions) {
+ if (partition != localPartition) {
+ return partition;
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public boolean isRebalanceOnClusterResize() {
+ return true;
+ }
+ @Override
+ public boolean isRebalanceOnFailure() {
+ return true;
+ }
+ });
+
+ try {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("integration", "test");
+ attributes.put("unit-test", "false");
+ attributes.put("integration-test", "true");
+
+ final ContentClaim contentClaim = createContentClaim("hello".getBytes());
+ final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
+ flowFileQueue.put(secondFlowFile);
+
+ flowFileQueue.startLoadBalancing();
+
+ // Wait up to 10 seconds for the server's FlowFile Repository to be updated
+ final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
+ while (serverRepoRecords.isEmpty() && System.currentTimeMillis() < endTime) {
+ Thread.sleep(10L);
+ }
+
+ assertFalse("Server's FlowFile Repo was never updated", serverRepoRecords.isEmpty());
+
+ assertEquals(1, serverRepoRecords.size());
+
+ final RepositoryRecord serverRecord = serverRepoRecords.iterator().next();
+ final FlowFileRecord serverFlowFile = serverRecord.getCurrent();
+ assertEquals("test", serverFlowFile.getAttribute("integration"));
+ assertEquals("false", serverFlowFile.getAttribute("unit-test"));
+ assertEquals("true", serverFlowFile.getAttribute("integration-test"));
+
+ final ContentClaim serverContentClaim = serverFlowFile.getContentClaim();
+ final byte[] serverFlowFileContent = serverClaimContents.get(serverContentClaim);
+ assertArrayEquals("hello".getBytes(), serverFlowFileContent);
+
+ while (clientRepoRecords.size() == 0) {
+ Thread.sleep(10L);
+ }
+
+ assertEquals(1, clientRepoRecords.size());
+ final RepositoryRecord clientRecord = clientRepoRecords.iterator().next();
+ assertEquals(RepositoryRecordType.DELETE, clientRecord.getType());
+ } finally {
+ flowFileQueue.stopLoadBalancing();
+ clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+ }
+ } finally {
+ server.stop();
+ }
+ }
+
+
+ @Test(timeout = 20_000)
+ public void testNotAuthorized() throws IOException, InterruptedException {
+ localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+ nodeIdentifiers.add(localNodeId);
+
+ // Create the server
+ final int timeoutMillis = 30000;
+ final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, NEVER_AUTHORIZED);
+
+ final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
+ server.start();
+
+ try {
+ final int loadBalancePort = server.getPort();
+
+ // Create the Load Balanced FlowFile Queue
+ final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+ nodeIdentifiers.add(remoteNodeId);
+
+ final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+ clientRegistry.start();
+
+ final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+ when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+ when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+ final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+
+ final Thread clientThread = new Thread(clientTask);
+ clientThread.setDaemon(true);
+ clientThread.start();
+
+ final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+ clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+ flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+ try {
+ final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
+ flowFileQueue.put(firstFlowFile);
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("integration", "test");
+ attributes.put("unit-test", "false");
+ attributes.put("integration-test", "true");
+
+ final ContentClaim contentClaim = createContentClaim("hello".getBytes());
+ final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
+ flowFileQueue.put(secondFlowFile);
+
+ flowFileQueue.startLoadBalancing();
+
+ Thread.sleep(5000L);
+
+ assertTrue("Server's FlowFile Repo was updated", serverRepoRecords.isEmpty());
+ assertTrue(clientRepoRecords.isEmpty());
+
+ assertEquals(2, flowFileQueue.size().getObjectCount());
+ } finally {
+ flowFileQueue.stopLoadBalancing();
+ clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+ }
+ } finally {
+ server.stop();
+ }
+ }
+
+
+ @Test(timeout = 35_000)
+ public void testDestinationNodeQueueFull() throws IOException, InterruptedException {
+ localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
+ nodeIdentifiers.add(localNodeId);
+
+ when(serverQueue.isFull()).thenReturn(true);
+
+ // Create the server
+ final int timeoutMillis = 30000;
+ final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(serverFlowFileRepo, serverContentRepo, serverProvRepo, flowController, ALWAYS_AUTHORIZED);
+
+ final ConnectionLoadBalanceServer server = new ConnectionLoadBalanceServer("localhost", 0, sslContext, 2, loadBalanceProtocol, eventReporter, timeoutMillis);
+ server.start();
+
+ try {
+ final int loadBalancePort = server.getPort();
+
+ // Create the Load Balanced FlowFile Queue
+ final NodeIdentifier remoteNodeId = new NodeIdentifier("unit-test", "localhost", 8090, "localhost", 8090, "localhost", loadBalancePort, null, null, null, false, null);
+ nodeIdentifiers.add(remoteNodeId);
+
+ final NioAsyncLoadBalanceClientRegistry clientRegistry = new NioAsyncLoadBalanceClientRegistry(createClientFactory(sslContext), 1);
+ clientRegistry.start();
+
+ final NodeConnectionStatus connectionStatus = mock(NodeConnectionStatus.class);
+ when(connectionStatus.getState()).thenReturn(NodeConnectionState.CONNECTED);
+ when(clusterCoordinator.getConnectionStatus(any(NodeIdentifier.class))).thenReturn(connectionStatus);
+ final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
+
+ final Thread clientThread = new Thread(clientTask);
+ clientThread.setDaemon(true);
+ clientThread.start();
+
+ final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
+ clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
+ flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+ try {
+ final MockFlowFileRecord firstFlowFile = new MockFlowFileRecord(0L);
+ flowFileQueue.put(firstFlowFile);
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("integration", "test");
+ attributes.put("unit-test", "false");
+ attributes.put("integration-test", "true");
+
+ final ContentClaim contentClaim = createContentClaim("hello".getBytes());
+ final MockFlowFileRecord secondFlowFile = new MockFlowFileRecord(attributes, 5L, contentClaim);
+ flowFileQueue.put(secondFlowFile);
+
+ flowFileQueue.startLoadBalancing();
+
+ Thread.sleep(5000L);
+
+ assertTrue("Server's FlowFile Repo was updated", serverRepoRecords.isEmpty());
+ assertTrue(clientRepoRecords.isEmpty());
+
+ assertEquals(2, flowFileQueue.size().getObjectCount());
+
+ // Enable data to be transferred
+ when(serverQueue.isFull()).thenReturn(false);
+
+ while (clientRepoRecords.size() != 1) {
+ Thread.sleep(10L);
+ }
+
+ assertEquals(1, serverRepoRecords.size());
+ } finally {
+ flowFileQueue.stopLoadBalancing();
+ clientRegistry.getAllClients().forEach(AsyncLoadBalanceClient::stop);
+ }
+ } finally {
+ server.stop();
+ }
+ }
+
+ private FlowFileRepository createFlowFileRepository(final List<RepositoryRecord> repoRecords) throws IOException {
+ final FlowFileRepository flowFileRepo = mock(FlowFileRepository.class);
+ doAnswer(invocation -> {
+ final Collection records = invocation.getArgumentAt(0, Collection.class);
+ repoRecords.addAll(records);
+ return null;
+ }).when(flowFileRepo).updateRepository(anyCollection());
+
+ return flowFileRepo;
+ }
+
+
+ private ContentRepository createContentRepository(final ConcurrentMap<ContentClaim, byte[]> claimContents) throws IOException {
+ final ContentRepository contentRepo = mock(ContentRepository.class);
+
+ Mockito.doAnswer(new Answer<ContentClaim>() {
+ @Override
+ public ContentClaim answer(final InvocationOnMock invocation) {
+ return createContentClaim(null);
+ }
+ }).when(contentRepo).create(Mockito.anyBoolean());
+
+
+ Mockito.doAnswer(new Answer<OutputStream>() {
+ @Override
+ public OutputStream answer(final InvocationOnMock invocation) {
+ final ContentClaim contentClaim = invocation.getArgumentAt(0, ContentClaim.class);
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream() {
+ @Override
+ public void close() throws IOException {
+ super.close();
+ claimContents.put(contentClaim, toByteArray());
+ }
+ };
+
+ return baos;
+ }
+ }).when(contentRepo).write(any(ContentClaim.class));
+
+
+ Mockito.doAnswer(new Answer<InputStream>() {
+ @Override
+ public InputStream answer(final InvocationOnMock invocation) {
+ final ContentClaim contentClaim = invocation.getArgumentAt(0, ContentClaim.class);
+ if (contentClaim == null) {
+ return new ByteArrayInputStream(new byte[0]);
+ }
+
+ final byte[] bytes = claimContents.get(contentClaim);
+ if (bytes == null) {
+ throw new ContentNotFoundException(contentClaim);
+ }
+
+ return new ByteArrayInputStream(bytes);
+ }
+ }).when(contentRepo).read(any(ContentClaim.class));
+
+ return contentRepo;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/MockTransferFailureDestination.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/MockTransferFailureDestination.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/MockTransferFailureDestination.java
new file mode 100644
index 0000000..dc5c1db
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/MockTransferFailureDestination.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered;
+
+import org.apache.nifi.controller.queue.FlowFileQueueContents;
+import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+
+public class MockTransferFailureDestination implements TransferFailureDestination {
+ private List<FlowFileRecord> flowFilesTransferred = new ArrayList<>();
+ private List<String> swapFilesTransferred = new ArrayList<>();
+ private final boolean rebalanceOnFailure;
+
+ public MockTransferFailureDestination(final boolean rebalanceOnFailure) {
+ this.rebalanceOnFailure = rebalanceOnFailure;
+ }
+
+ @Override
+ public void putAll(final Collection<FlowFileRecord> flowFiles, final FlowFilePartitioner partitionerUsed) {
+ flowFilesTransferred.addAll(flowFiles);
+ }
+
+ public List<FlowFileRecord> getFlowFilesTransferred() {
+ return flowFilesTransferred;
+ }
+
+ @Override
+ public void putAll(final Function<String, FlowFileQueueContents> queueContents, final FlowFilePartitioner partitionerUsed) {
+ final FlowFileQueueContents contents = queueContents.apply("unit-test");
+ flowFilesTransferred.addAll(contents.getActiveFlowFiles());
+ swapFilesTransferred.addAll(contents.getSwapLocations());
+ }
+
+ @Override
+ public boolean isRebalanceOnFailure(final FlowFilePartitioner partitionerUsed) {
+ return rebalanceOnFailure;
+ }
+
+ public List<String> getSwapFilesTransferred() {
+ return swapFilesTransferred;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestContentRepositoryFlowFileAccess.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestContentRepositoryFlowFileAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestContentRepositoryFlowFileAccess.java
new file mode 100644
index 0000000..4d3609d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestContentRepositoryFlowFileAccess.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered;
+
+import org.apache.nifi.controller.repository.ContentNotFoundException;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestContentRepositoryFlowFileAccess {
+
+ @Test
+ public void testInputStreamFromContentRepo() throws IOException {
+ final ContentRepository contentRepo = mock(ContentRepository.class);
+
+ final ResourceClaimManager claimManager = new StandardResourceClaimManager();
+ final ResourceClaim resourceClaim = new StandardResourceClaim(claimManager, "container", "section", "id", false);
+ final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 5L);
+
+ final FlowFileRecord flowFile = mock(FlowFileRecord.class);
+ when(flowFile.getContentClaim()).thenReturn(contentClaim);
+ when(flowFile.getSize()).thenReturn(5L);
+
+ final InputStream inputStream = new ByteArrayInputStream("hello".getBytes());
+ when(contentRepo.read(contentClaim)).thenReturn(inputStream);
+
+ final ContentRepositoryFlowFileAccess flowAccess = new ContentRepositoryFlowFileAccess(contentRepo);
+
+ final InputStream repoStream = flowAccess.read(flowFile);
+ verify(contentRepo, times(1)).read(contentClaim);
+
+ final byte[] buffer = new byte[5];
+ StreamUtils.fillBuffer(repoStream, buffer);
+ assertEquals(-1, repoStream.read());
+ assertArrayEquals("hello".getBytes(), buffer);
+ }
+
+
+ @Test
+ public void testContentNotFoundPropagated() throws IOException {
+ final ContentRepository contentRepo = mock(ContentRepository.class);
+
+ final ResourceClaimManager claimManager = new StandardResourceClaimManager();
+ final ResourceClaim resourceClaim = new StandardResourceClaim(claimManager, "container", "section", "id", false);
+ final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 5L);
+
+ final FlowFileRecord flowFile = mock(FlowFileRecord.class);
+ when(flowFile.getContentClaim()).thenReturn(contentClaim);
+
+ final ContentNotFoundException cnfe = new ContentNotFoundException(contentClaim);
+ when(contentRepo.read(contentClaim)).thenThrow(cnfe);
+
+ final ContentRepositoryFlowFileAccess flowAccess = new ContentRepositoryFlowFileAccess(contentRepo);
+
+ try {
+ flowAccess.read(flowFile);
+ Assert.fail("Expected ContentNotFoundException but it did not happen");
+ } catch (final ContentNotFoundException thrown) {
+ // expected
+ thrown.getFlowFile().orElseThrow(() -> new AssertionError("Expected FlowFile to be provided"));
+ }
+ }
+
+ @Test
+ public void testEOFExceptionIfNotEnoughData() throws IOException {
+ final ContentRepository contentRepo = mock(ContentRepository.class);
+
+ final ResourceClaimManager claimManager = new StandardResourceClaimManager();
+ final ResourceClaim resourceClaim = new StandardResourceClaim(claimManager, "container", "section", "id", false);
+ final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 5L);
+
+ final FlowFileRecord flowFile = mock(FlowFileRecord.class);
+ when(flowFile.getContentClaim()).thenReturn(contentClaim);
+ when(flowFile.getSize()).thenReturn(100L);
+
+ final InputStream inputStream = new ByteArrayInputStream("hello".getBytes());
+ when(contentRepo.read(contentClaim)).thenReturn(inputStream);
+
+ final ContentRepositoryFlowFileAccess flowAccess = new ContentRepositoryFlowFileAccess(contentRepo);
+
+ final InputStream repoStream = flowAccess.read(flowFile);
+ verify(contentRepo, times(1)).read(contentClaim);
+
+ final byte[] buffer = new byte[5];
+ StreamUtils.fillBuffer(repoStream, buffer);
+
+ try {
+ repoStream.read();
+ Assert.fail("Expected EOFException because not enough bytes were in the InputStream for the FlowFile");
+ } catch (final EOFException eof) {
+ // expected
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestNaiveLimitThreshold.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestNaiveLimitThreshold.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestNaiveLimitThreshold.java
new file mode 100644
index 0000000..e4f0c74
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestNaiveLimitThreshold.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class TestNaiveLimitThreshold {
+
+ @Test
+ public void testCount() {
+ final SimpleLimitThreshold threshold = new SimpleLimitThreshold(10, 100L);
+ for (int i = 0; i < 9; i++) {
+ threshold.adjust(1, 1L);
+ assertFalse(threshold.isThresholdMet());
+ }
+
+ threshold.adjust(1, 1L);
+ assertTrue(threshold.isThresholdMet());
+ }
+
+ @Test
+ public void testSize() {
+ final SimpleLimitThreshold threshold = new SimpleLimitThreshold(10, 100L);
+ for (int i = 0; i < 9; i++) {
+ threshold.adjust(0, 10L);
+ assertFalse(threshold.isThresholdMet());
+ }
+
+ threshold.adjust(1, 9L);
+ assertFalse(threshold.isThresholdMet());
+
+ threshold.adjust(-1, 1L);
+ assertTrue(threshold.isThresholdMet());
+
+ threshold.adjust(0, -1L);
+ assertFalse(threshold.isThresholdMet());
+
+ threshold.adjust(-10, 10000L);
+ assertTrue(threshold.isThresholdMet());
+ }
+
+}