You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/10/06 20:53:28 UTC
nifi git commit: NIFI-2836: - Ensure that we wait until a request is
completed before unlocking the lock for request replication - Ensure that
failures do not trigger request completion logic unless the failure is the
last node to report its status - This closes #1109
Repository: nifi
Updated Branches:
refs/heads/master bb6c5d9d4 -> 09568d092
NIFI-2836:
- Ensure that we wait until a request is completed before unlocking the lock for request replication
- Ensure that failures do not trigger request completion logic unless the failure is the last node to report its status
- This closes #1109
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/09568d09
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/09568d09
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/09568d09
Branch: refs/heads/master
Commit: 09568d092b5329e4732f2e05c10fad181c344b8d
Parents: bb6c5d9
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Oct 5 16:19:55 2016 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Thu Oct 6 16:41:05 2016 -0400
----------------------------------------------------------------------
.../StandardAsyncClusterResponse.java | 14 +-
.../ThreadPoolRequestReplicator.java | 304 +++++++++++--------
.../TestThreadPoolRequestReplicator.java | 198 ++++++++++--
3 files changed, 353 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/09568d09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
index 926151e..1d4ea69 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
@@ -223,12 +223,18 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
}
}
- synchronized void setFailure(final RuntimeException failure) {
+ synchronized void setFailure(final RuntimeException failure, final NodeIdentifier nodeId) {
this.failure = failure;
- notifyAll();
- if (completionCallback != null) {
- completionCallback.onCompletion(this);
+ final int completedCount = requestsCompleted.incrementAndGet();
+ logger.debug("Notified of failure for {} from {}", id, nodeId);
+
+ if (completedCount == responseMap.size()) {
+
+ notifyAll();
+ if (completionCallback != null) {
+ completionCallback.onCompletion(this);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/09568d09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index 258588d..ff79000 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -249,12 +249,27 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
lock.lock();
try {
logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri);
- return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true);
+
+ // Unlocking of the lock is performed within the replicate method, as we need to ensure that it is unlocked only after
+ // the entire request has completed.
+ final Object monitor = new Object();
+ synchronized (monitor) {
+ final AsyncClusterResponse response = replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true, monitor);
+
+ try {
+ monitor.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ return response;
+ }
} finally {
lock.unlock();
+ logger.debug("Unlocked {} after replication completed for {} {}", lock, method, uri);
}
} else {
- return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true);
+ return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true, null);
}
}
@@ -269,7 +284,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
updatedHeaders.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, proxiedEntitiesChain);
}
- return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false, false);
+ return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false, false, null);
}
/**
@@ -283,107 +298,133 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
* @param performVerification whether or not to verify that all nodes in the cluster are connected and that all nodes can perform request. Ignored if request is not mutable.
* @param response the response to update with the results
* @param executionPhase <code>true</code> if this is the execution phase, <code>false</code> otherwise
+ * @param monitor a monitor that will be notified when the request completes (successfully or otherwise)
* @return an AsyncClusterResponse that can be used to obtain the result
*/
- private AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean performVerification,
- StandardAsyncClusterResponse response, boolean executionPhase, boolean merge) {
-
- // state validation
- Objects.requireNonNull(nodeIds);
- Objects.requireNonNull(method);
- Objects.requireNonNull(uri);
- Objects.requireNonNull(entity);
- Objects.requireNonNull(headers);
+ AsyncClusterResponse replicate(final Set<NodeIdentifier> nodeIds, final String method, final URI uri, final Object entity, final Map<String, String> headers,
+ final boolean performVerification, StandardAsyncClusterResponse response, final boolean executionPhase, final boolean merge, final Object monitor) {
+ try {
+ // state validation
+ Objects.requireNonNull(nodeIds);
+ Objects.requireNonNull(method);
+ Objects.requireNonNull(uri);
+ Objects.requireNonNull(entity);
+ Objects.requireNonNull(headers);
+
+ if (nodeIds.isEmpty()) {
+ throw new IllegalArgumentException("Cannot replicate request to 0 nodes");
+ }
- if (nodeIds.isEmpty()) {
- throw new IllegalArgumentException("Cannot replicate request to 0 nodes");
- }
+ // verify all of the nodes exist and are in the proper state
+ for (final NodeIdentifier nodeId : nodeIds) {
+ final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
+ if (status == null) {
+ throw new UnknownNodeException("Node " + nodeId + " does not exist in this cluster");
+ }
- // verify all of the nodes exist and are in the proper state
- for (final NodeIdentifier nodeId : nodeIds) {
- final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
- if (status == null) {
- throw new UnknownNodeException("Node " + nodeId + " does not exist in this cluster");
+ if (status.getState() != NodeConnectionState.CONNECTED) {
+ throw new IllegalClusterStateException("Cannot replicate request to Node " + nodeId + " because the node is not connected");
+ }
}
- if (status.getState() != NodeConnectionState.CONNECTED) {
- throw new IllegalClusterStateException("Cannot replicate request to Node " + nodeId + " because the node is not connected");
- }
- }
+ logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response);
- logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response);
+ // Update headers to indicate the current revision so that we can
+ // prevent multiple users changing the flow at the same time
+ final Map<String, String> updatedHeaders = new HashMap<>(headers);
+ final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString());
- // Update headers to indicate the current revision so that we can
- // prevent multiple users changing the flow at the same time
- final Map<String, String> updatedHeaders = new HashMap<>(headers);
- final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString());
+ if (performVerification) {
+ verifyClusterState(method, uri.getPath());
+ }
- if (performVerification) {
- verifyClusterState(method, uri.getPath());
- }
+ int numRequests = responseMap.size();
+ if (numRequests >= MAX_CONCURRENT_REQUESTS) {
+ numRequests = purgeExpiredRequests();
+ }
- int numRequests = responseMap.size();
- if (numRequests >= MAX_CONCURRENT_REQUESTS) {
- numRequests = purgeExpiredRequests();
- }
+ if (numRequests >= MAX_CONCURRENT_REQUESTS) {
+ final Map<String, Long> countsByUri = responseMap.values().stream().collect(
+ Collectors.groupingBy(
+ StandardAsyncClusterResponse::getURIPath,
+ Collectors.counting()));
- if (numRequests >= MAX_CONCURRENT_REQUESTS) {
- final Map<String, Long> countsByUri = responseMap.values().stream().collect(
- Collectors.groupingBy(
- StandardAsyncClusterResponse::getURIPath,
- Collectors.counting()));
+ logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", method, uri.getPath(), numRequests, countsByUri);
+ throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests");
+ }
- logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", method, uri.getPath(), numRequests, countsByUri);
- throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests");
- }
+ // create the request objects and replicate to all nodes.
+ // When the request has completed, we need to ensure that we notify the monitor, if there is one.
+ final CompletionCallback completionCallback = clusterResponse -> {
+ try {
+ onCompletedResponse(requestId);
+ } finally {
+ if (monitor != null) {
+ synchronized (monitor) {
+ monitor.notify();
+ }
+
+ logger.debug("Notified monitor {} because request {} {} has completed", monitor, method, uri);
+ }
+ }
+ };
- // create the request objects and replicate to all nodes
- final CompletionCallback completionCallback = clusterResponse -> onCompletedResponse(requestId);
- final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId);
+ final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId);
- // create a response object if one was not already passed to us
- if (response == null) {
- response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds,
+ // create a response object if one was not already passed to us
+ if (response == null) {
+ response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds,
responseMapper, completionCallback, responseConsumedCallback, merge);
- responseMap.put(requestId, response);
- }
+ responseMap.put(requestId, response);
+ }
- logger.debug("For Request ID {}, response object is {}", requestId, response);
-
- // if mutable request, we have to do a two-phase commit where we ask each node to verify
- // that the request can take place and then, if all nodes agree that it can, we can actually
- // issue the request. This is all handled by calling performVerification, which will replicate
- // the 'vote' request to all nodes and then if successful will call back into this method to
- // replicate the actual request.
- final boolean mutableRequest = isMutableRequest(method, uri.getPath());
- if (mutableRequest && performVerification) {
- logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId);
- performVerification(nodeIds, method, uri, entity, updatedHeaders, response, merge);
- return response;
- }
+ logger.debug("For Request ID {}, response object is {}", requestId, response);
+
+ // if mutable request, we have to do a two-phase commit where we ask each node to verify
+ // that the request can take place and then, if all nodes agree that it can, we can actually
+ // issue the request. This is all handled by calling performVerification, which will replicate
+ // the 'vote' request to all nodes and then if successful will call back into this method to
+ // replicate the actual request.
+ final boolean mutableRequest = isMutableRequest(method, uri.getPath());
+ if (mutableRequest && performVerification) {
+ logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId);
+ performVerification(nodeIds, method, uri, entity, updatedHeaders, response, merge, monitor);
+ return response;
+ }
- // Callback function for generating a NodeHttpRequestCallable that can be used to perform the work
- final StandardAsyncClusterResponse finalResponse = response;
- NodeRequestCompletionCallback nodeCompletionCallback = nodeResponse -> {
- logger.debug("Received response from {} for {} {}", nodeResponse.getNodeId(), method, uri.getPath());
- finalResponse.add(nodeResponse);
- };
+ // Callback function for generating a NodeHttpRequestCallable that can be used to perform the work
+ final StandardAsyncClusterResponse finalResponse = response;
+ NodeRequestCompletionCallback nodeCompletionCallback = nodeResponse -> {
+ logger.debug("Received response from {} for {} {}", nodeResponse.getNodeId(), method, uri.getPath());
+ finalResponse.add(nodeResponse);
+ };
- // instruct the node to actually perform the underlying action
- if (mutableRequest && executionPhase) {
- updatedHeaders.put(REQUEST_EXECUTION_HTTP_HEADER, "true");
- }
+ // instruct the node to actually perform the underlying action
+ if (mutableRequest && executionPhase) {
+ updatedHeaders.put(REQUEST_EXECUTION_HTTP_HEADER, "true");
+ }
- // replicate the request to all nodes
- final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
- nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback);
- replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders);
+ // replicate the request to all nodes
+ final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
+ nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback);
+ replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders);
- return response;
+ return response;
+ } catch (final Throwable t) {
+ if (monitor != null) {
+ synchronized (monitor) {
+ monitor.notify();
+ }
+ logger.debug("Notified monitor {} because request {} {} has failed with Throwable {}", monitor, method, uri, t);
+ }
+
+ throw t;
+ }
}
- private void performVerification(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse, boolean merge) {
+ private void performVerification(final Set<NodeIdentifier> nodeIds, final String method, final URI uri, final Object entity, final Map<String, String> headers,
+ final StandardAsyncClusterResponse clusterResponse, final boolean merge, final Object monitor) {
logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath());
final Map<String, String> validationHeaders = new HashMap<>(headers);
@@ -418,64 +459,73 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
// to all nodes and we are finished.
if (dissentingCount == 0) {
logger.debug("Received verification from all {} nodes that mutable request {} {} can be made", numNodes, method, uri.getPath());
- replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true, merge);
+ replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true, merge, monitor);
return;
}
- final Map<String, String> cancelLockHeaders = new HashMap<>(headers);
- cancelLockHeaders.put(REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER, "true");
- final Thread cancelLockThread = new Thread(new Runnable() {
- @Override
- public void run() {
- logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", dissentingCount, method, uri.getPath());
+ try {
+ final Map<String, String> cancelLockHeaders = new HashMap<>(headers);
+ cancelLockHeaders.put(REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER, "true");
+ final Thread cancelLockThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", dissentingCount, method, uri.getPath());
- final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
- nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null);
+ final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
+ nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null);
- replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders);
- }
- });
- cancelLockThread.setName("Cancel Flow Locks");
- cancelLockThread.start();
-
- // Add a NodeResponse for each node to the Cluster Response
- // Check that all nodes responded successfully.
- for (final NodeResponse response : nodeResponses) {
- if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) {
- final ClientResponse clientResponse = response.getClientResponse();
-
- final String message;
- if (clientResponse == null) {
- message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus();
-
- logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur",
- response.getStatus(), response.getNodeId(), method, uri.getPath());
- } else {
- final String nodeExplanation = clientResponse.getEntity(String.class);
- message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: " + nodeExplanation;
-
- logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur. Node explanation: {}",
- response.getStatus(), response.getNodeId(), method, uri.getPath(), nodeExplanation);
+ replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders);
}
-
- // if a node reports forbidden, use that as the response failure
- final RuntimeException failure;
- if (response.getStatus() == Status.FORBIDDEN.getStatusCode()) {
- if (response.hasThrowable()) {
- failure = new AccessDeniedException(message, response.getThrowable());
+ });
+ cancelLockThread.setName("Cancel Flow Locks");
+ cancelLockThread.start();
+
+ // Add a NodeResponse for each node to the Cluster Response
+ // Check that all nodes responded successfully.
+ for (final NodeResponse response : nodeResponses) {
+ if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) {
+ final ClientResponse clientResponse = response.getClientResponse();
+
+ final String message;
+ if (clientResponse == null) {
+ message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus();
+
+ logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur",
+ response.getStatus(), response.getNodeId(), method, uri.getPath());
} else {
- failure = new AccessDeniedException(message);
+ final String nodeExplanation = clientResponse.getEntity(String.class);
+ message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: " + nodeExplanation;
+
+ logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. "
+ + "The action will not occur. Node explanation: {}", response.getStatus(), response.getNodeId(), method, uri.getPath(), nodeExplanation);
}
- } else {
- if (response.hasThrowable()) {
- failure = new IllegalClusterStateException(message, response.getThrowable());
+
+ // if a node reports forbidden, use that as the response failure
+ final RuntimeException failure;
+ if (response.getStatus() == Status.FORBIDDEN.getStatusCode()) {
+ if (response.hasThrowable()) {
+ failure = new AccessDeniedException(message, response.getThrowable());
+ } else {
+ failure = new AccessDeniedException(message);
+ }
} else {
- failure = new IllegalClusterStateException(message);
+ if (response.hasThrowable()) {
+ failure = new IllegalClusterStateException(message, response.getThrowable());
+ } else {
+ failure = new IllegalClusterStateException(message);
+ }
}
+
+ clusterResponse.setFailure(failure, response.getNodeId());
+ }
+ }
+ } finally {
+ if (monitor != null) {
+ synchronized (monitor) {
+ monitor.notify();
}
- clusterResponse.setFailure(failure);
- break;
+ logger.debug("Notified monitor {} because request {} {} has failed due to at least 1 dissenting node", monitor, method, uri);
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/09568d09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
index edbc05b..02578a5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@ -16,13 +16,28 @@
*/
package org.apache.nifi.cluster.coordination.http.replication;
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientHandlerException;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.ClientResponse.Status;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.core.header.InBoundHeaders;
-import com.sun.jersey.core.header.OutBoundHeaders;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.ws.rs.HttpMethod;
+
import org.apache.commons.collections4.map.MultiValueMap;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
@@ -43,25 +58,13 @@ import org.mockito.internal.util.reflection.Whitebox;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import javax.ws.rs.HttpMethod;
-import java.io.ByteArrayInputStream;
-import java.net.SocketTimeoutException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.header.InBoundHeaders;
+import com.sun.jersey.core.header.OutBoundHeaders;
public class TestThreadPoolRequestReplicator {
@@ -115,10 +118,6 @@ public class TestThreadPoolRequestReplicator {
// We should get back the same response object
assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier()));
- assertFalse(response.isComplete());
-
- final NodeResponse nodeResponse = response.getNodeResponse(nodeId);
- assertNull(nodeResponse);
final NodeResponse completedNodeResponse = response.awaitMergedResponse(2, TimeUnit.SECONDS);
assertNotNull(completedNodeResponse);
@@ -321,14 +320,149 @@ public class TestThreadPoolRequestReplicator {
}
}
+ @Test(timeout = 5000)
+ public void testMonitorNotifiedOnException() {
+ withReplicator(replicator -> {
+ final Object monitor = new Object();
+
+ final CountDownLatch preNotifyLatch = new CountDownLatch(1);
+ final CountDownLatch postNotifyLatch = new CountDownLatch(1);
+
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ synchronized (monitor) {
+ while (true) {
+ // If monitor is not notified, this will block indefinitely, and the test will timeout
+ try {
+ preNotifyLatch.countDown();
+ monitor.wait();
+ break;
+ } catch (InterruptedException e) {
+ continue;
+ }
+ }
+
+ postNotifyLatch.countDown();
+ }
+ }
+ }).start();
+
+ // wait for the background thread to notify that it is synchronized on monitor.
+ preNotifyLatch.await();
+
+ try {
+ // Pass in Collections.emptySet() for the node ID's so that an Exception is thrown
+ replicator.replicate(Collections.emptySet(), "GET", new URI("localhost:8080/nifi"), Collections.emptyMap(),
+ Collections.emptyMap(), true, null, true, true, monitor);
+ Assert.fail("replicate did not throw IllegalArgumentException");
+ } catch (final IllegalArgumentException iae) {
+ // expected
+ }
+
+ // wait for monitor to be notified.
+ postNotifyLatch.await();
+ });
+ }
+
+ @Test(timeout = 5000)
+ public void testMonitorNotifiedOnSuccessfulCompletion() {
+ withReplicator(replicator -> {
+ final Object monitor = new Object();
+
+ final CountDownLatch preNotifyLatch = new CountDownLatch(1);
+ final CountDownLatch postNotifyLatch = new CountDownLatch(1);
+
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ synchronized (monitor) {
+ while (true) {
+ // If monitor is not notified, this will block indefinitely, and the test will timeout
+ try {
+ preNotifyLatch.countDown();
+ monitor.wait();
+ break;
+ } catch (InterruptedException e) {
+ continue;
+ }
+ }
+
+ postNotifyLatch.countDown();
+ }
+ }
+ }).start();
+
+ // wait for the background thread to notify that it is synchronized on monitor.
+ preNotifyLatch.await();
+
+ final Set<NodeIdentifier> nodeIds = new HashSet<>();
+ final NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false);
+ nodeIds.add(nodeId);
+ final URI uri = new URI("http://localhost:8080/processors/1");
+ final Entity entity = new ProcessorEntity();
+
+ replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, null, true, true, monitor);
+
+ // wait for monitor to be notified.
+ postNotifyLatch.await();
+ });
+ }
+
+
+ @Test(timeout = 5000)
+ public void testMonitorNotifiedOnFailureResponse() {
+ withReplicator(replicator -> {
+ final Object monitor = new Object();
+
+ final CountDownLatch preNotifyLatch = new CountDownLatch(1);
+ final CountDownLatch postNotifyLatch = new CountDownLatch(1);
+
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ synchronized (monitor) {
+ while (true) {
+ // If monitor is not notified, this will block indefinitely, and the test will timeout
+ try {
+ preNotifyLatch.countDown();
+ monitor.wait();
+ break;
+ } catch (InterruptedException e) {
+ continue;
+ }
+ }
+
+ postNotifyLatch.countDown();
+ }
+ }
+ }).start();
+
+ // wait for the background thread to notify that it is synchronized on monitor.
+ preNotifyLatch.await();
+
+ final Set<NodeIdentifier> nodeIds = new HashSet<>();
+ final NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false);
+ nodeIds.add(nodeId);
+ final URI uri = new URI("http://localhost:8080/processors/1");
+ final Entity entity = new ProcessorEntity();
+
+ replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, null, true, true, monitor);
+
+ // wait for monitor to be notified.
+ postNotifyLatch.await();
+ }, Status.INTERNAL_SERVER_ERROR, 0L, null);
+ }
+
+
private void withReplicator(final WithReplicator function) {
withReplicator(function, ClientResponse.Status.OK, 0L, null);
}
private void withReplicator(final WithReplicator function, final Status status, final long delayMillis, final RuntimeException failure) {
final ClusterCoordinator coordinator = createClusterCoordinator();
- final ThreadPoolRequestReplicator replicator
- = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
+ final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null, null);
+ final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) {
@Override
protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
final URI uri, final String requestId, Map<String, String> givenHeaders) {