You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/10/31 19:25:59 UTC
nifi git commit: NIFI-5771: Ensure that we only increment claimant
count for content claim if we have a FlowFile that references it
Repository: nifi
Updated Branches:
refs/heads/master 1f2cf4bc6 -> 4c10b47e6
NIFI-5771: Ensure that we only increment claimant count for content claim if we have a FlowFile that references it
This closes #3118.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4c10b47e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4c10b47e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4c10b47e
Branch: refs/heads/master
Commit: 4c10b47e602741adc52ad693a9bc56b9964cd7ef
Parents: 1f2cf4b
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Oct 31 13:11:58 2018 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Wed Oct 31 15:25:24 2018 -0400
----------------------------------------------------------------------
.../server/StandardLoadBalanceProtocol.java | 26 +++++++++++++-------
.../queue/clustered/LoadBalancedQueueIT.java | 4 +--
.../server/TestStandardLoadBalanceProtocol.java | 12 ++++++---
3 files changed, 28 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c10b47e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
index f508d12..dc780db 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
@@ -285,17 +285,14 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
if (contentClaim == null) {
contentClaim = contentRepository.create(false);
contentClaimOut = contentRepository.write(contentClaim);
- } else {
- contentRepository.incrementClaimaintCount(contentClaim);
}
- final RemoteFlowFileRecord flowFile;
- try {
- flowFile = receiveFlowFile(dataIn, contentClaimOut, contentClaim, claimOffset, protocolVersion, peerDescription, compression);
- } catch (final Exception e) {
- contentRepository.decrementClaimantCount(contentClaim);
- throw e;
- }
+ final RemoteFlowFileRecord flowFile = receiveFlowFile(dataIn, contentClaimOut, contentClaim, claimOffset, protocolVersion, peerDescription, compression);
+
+ // The FlowFile's Content Claim will either be null or equal to the provided Content Claim.
+ // Incrementing the FlowFile's content claim will increment the count for the provided Content Claim, if it was
+ // assigned to the FlowFIle, or call incrementClaimantCount with an argument of null, which will do nothing.
+ contentRepository.incrementClaimaintCount(flowFile.getFlowFile().getContentClaim());
flowFilesReceived.add(flowFile);
@@ -307,8 +304,17 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
}
}
+ // When the Content Claim is created initially, it has a Claimaint Count of 1. We then increment the Claimant Count for each FlowFile that we add to the Content Claim,
+ // which means that the claimant count is currently 1 larger than it needs to be. So we will decrement the claimant count now. If that results in a count of 0, then
+ // we can go ahead and remove the Content Claim, since we know it's not being referenced.
+ final int count = contentRepository.decrementClaimantCount(contentClaim);
+
verifyChecksum(checksum, in, out, peerDescription, flowFilesReceived.size());
completeTransaction(in, out, peerDescription, flowFilesReceived, connectionId, startTimestamp, (LoadBalancedFlowFileQueue) flowFileQueue);
+
+ if (count == 0) {
+ contentRepository.remove(contentClaim);
+ }
} catch (final Exception e) {
// If any Exception occurs, we need to decrement the claimant counts for the Content Claims that we wrote to because
// they are no longer needed.
@@ -316,6 +322,8 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
contentRepository.decrementClaimantCount(remoteFlowFile.getFlowFile().getContentClaim());
}
+ contentRepository.remove(contentClaim);
+
throw e;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c10b47e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
index 4871d72..e947b1c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java
@@ -1211,7 +1211,7 @@ public class LoadBalancedQueueIT {
localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null);
nodeIdentifiers.add(localNodeId);
- when(serverQueue.isFull()).thenReturn(true);
+ when(serverQueue.isLocalPartitionFull()).thenReturn(true);
// Create the server
final int timeoutMillis = 30000;
@@ -1266,7 +1266,7 @@ public class LoadBalancedQueueIT {
assertEquals(2, flowFileQueue.size().getObjectCount());
// Enable data to be transferred
- when(serverQueue.isFull()).thenReturn(false);
+ when(serverQueue.isLocalPartitionFull()).thenReturn(false);
while (clientRepoRecords.size() != 1) {
Thread.sleep(10L);
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c10b47e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
index 94f992f..c801f8c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
@@ -458,7 +458,9 @@ public class TestStandardLoadBalanceProtocol {
Mockito.verify(flowFileRepo, times(0)).updateRepository(anyCollection());
Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList());
Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection());
- Mockito.verify(contentRepo, times(1)).decrementClaimantCount(claimContents.keySet().iterator().next());
+ Mockito.verify(contentRepo, times(1)).incrementClaimaintCount(claimContents.keySet().iterator().next());
+ Mockito.verify(contentRepo, times(2)).decrementClaimantCount(claimContents.keySet().iterator().next());
+ Mockito.verify(contentRepo, times(1)).remove(claimContents.keySet().iterator().next());
}
@Test
@@ -509,7 +511,9 @@ public class TestStandardLoadBalanceProtocol {
Mockito.verify(flowFileRepo, times(0)).updateRepository(anyCollection());
Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList());
Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection());
- Mockito.verify(contentRepo, times(1)).decrementClaimantCount(claimContents.keySet().iterator().next());
+ Mockito.verify(contentRepo, times(0)).incrementClaimaintCount(claimContents.keySet().iterator().next());
+ Mockito.verify(contentRepo, times(0)).decrementClaimantCount(claimContents.keySet().iterator().next());
+ Mockito.verify(contentRepo, times(1)).remove(claimContents.keySet().iterator().next());
}
@Test
@@ -559,7 +563,9 @@ public class TestStandardLoadBalanceProtocol {
Mockito.verify(flowFileRepo, times(0)).updateRepository(anyCollection());
Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList());
Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection());
- Mockito.verify(contentRepo, times(1)).decrementClaimantCount(claimContents.keySet().iterator().next());
+ Mockito.verify(contentRepo, times(1)).incrementClaimaintCount(claimContents.keySet().iterator().next());
+ Mockito.verify(contentRepo, times(2)).decrementClaimantCount(claimContents.keySet().iterator().next());
+ Mockito.verify(contentRepo, times(1)).remove(claimContents.keySet().iterator().next());
}
@Test