You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/10/06 20:53:28 UTC

nifi git commit: NIFI-2836: - Ensure that we wait until a request is completed before unlocking the lock for request replication - Ensure that failures do not trigger request completion logic unless the failure is the last node to report its status - This closes #1109

Repository: nifi
Updated Branches:
  refs/heads/master bb6c5d9d4 -> 09568d092


NIFI-2836:
- Ensure that we wait until a request is completed before unlocking the lock for request replication
- Ensure that failures do not trigger request completion logic unless the failure is the last node to report its status
- This closes #1109


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/09568d09
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/09568d09
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/09568d09

Branch: refs/heads/master
Commit: 09568d092b5329e4732f2e05c10fad181c344b8d
Parents: bb6c5d9
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Oct 5 16:19:55 2016 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Thu Oct 6 16:41:05 2016 -0400

----------------------------------------------------------------------
 .../StandardAsyncClusterResponse.java           |  14 +-
 .../ThreadPoolRequestReplicator.java            | 304 +++++++++++--------
 .../TestThreadPoolRequestReplicator.java        | 198 ++++++++++--
 3 files changed, 353 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/09568d09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
index 926151e..1d4ea69 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
@@ -223,12 +223,18 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
         }
     }
 
-    synchronized void setFailure(final RuntimeException failure) {
+    synchronized void setFailure(final RuntimeException failure, final NodeIdentifier nodeId) {
         this.failure = failure;
 
-        notifyAll();
-        if (completionCallback != null) {
-            completionCallback.onCompletion(this);
+        final int completedCount = requestsCompleted.incrementAndGet();
+        logger.debug("Notified of failure for {} from {}", id, nodeId);
+
+        if (completedCount == responseMap.size()) {
+
+            notifyAll();
+            if (completionCallback != null) {
+                completionCallback.onCompletion(this);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/09568d09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index 258588d..ff79000 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -249,12 +249,27 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
             lock.lock();
             try {
                 logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri);
-                return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true);
+
+                // Unlocking of the lock is performed within the replicate method, as we need to ensure that it is unlocked only after
+                // the entire request has completed.
+                final Object monitor = new Object();
+                synchronized (monitor) {
+                    final AsyncClusterResponse response = replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true, monitor);
+
+                    try {
+                        monitor.wait();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+
+                    return response;
+                }
             } finally {
                 lock.unlock();
+                logger.debug("Unlocked {} after replication completed for {} {}", lock, method, uri);
             }
         } else {
-            return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true);
+            return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true, null);
         }
     }
 
@@ -269,7 +284,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
             updatedHeaders.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, proxiedEntitiesChain);
         }
 
-        return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false, false);
+        return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false, false, null);
     }
 
     /**
@@ -283,107 +298,133 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
      * @param performVerification whether or not to verify that all nodes in the cluster are connected and that all nodes can perform request. Ignored if request is not mutable.
      * @param response            the response to update with the results
      * @param executionPhase      <code>true</code> if this is the execution phase, <code>false</code> otherwise
+     * @param monitor             a monitor that will be notified when the request completes (successfully or otherwise)
      * @return an AsyncClusterResponse that can be used to obtain the result
      */
-    private AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean performVerification,
-                                           StandardAsyncClusterResponse response, boolean executionPhase, boolean merge) {
-
-        // state validation
-        Objects.requireNonNull(nodeIds);
-        Objects.requireNonNull(method);
-        Objects.requireNonNull(uri);
-        Objects.requireNonNull(entity);
-        Objects.requireNonNull(headers);
+    AsyncClusterResponse replicate(final Set<NodeIdentifier> nodeIds, final String method, final URI uri, final Object entity, final Map<String, String> headers,
+        final boolean performVerification, StandardAsyncClusterResponse response, final boolean executionPhase, final boolean merge, final Object monitor) {
+        try {
+            // state validation
+            Objects.requireNonNull(nodeIds);
+            Objects.requireNonNull(method);
+            Objects.requireNonNull(uri);
+            Objects.requireNonNull(entity);
+            Objects.requireNonNull(headers);
+
+            if (nodeIds.isEmpty()) {
+                throw new IllegalArgumentException("Cannot replicate request to 0 nodes");
+            }
 
-        if (nodeIds.isEmpty()) {
-            throw new IllegalArgumentException("Cannot replicate request to 0 nodes");
-        }
+            // verify all of the nodes exist and are in the proper state
+            for (final NodeIdentifier nodeId : nodeIds) {
+                final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
+                if (status == null) {
+                    throw new UnknownNodeException("Node " + nodeId + " does not exist in this cluster");
+                }
 
-        // verify all of the nodes exist and are in the proper state
-        for (final NodeIdentifier nodeId : nodeIds) {
-            final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
-            if (status == null) {
-                throw new UnknownNodeException("Node " + nodeId + " does not exist in this cluster");
+                if (status.getState() != NodeConnectionState.CONNECTED) {
+                    throw new IllegalClusterStateException("Cannot replicate request to Node " + nodeId + " because the node is not connected");
+                }
             }
 
-            if (status.getState() != NodeConnectionState.CONNECTED) {
-                throw new IllegalClusterStateException("Cannot replicate request to Node " + nodeId + " because the node is not connected");
-            }
-        }
+            logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response);
 
-        logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response);
+            // Update headers to indicate the current revision so that we can
+            // prevent multiple users changing the flow at the same time
+            final Map<String, String> updatedHeaders = new HashMap<>(headers);
+            final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString());
 
-        // Update headers to indicate the current revision so that we can
-        // prevent multiple users changing the flow at the same time
-        final Map<String, String> updatedHeaders = new HashMap<>(headers);
-        final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString());
+            if (performVerification) {
+                verifyClusterState(method, uri.getPath());
+            }
 
