You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2019/04/23 17:11:00 UTC

[nifi] branch master updated: NIFI-6236: Update VolatileFlowFileRepository to decrement claimant counts when FlowFiles are deleted

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e2559d  NIFI-6236: Update VolatileFlowFileRepository to decrement claimant counts when FlowFiles are deleted
5e2559d is described below

commit 5e2559db427af7306128e18414f5026ae35cf517
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Apr 22 10:17:16 2019 -0400

    NIFI-6236: Update VolatileFlowFileRepository to decrement claimant counts when FlowFiles are deleted
    
    This closes #3451.
    
    Signed-off-by: Bryan Bende <bb...@apache.org>
---
 .../repository/VolatileFlowFileRepository.java     | 26 ++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
index 979a22e..3881f09 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
@@ -24,6 +24,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -94,6 +95,8 @@ public class VolatileFlowFileRepository implements FlowFileRepository {
     @Override
     public void updateRepository(final Collection<RepositoryRecord> records) throws IOException {
         for (final RepositoryRecord record : records) {
+            updateClaimCounts(record);
+
             if (record.getType() == RepositoryRecordType.DELETE) {
                 // For any DELETE record that we have, if current claim's claimant count <= 0, mark it as destructable
                 if (record.getCurrentClaim() != null && getClaimantCount(record.getCurrentClaim()) <= 0) {
@@ -113,6 +116,29 @@ public class VolatileFlowFileRepository implements FlowFileRepository {
         }
     }
 
+    private void updateClaimCounts(final RepositoryRecord record) { // Decrements claimant counts for the content claims affected by this record.
+        final ContentClaim currentClaim = record.getCurrentClaim();
+        final ContentClaim originalClaim = record.getOriginalClaim();
+        final boolean claimChanged = !Objects.equals(currentClaim, originalClaim);
+        // DELETE and CONTENTMISSING records decrement the count of their current claim.
+        if (record.getType() == RepositoryRecordType.DELETE || record.getType() == RepositoryRecordType.CONTENTMISSING) {
+            decrementClaimCount(currentClaim);
+        }
+        // Checked independently of the record type, so one record can trigger both decrements.
+        if (claimChanged) {
+            // current and original claims differ - decrement the original (decrementClaimCount ignores null)
+            decrementClaimCount(originalClaim);
+        }
+    }
+
+    private void decrementClaimCount(final ContentClaim claim) { // Null-tolerant wrapper around claimManager.decrementClaimantCount.
+        if (claim == null) {
+            return;
+        }
+        // The decrement is applied to the claim's ResourceClaim rather than to the ContentClaim itself.
+        claimManager.decrementClaimantCount(claim.getResourceClaim());
+    }
+
     @Override
     public long loadFlowFiles(final QueueProvider queueProvider) throws IOException {
         return 0;