You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2019/04/23 17:11:00 UTC
[nifi] branch master updated: NIFI-6236: Update
VolatileFlowFileRepository to decrement claimant counts when FlowFiles are
deleted
This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 5e2559d NIFI-6236: Update VolatileFlowFileRepository to decrement claimant counts when FlowFiles are deleted
5e2559d is described below
commit 5e2559db427af7306128e18414f5026ae35cf517
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Apr 22 10:17:16 2019 -0400
NIFI-6236: Update VolatileFlowFileRepository to decrement claimant counts when FlowFiles are deleted
This closes #3451.
Signed-off-by: Bryan Bende <bb...@apache.org>
---
.../repository/VolatileFlowFileRepository.java | 26 ++++++++++++++++++++++
1 file changed, 26 insertions(+)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
index 979a22e..3881f09 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java
@@ -24,6 +24,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -94,6 +95,8 @@ public class VolatileFlowFileRepository implements FlowFileRepository {
@Override
public void updateRepository(final Collection<RepositoryRecord> records) throws IOException {
for (final RepositoryRecord record : records) {
+ updateClaimCounts(record);
+
if (record.getType() == RepositoryRecordType.DELETE) {
// For any DELETE record that we have, if current claim's claimant count <= 0, mark it as destructable
if (record.getCurrentClaim() != null && getClaimantCount(record.getCurrentClaim()) <= 0) {
@@ -113,6 +116,29 @@ public class VolatileFlowFileRepository implements FlowFileRepository {
}
}
+ private void updateClaimCounts(final RepositoryRecord record) {
+ final ContentClaim currentClaim = record.getCurrentClaim();
+ final ContentClaim originalClaim = record.getOriginalClaim();
+ final boolean claimChanged = !Objects.equals(currentClaim, originalClaim);
+
+ if (record.getType() == RepositoryRecordType.DELETE || record.getType() == RepositoryRecordType.CONTENTMISSING) {
+ decrementClaimCount(currentClaim);
+ }
+
+ if (claimChanged) {
+ // records which have been updated - remove original if exists
+ decrementClaimCount(originalClaim);
+ }
+ }
+
+ private void decrementClaimCount(final ContentClaim claim) {
+ if (claim == null) {
+ return;
+ }
+
+ claimManager.decrementClaimantCount(claim.getResourceClaim());
+ }
+
@Override
public long loadFlowFiles(final QueueProvider queueProvider) throws IOException {
return 0;