-        if (performVerification) {
-            verifyClusterState(method, uri.getPath());
-        }
+            int numRequests = responseMap.size();
+            if (numRequests >= MAX_CONCURRENT_REQUESTS) {
+                numRequests = purgeExpiredRequests();
+            }
 
-        int numRequests = responseMap.size();
-        if (numRequests >= MAX_CONCURRENT_REQUESTS) {
-            numRequests = purgeExpiredRequests();
-        }
+            if (numRequests >= MAX_CONCURRENT_REQUESTS) {
+                final Map<String, Long> countsByUri = responseMap.values().stream().collect(
+                        Collectors.groupingBy(
+                                StandardAsyncClusterResponse::getURIPath,
+                                Collectors.counting()));
 
-        if (numRequests >= MAX_CONCURRENT_REQUESTS) {
-            final Map<String, Long> countsByUri = responseMap.values().stream().collect(
-                    Collectors.groupingBy(
-                            StandardAsyncClusterResponse::getURIPath,
-                            Collectors.counting()));
+                logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", method, uri.getPath(), numRequests, countsByUri);
+                throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests");
+            }
 
-            logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", method, uri.getPath(), numRequests, countsByUri);
-            throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests");
-        }
+            // create the request objects and replicate to all nodes.
+            // When the request has completed, we need to ensure that we notify the monitor, if there is one.
+            final CompletionCallback completionCallback = clusterResponse -> {
+                try {
+                    onCompletedResponse(requestId);
+                } finally {
+                    if (monitor != null) {
+                        synchronized (monitor) {
+                            monitor.notify();
+                        }
+
+                        logger.debug("Notified monitor {} because request {} {} has completed", monitor, method, uri);
+                    }
+                }
+            };
 
-        // create the request objects and replicate to all nodes
-        final CompletionCallback completionCallback = clusterResponse -> onCompletedResponse(requestId);
-        final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId);
+            final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId);
 
-        // create a response object if one was not already passed to us
-        if (response == null) {
-            response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds,
+            // create a response object if one was not already passed to us
+            if (response == null) {
+                response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds,
                     responseMapper, completionCallback, responseConsumedCallback, merge);
-            responseMap.put(requestId, response);
-        }
+                responseMap.put(requestId, response);
+            }
 
-        logger.debug("For Request ID {}, response object is {}", requestId, response);
-
-        // if mutable request, we have to do a two-phase commit where we ask each node to verify
-        // that the request can take place and then, if all nodes agree that it can, we can actually
-        // issue the request. This is all handled by calling performVerification, which will replicate
-        // the 'vote' request to all nodes and then if successful will call back into this method to
-        // replicate the actual request.
-        final boolean mutableRequest = isMutableRequest(method, uri.getPath());
-        if (mutableRequest && performVerification) {
-            logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId);
-            performVerification(nodeIds, method, uri, entity, updatedHeaders, response, merge);
-            return response;
-        }
+            logger.debug("For Request ID {}, response object is {}", requestId, response);
+
+            // if mutable request, we have to do a two-phase commit where we ask each node to verify
+            // that the request can take place and then, if all nodes agree that it can, we can actually
+            // issue the request. This is all handled by calling performVerification, which will replicate
+            // the 'vote' request to all nodes and then if successful will call back into this method to
+            // replicate the actual request.
+            final boolean mutableRequest = isMutableRequest(method, uri.getPath());
+            if (mutableRequest && performVerification) {
+                logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId);
+                performVerification(nodeIds, method, uri, entity, updatedHeaders, response, merge, monitor);
+                return response;
+            }
 
