You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/04/03 13:51:12 UTC

nifi git commit: NIFI-3668: Fix purging expired replicate requests.

Repository: nifi
Updated Branches:
  refs/heads/master 84f1fb395 -> 5e62b4ae7


NIFI-3668: Fix purging expired replicate requests.

This closes #1646.

Newly created async response is added before checking map size nor
purging expired ones. If there are already 100 remaining requests,
the added request will not be executed nor removed.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5e62b4ae
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5e62b4ae
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5e62b4ae

Branch: refs/heads/master
Commit: 5e62b4ae72c6fa8c81aa3ee817ad1bd14e350c4a
Parents: 84f1fb3
Author: Koji Kawamura <ij...@apache.org>
Authored: Mon Apr 3 15:56:21 2017 +0900
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Apr 3 09:50:47 2017 -0400

----------------------------------------------------------------------
 .../ThreadPoolRequestReplicator.java            | 68 +++++++++++---------
 1 file changed, 36 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5e62b4ae/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index bc2f8bb..5a19ca3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -316,36 +316,6 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
                 throw new IllegalArgumentException("Cannot replicate request to 0 nodes");
             }
 
-            // Update headers to indicate the current revision so that we can
-            // prevent multiple users changing the flow at the same time
-            final Map<String, String> updatedHeaders = new HashMap<>(headers);
-            final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString());
-
-            // create a response object if one was not already passed to us
-            if (response == null) {
-                // create the request objects and replicate to all nodes.
-                // When the request has completed, we need to ensure that we notify the monitor, if there is one.
-                final CompletionCallback completionCallback = clusterResponse -> {
-                    try {
-                        onCompletedResponse(requestId);
-                    } finally {
-                        if (monitor != null) {
-                            synchronized (monitor) {
-                                monitor.notify();
-                            }
-
-                            logger.debug("Notified monitor {} because request {} {} has completed", monitor, method, uri);
-                        }
-                    }
-                };
-
-                final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId);
-
-                response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds,
-                    responseMapper, completionCallback, responseConsumedCallback, merge);
-                responseMap.put(requestId, response);
-            }
-
             // verify all of the nodes exist and are in the proper state
             for (final NodeIdentifier nodeId : nodeIds) {
                 final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId);
@@ -360,11 +330,16 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
 
             logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response);
 
+            // Update headers to indicate the current revision so that we can
+            // prevent multiple users changing the flow at the same time
+            final Map<String, String> updatedHeaders = new HashMap<>(headers);
+            final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString());
+
+            long verifyClusterStateNanos = -1;
             if (performVerification) {
                 final long start = System.nanoTime();
                 verifyClusterState(method, uri.getPath());
-                final long nanos = System.nanoTime() - start;
-                response.addTiming("Verify Cluster State", "All Nodes", nanos);
+                verifyClusterStateNanos = System.nanoTime() - start;
             }
 
             int numRequests = responseMap.size();
@@ -382,6 +357,35 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
                 throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests");
             }
 
+            // create a response object if one was not already passed to us
+            if (response == null) {
+                // create the request objects and replicate to all nodes.
+                // When the request has completed, we need to ensure that we notify the monitor, if there is one.
+                final CompletionCallback completionCallback = clusterResponse -> {
+                    try {
+                        onCompletedResponse(requestId);
+                    } finally {
+                        if (monitor != null) {
+                            synchronized (monitor) {
+                                monitor.notify();
+                            }
+
+                            logger.debug("Notified monitor {} because request {} {} has completed", monitor, method, uri);
+                        }
+                    }
+                };
+
+                final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId);
+
+                response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds,
+                        responseMapper, completionCallback, responseConsumedCallback, merge);
+                responseMap.put(requestId, response);
+            }
+
+            if (verifyClusterStateNanos > -1) {
+                response.addTiming("Verify Cluster State", "All Nodes", verifyClusterStateNanos);
+            }
+
             logger.debug("For Request ID {}, response object is {}", requestId, response);
 
             // if mutable request, we have to do a two-phase commit where we ask each node to verify