You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/10/31 19:25:59 UTC

nifi git commit: NIFI-5771: Ensure that we only increment claimant count for content claim if we have a FlowFile that references it

Repository: nifi
Updated Branches:
  refs/heads/master 1f2cf4bc6 -> 4c10b47e6


NIFI-5771: Ensure that we only increment claimant count for content claim if we have a FlowFile that references it

This closes #3118.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4c10b47e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4c10b47e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4c10b47e

Branch: refs/heads/master
Commit: 4c10b47e602741adc52ad693a9bc56b9964cd7ef
Parents: 1f2cf4b
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Oct 31 13:11:58 2018 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Wed Oct 31 15:25:24 2018 -0400

----------------------------------------------------------------------
 .../server/StandardLoadBalanceProtocol.java     | 26 +++++++++++++-------
 .../queue/clustered/LoadBalancedQueueIT.java    |  4 +--
 .../server/TestStandardLoadBalanceProtocol.java | 12 ++++++---
 3 files changed, 28 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4c10b47e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
index f508d12..dc780db 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
@@ -285,17 +285,14 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
                     if (contentClaim == null) {
                         contentClaim = contentRepository.create(false);
                         contentClaimOut = contentRepository.write(contentClaim);
-                    } else {
-                        contentRepository.incrementClaimaintCount(contentClaim);
                     }
 
-                    final RemoteFlowFileRecord flowFile;
-                    try {
-                        flowFile = receiveFlowFile(dataIn, contentClaimOut, contentClaim, claimOffset, protocolVersion, peerDescription, compression);
-                    } catch (final Exception e) {
-                        contentRepository.decrementClaimantCount(contentClaim);
-                        throw e;
-                    }
+                    final RemoteFlowFileRecord flowFile = receiveFlowFile(dataIn, contentClaimOut, contentClaim, claimOffset, protocolVersion, peerDescription, compression);
+
+                    // The FlowFile's Content Claim will either be null or equal to the provided Content Claim.
+                    // Incrementing the FlowFile's content claim will increment the count for the provided Content Claim, if it was
+                    // assigned to the FlowFIle, or call incrementClaimantCount with an argument of null, which will do nothing.
+                    contentRepository.incrementClaimaintCount(flowFile.getFlowFile().getContentClaim());
 
                     flowFilesReceived.add(flowFile);
 
@@ -307,8 +304,17 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
                 }
             }
 
+            // When the Content Claim is created initially, it has a Claimaint Count of 1. We then increment the Claimant Count for each FlowFile that we add to the Content Claim,
+            // which means that the claimant count is currently 1 larger than it needs to be. So we will decrement the claimant count now. If that results in a count of 0, then
+            // we can go ahead and remove the Content Claim, since we know it's not being referenced.
+            final int count = contentRepository.decrementClaimantCount(contentClaim);
+
             verifyChecksum(checksum, in, out, peerDescription, flowFilesReceived.size());
             completeTransaction(in, out, peerDescription, flowFilesReceived, connectionId, startTimestamp, (LoadBalancedFlowFileQueue) flowFileQueue);
+
+            if (count == 0) {
+                contentRepository.remove(contentClaim);
+            }
         } catch (final Exception e) {
             // If any Exception occurs, we need to decrement the claimant counts for the Content Claims that we wrote to because
             // they are no longer needed.
@@ -316,6 +322,8 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
                 contentRepository.decrementClaimantCount(remoteFlowFile.getFlowFile().getContentClaim());
             }
 
+            contentRepository.remove(contentClaim);
+
             throw e;
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/4c10b47e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
index 4871d72..e947b1c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
@@ -1211,7 +1211,7 @@ public class LoadBalancedQueueIT {
         localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
         nodeIdentifiers.add(localNodeId);
 
-        when(serverQueue.isFull()).thenReturn(true);
+        when(serverQueue.isLocalPartitionFull()).thenReturn(true);
 
         // Create the server
         final int timeoutMillis = 30000;
@@ -1266,7 +1266,7 @@ public class LoadBalancedQueueIT {
                 assertEquals(2, flowFileQueue.size().getObjectCount());
 
                 // Enable data to be transferred
-                when(serverQueue.isFull()).thenReturn(false);
+                when(serverQueue.isLocalPartitionFull()).thenReturn(false);
 
                 while (clientRepoRecords.size() != 1) {
                     Thread.sleep(10L);

http://git-wip-us.apache.org/repos/asf/nifi/blob/4c10b47e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
index 94f992f..c801f8c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
@@ -458,7 +458,9 @@ public class TestStandardLoadBalanceProtocol {
         Mockito.verify(flowFileRepo, times(0)).updateRepository(anyCollection());
         Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList());
         Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection());
-        Mockito.verify(contentRepo, times(1)).decrementClaimantCount(claimContents.keySet().iterator().next());
+        Mockito.verify(contentRepo, times(1)).incrementClaimaintCount(claimContents.keySet().iterator().next());
+        Mockito.verify(contentRepo, times(2)).decrementClaimantCount(claimContents.keySet().iterator().next());
+        Mockito.verify(contentRepo, times(1)).remove(claimContents.keySet().iterator().next());
     }
 
     @Test
@@ -509,7 +511,9 @@ public class TestStandardLoadBalanceProtocol {
         Mockito.verify(flowFileRepo, times(0)).updateRepository(anyCollection());
         Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList());
         Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection());
-        Mockito.verify(contentRepo, times(1)).decrementClaimantCount(claimContents.keySet().iterator().next());
+        Mockito.verify(contentRepo, times(0)).incrementClaimaintCount(claimContents.keySet().iterator().next());
+        Mockito.verify(contentRepo, times(0)).decrementClaimantCount(claimContents.keySet().iterator().next());
+        Mockito.verify(contentRepo, times(1)).remove(claimContents.keySet().iterator().next());
     }
 
     @Test
@@ -559,7 +563,9 @@ public class TestStandardLoadBalanceProtocol {
         Mockito.verify(flowFileRepo, times(0)).updateRepository(anyCollection());
         Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList());
         Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection());
-        Mockito.verify(contentRepo, times(1)).decrementClaimantCount(claimContents.keySet().iterator().next());
+        Mockito.verify(contentRepo, times(1)).incrementClaimaintCount(claimContents.keySet().iterator().next());
+        Mockito.verify(contentRepo, times(2)).decrementClaimantCount(claimContents.keySet().iterator().next());
+        Mockito.verify(contentRepo, times(1)).remove(claimContents.keySet().iterator().next());
     }
 
     @Test