-        // Callback function for generating a NodeHttpRequestCallable that can be used to perform the work
-        final StandardAsyncClusterResponse finalResponse = response;
-        NodeRequestCompletionCallback nodeCompletionCallback = nodeResponse -> {
-            logger.debug("Received response from {} for {} {}", nodeResponse.getNodeId(), method, uri.getPath());
-            finalResponse.add(nodeResponse);
-        };
+            // Callback function for generating a NodeHttpRequestCallable that can be used to perform the work
+            final StandardAsyncClusterResponse finalResponse = response;
+            NodeRequestCompletionCallback nodeCompletionCallback = nodeResponse -> {
+                logger.debug("Received response from {} for {} {}", nodeResponse.getNodeId(), method, uri.getPath());
+                finalResponse.add(nodeResponse);
+            };
 
-        // instruct the node to actually perform the underlying action
-        if (mutableRequest && executionPhase) {
-            updatedHeaders.put(REQUEST_EXECUTION_HTTP_HEADER, "true");
-        }
+            // instruct the node to actually perform the underlying action
+            if (mutableRequest && executionPhase) {
+                updatedHeaders.put(REQUEST_EXECUTION_HTTP_HEADER, "true");
+            }
 
-        // replicate the request to all nodes
-        final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
-                nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback);
-        replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders);
+            // replicate the request to all nodes
+            final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
+                    nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback);
+            replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders);
 
-        return response;
+            return response;
+        } catch (final Throwable t) {
+            if (monitor != null) {
+                synchronized (monitor) {
+                    monitor.notify();
+                }
+                logger.debug("Notified monitor {} because request {} {} has failed with Throwable {}", monitor, method, uri, t);
+            }
+
+            throw t;
+        }
     }
 
 
-    private void performVerification(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse, boolean merge) {
+    private void performVerification(final Set<NodeIdentifier> nodeIds, final String method, final URI uri, final Object entity, final Map<String, String> headers,
+        final StandardAsyncClusterResponse clusterResponse, final boolean merge, final Object monitor) {
         logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath());
 
         final Map<String, String> validationHeaders = new HashMap<>(headers);
@@ -418,64 +459,73 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
                         // to all nodes and we are finished.
                         if (dissentingCount == 0) {
                             logger.debug("Received verification from all {} nodes that mutable request {} {} can be made", numNodes, method, uri.getPath());
-                            replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true, merge);
+                            replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true, merge, monitor);
                             return;
                         }
 
-                        final Map<String, String> cancelLockHeaders = new HashMap<>(headers);
-                        cancelLockHeaders.put(REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER, "true");
-                        final Thread cancelLockThread = new Thread(new Runnable() {
-                            @Override
-                            public void run() {
-                                logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", dissentingCount, method, uri.getPath());
+                        try {
+                            final Map<String, String> cancelLockHeaders = new HashMap<>(headers);
+                            cancelLockHeaders.put(REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER, "true");
+                            final Thread cancelLockThread = new Thread(new Runnable() {
+                                @Override
+                                public void run() {
+                                    logger.debug("Found {} dissenting nodes for {} {}; canceling claim request", dissentingCount, method, uri.getPath());
 
-                                final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
-                                        nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null);
+                                    final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
+                                            nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, cancelLockHeaders, null);
 
-                                replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders);
-                            }
-                        });
-                        cancelLockThread.setName("Cancel Flow Locks");
-                        cancelLockThread.start();
-
-                        // Add a NodeResponse for each node to the Cluster Response
-                        // Check that all nodes responded successfully.
-                        for (final NodeResponse response : nodeResponses) {
-                            if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) {
-                                final ClientResponse clientResponse = response.getClientResponse();
-
-                                final String message;
-                                if (clientResponse == null) {
-                                    message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus();
-
-                                    logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur",
-                                            response.getStatus(), response.getNodeId(), method, uri.getPath());
-                                } else {
-                                    final String nodeExplanation = clientResponse.getEntity(String.class);
-                                    message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: " + nodeExplanation;
-
-                                    logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur. Node explanation: {}",
-                                            response.getStatus(), response.getNodeId(), method, uri.getPath(), nodeExplanation);
+                                    replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, cancelLockHeaders);
                                 }
