You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2017/03/03 14:14:04 UTC

nifi git commit: NIFI-3548: Fixed bug that caused failed requests to not get removed from the 'request map', which in turn prevented the purging logic from working properly and would then unintentionally throw exceptions. This closes #1555

Repository: nifi
Updated Branches:
  refs/heads/master 5990db39a -> 4ed64e756


NIFI-3548: Fixed bug that caused failed requests to not get removed from the 'request map', which in turn prevented the purging logic from working properly and would then unintentionally throw exceptions. This closes #1555


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4ed64e75
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4ed64e75
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4ed64e75

Branch: refs/heads/master
Commit: 4ed64e7561986c70a396a9c1af59b0efcc075c6c
Parents: 5990db3
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Mar 2 13:55:15 2017 -0500
Committer: Matt Gilman <ma...@gmail.com>
Committed: Fri Mar 3 09:13:28 2017 -0500

----------------------------------------------------------------------
 .../StandardAsyncClusterResponse.java           |  8 ++++--
 .../TestThreadPoolRequestReplicator.java        | 30 ++++++++++++++++++++
 2 files changed, 36 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4ed64e75/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
index 1d4ea69..318b1a0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java
@@ -104,8 +104,8 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
     }
 
     @Override
-    public boolean isComplete() {
-        return getMergedResponse() != null;
+    public synchronized boolean isComplete() {
+        return failure != null || mergedResponse != null || requestsCompleted.get() >= responseMap.size();
     }
 
     @Override
@@ -125,6 +125,10 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
 
     public synchronized NodeResponse getMergedResponse(final boolean triggerCallback) {
         if (failure != null) {
+            if (completedResultFetchedCallback != null) {
+                completedResultFetchedCallback.run();
+            }
+
             throw failure;
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/4ed64e75/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
index 3c782a7..88a8836 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@ -77,6 +77,36 @@ public class TestThreadPoolRequestReplicator {
         System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties");
     }
 
+    @Test
+    public void testFailedRequestsAreCleanedUp() {
+        withReplicator(replicator -> {
+            final Set<NodeIdentifier> nodeIds = new HashSet<>();
+            nodeIds.add(new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false));
+            final URI uri = new URI("http://localhost:8080/processors/1");
+            final Entity entity = new ProcessorEntity();
+
+            // set the user
+            final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(StandardNiFiUser.ANONYMOUS));
+            SecurityContextHolder.getContext().setAuthentication(authentication);
+
+            final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true);
+
+            // We should get back the same response object
+            assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier()));
+
+            assertEquals(HttpMethod.GET, response.getMethod());
+            assertEquals(nodeIds, response.getNodesInvolved());
+
+            assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier()));
+
+            final NodeResponse nodeResponse = response.awaitMergedResponse(3, TimeUnit.SECONDS);
+            assertEquals(8000, nodeResponse.getNodeId().getApiPort());
+            assertEquals(ClientResponse.Status.FORBIDDEN.getStatusCode(), nodeResponse.getStatus());
+
+            assertNull(replicator.getClusterResponse(response.getRequestIdentifier()));
+        }, Status.FORBIDDEN, 0L, null);
+    }
+
     /**
      * If we replicate a request, whenever we obtain the merged response from
      * the AsyncClusterResponse object, the response should no longer be