-
-                                // if a node reports forbidden, use that as the response failure
-                                final RuntimeException failure;
-                                if (response.getStatus() == Status.FORBIDDEN.getStatusCode()) {
-                                    if (response.hasThrowable()) {
-                                        failure = new AccessDeniedException(message, response.getThrowable());
+                            });
+                            cancelLockThread.setName("Cancel Flow Locks");
+                            cancelLockThread.start();
+
+                            // Add a NodeResponse for each node to the Cluster Response
+                            // Check that all nodes responded successfully.
+                            for (final NodeResponse response : nodeResponses) {
+                                if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) {
+                                    final ClientResponse clientResponse = response.getClientResponse();
+
+                                    final String message;
+                                    if (clientResponse == null) {
+                                        message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus();
+
+                                        logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur",
+                                                response.getStatus(), response.getNodeId(), method, uri.getPath());
                                     } else {
-                                        failure = new AccessDeniedException(message);
+                                        final String nodeExplanation = clientResponse.getEntity(String.class);
+                                        message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: " + nodeExplanation;
+
+                                        logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. "
+                                            + "The action will not occur. Node explanation: {}", response.getStatus(), response.getNodeId(), method, uri.getPath(), nodeExplanation);
                                     }
-                                } else {
-                                    if (response.hasThrowable()) {
-                                        failure = new IllegalClusterStateException(message, response.getThrowable());
+
+                                    // if a node reports forbidden, use that as the response failure
+                                    final RuntimeException failure;
+                                    if (response.getStatus() == Status.FORBIDDEN.getStatusCode()) {
+                                        if (response.hasThrowable()) {
+                                            failure = new AccessDeniedException(message, response.getThrowable());
+                                        } else {
+                                            failure = new AccessDeniedException(message);
+                                        }
                                     } else {
-                                        failure = new IllegalClusterStateException(message);
+                                        if (response.hasThrowable()) {
+                                            failure = new IllegalClusterStateException(message, response.getThrowable());
+                                        } else {
+                                            failure = new IllegalClusterStateException(message);
+                                        }
                                     }
+
+                                    clusterResponse.setFailure(failure, response.getNodeId());
+                                }
+                            }
+                        } finally {
+                            if (monitor != null) {
+                                synchronized (monitor) {
+                                    monitor.notify();
                                 }
 
-                                clusterResponse.setFailure(failure);
-                                break;
+                                logger.debug("Notified monitor {} because request {} {} has failed due to at least 1 dissenting node", monitor, method, uri);
                             }
                         }
                     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/09568d09/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
index edbc05b..02578a5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@ -16,13 +16,28 @@
  */
 package org.apache.nifi.cluster.coordination.http.replication;
 
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientHandlerException;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.ClientResponse.Status;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.core.header.InBoundHeaders;
-import com.sun.jersey.core.header.OutBoundHeaders;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.ws.rs.HttpMethod;
+
 import org.apache.commons.collections4.map.MultiValueMap;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
@@ -43,25 +58,13 @@ import org.mockito.internal.util.reflection.Whitebox;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import javax.ws.rs.HttpMethod;
-import java.io.ByteArrayInputStream;
-import java.net.SocketTimeoutException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.header.InBoundHeaders;
+import com.sun.jersey.core.header.OutBoundHeaders;
 
 public class TestThreadPoolRequestReplicator {
 
@@ -115,10 +118,6 @@ public class TestThreadPoolRequestReplicator {
 
             // We should get back the same response object
             assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier()));
-            assertFalse(response.isComplete());
-
-            final NodeResponse nodeResponse = response.getNodeResponse(nodeId);
-            assertNull(nodeResponse);
 
             final NodeResponse completedNodeResponse = response.awaitMergedResponse(2, TimeUnit.SECONDS);
             assertNotNull(completedNodeResponse);
@@ -321,14 +320,149 @@ public class TestThreadPoolRequestReplicator {
         }
     }
 
+    @Test(timeout = 5000)
+    public void testMonitorNotifiedOnException() {
+        withReplicator(replicator -> {
+            final Object monitor = new Object();
+
+            final CountDownLatch preNotifyLatch = new CountDownLatch(1);
+            final CountDownLatch postNotifyLatch = new CountDownLatch(1);
+
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    synchronized (monitor) {
+                        while (true) {
+                            // Blocks in wait() until monitor is notified; otherwise the @Test timeout fails the test. NOTE(review): no condition guards against a spurious wakeup.
+                            try {
+                                preNotifyLatch.countDown();
+                                monitor.wait();
+                                break;
+                            } catch (InterruptedException e) {
+                                continue;
+                            }
+                        }
+
+                        postNotifyLatch.countDown();
+                    }
+                }
+            }).start();
+
+            // Wait until the background thread has counted down the latch, which it does while holding the monitor lock just before calling wait().
+            preNotifyLatch.await();
+
+            try {
+                // Pass Collections.emptySet() as the node IDs so that replicate(...) throws; an IllegalArgumentException is the expected outcome
+                replicator.replicate(Collections.emptySet(), "GET", new URI("localhost:8080/nifi"), Collections.emptyMap(),
+                    Collections.emptyMap(), true, null, true, true, monitor);
+                Assert.fail("replicate did not throw IllegalArgumentException");
+            } catch (final IllegalArgumentException iae) {
+                // expected: the call was rejected; the test now checks below that the monitor was still notified
+            }
+
+            // Wait for the background thread to return from monitor.wait() and count down; if it never does, the test times out.
+            postNotifyLatch.await();
+        });
+    }
+
+    @Test(timeout = 5000)
+    public void testMonitorNotifiedOnSuccessfulCompletion() {
+        withReplicator(replicator -> {
+            final Object monitor = new Object();
+
+            final CountDownLatch preNotifyLatch = new CountDownLatch(1);
+            final CountDownLatch postNotifyLatch = new CountDownLatch(1);
+
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    synchronized (monitor) {
+                        while (true) {
+                            // Blocks in wait() until monitor is notified; otherwise the @Test timeout fails the test. NOTE(review): no condition guards against a spurious wakeup.
+                            try {
+                                preNotifyLatch.countDown();
+                                monitor.wait();
+                                break;
+                            } catch (InterruptedException e) {
+                                continue;
+                            }
+                        }
+
+                        postNotifyLatch.countDown();
+                    }
+                }
+            }).start();
+
+            // Wait until the background thread has counted down the latch, which it does while holding the monitor lock just before calling wait().
+            preNotifyLatch.await();
+
+            final Set<NodeIdentifier> nodeIds = new HashSet<>();
+            final NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false);
+            nodeIds.add(nodeId);
+            final URI uri = new URI("http://localhost:8080/processors/1");
+            final Entity entity = new ProcessorEntity();
+
+            replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, null, true, true, monitor);
+
+            // Wait for the background thread to return from monitor.wait() and count down; if it never does, the test times out.
+            postNotifyLatch.await();
+        });
+    }
+
+
+    @Test(timeout = 5000)
+    public void testMonitorNotifiedOnFailureResponse() {
+        withReplicator(replicator -> {
+            final Object monitor = new Object();
+
+            final CountDownLatch preNotifyLatch = new CountDownLatch(1);
+            final CountDownLatch postNotifyLatch = new CountDownLatch(1);
+
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    synchronized (monitor) {
+                        while (true) {
+                            // Blocks in wait() until monitor is notified; otherwise the @Test timeout fails the test. NOTE(review): no condition guards against a spurious wakeup.
+                            try {
+                                preNotifyLatch.countDown();
+                                monitor.wait();
+                                break;
+                            } catch (InterruptedException e) {
+                                continue;
+                            }
+                        }
+
+                        postNotifyLatch.countDown();
+                    }
+                }
+            }).start();
+
+            // Wait until the background thread has counted down the latch, which it does while holding the monitor lock just before calling wait().
+            preNotifyLatch.await();
+
+            final Set<NodeIdentifier> nodeIds = new HashSet<>();
+            final NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false);
+            nodeIds.add(nodeId);
+            final URI uri = new URI("http://localhost:8080/processors/1");
+            final Entity entity = new ProcessorEntity();
+
+            replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, null, true, true, monitor);
+
+            // Wait for the background thread to be woken and count down; this replicator is built with Status.INTERNAL_SERVER_ERROR (see the withReplicator arguments below).
+            postNotifyLatch.await();
+        }, Status.INTERNAL_SERVER_ERROR, 0L, null);
+    }
+
+
     private void withReplicator(final WithReplicator function) {
         withReplicator(function, ClientResponse.Status.OK, 0L, null);
     }
 
     private void withReplicator(final WithReplicator function, final Status status, final long delayMillis, final RuntimeException failure) {
         final ClusterCoordinator coordinator = createClusterCoordinator();
-        final ThreadPoolRequestReplicator replicator
-                = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) {
+        final NiFiProperties nifiProps = NiFiProperties.createBasicNiFiProperties(null, null);
+        final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, nifiProps) {
             @Override
             protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method,
                                                     final URI uri, final String requestId, Map<String, String> givenHeaders) {