You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/04/01 20:42:47 UTC

[nifi] 16/18: NIFI-6150: Moved logic of decrementing content claims from ProcessSession.commit to the FlowFile Repository. Also updated load-balancing protocol and queue drop logic to no longer decrement content claims since the flowfile repo will handle the logic. This allows us ts to ensure that the claimant counts are decremented only if the repo is successfully updated and still before checking if the claim is still in use or not.

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch NIFI-6169-RC1
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit c1b478b893f4211d5843e64e81060a32aeaaf20d
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Mar 26 15:42:51 2019 -0400

    NIFI-6150: Moved logic of decrementing content claims from ProcessSession.commit to the FlowFile Repository. Also updated load-balancing protocol and queue drop logic to no longer decrement content claims since the flowfile repo will handle the logic. This allows us ts to ensure that the claimant counts are decremented only if the repo is successfully updated and still before checking if the claim is still in use or not.
    
    This closes #3391.
    
    Signed-off-by: Bryan Bende <bb...@apache.org>
---
 .../controller/queue/AbstractFlowFileQueue.java    | 16 +-------
 .../clustered/SocketLoadBalancedFlowFileQueue.java |  7 +---
 .../clustered/partition/RemoteQueuePartition.java  |  5 ---
 .../repository/StandardProcessSession.java         | 46 ++--------------------
 .../repository/WriteAheadFlowFileRepository.java   | 32 ++++++++++++++-
 .../repository/TestStandardProcessSession.java     | 18 ++++++++-
 .../scheduling/ProcessorLifecycleIT.java           | 29 --------------
 7 files changed, 54 insertions(+), 99 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
index ec3c550..8b29613 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
@@ -386,25 +386,11 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
         // Create a Provenance Event and a FlowFile Repository record for each FlowFile
         final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(flowFiles.size());
         final List<RepositoryRecord> flowFileRepoRecords = new ArrayList<>(flowFiles.size());
+        long dropContentSize = 0L;
         for (final FlowFileRecord flowFile : flowFiles) {
             provenanceEvents.add(createDropProvenanceEvent(flowFile, requestor));
             flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile));
-        }
-
-        long dropContentSize = 0L;
-        for (final FlowFileRecord flowFile : flowFiles) {
             dropContentSize += flowFile.getSize();
-            final ContentClaim contentClaim = flowFile.getContentClaim();
-            if (contentClaim == null) {
-                continue;
-            }
-
-            final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
-            if (resourceClaim == null) {
-                continue;
-            }
-
-            resourceClaimManager.decrementClaimantCount(resourceClaim);
         }
 
         provRepository.registerEvents(provenanceEvents);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 353af49..12ee8d8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -976,16 +976,11 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
             provenanceEvents.add(provenanceEvent);
 
             final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
-            logger.info("{} terminated due to FlowFile expiration; life of FlowFile = {} ms", new Object[] {flowFile, flowFileLife});
+            logger.debug("{} terminated due to FlowFile expiration; life of FlowFile = {} ms", new Object[] {flowFile, flowFileLife});
         }
 
         try {
             flowFileRepo.updateRepository(expiredRecords);
-
-            for (final RepositoryRecord expiredRecord : expiredRecords) {
-                contentRepo.decrementClaimantCount(expiredRecord.getCurrentClaim());
-            }
-
             provRepo.registerEvents(provenanceEvents);
 
             adjustSize(-expired.size(), -expired.stream().mapToLong(FlowFileRecord::getSize).sum());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
index 854a3a5..106ab26 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
@@ -261,11 +261,6 @@ public class RemoteQueuePartition implements QueuePartition {
 
         flowFileRepoRecords.addAll(abortedRecords);
 
-        // Decrement claimant count for each FlowFile.
-        flowFileRepoRecords.stream()
-                .map(RepositoryRecord::getCurrentClaim)
-                .forEach(contentRepo::decrementClaimantCount);
-
         try {
             flowFileRepo.updateRepository(flowFileRepoRecords);
         } catch (final Exception e) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 7a87678..78e93a1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -377,51 +377,22 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             final long flowFileRepoUpdateFinishNanos = System.nanoTime();
             final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - flowFileRepoUpdateStart;
 
-            final long claimRemovalStart = flowFileRepoUpdateFinishNanos;
-
-            /**
-             * Figure out which content claims can be released. At this point,
-             * we will decrement the Claimant Count for the claims via the
-             * Content Repository. We do not actually destroy the content
-             * because otherwise, we could remove the Original Claim and
-             * crash/restart before the FlowFileRepository is updated. This will
-             * result in the FlowFile being restored such that the content claim
-             * points to the Original Claim -- which has already been removed!
-             *
-             */
-            for (final StandardRepositoryRecord record : checkpoint.records.values()) {
-                if (record.isMarkedForDelete()) {
-                    // if the working claim is not the same as the original claim, we can immediately destroy the working claim
-                    // because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync.
-                    decrementClaimCount(record.getWorkingClaim());
-
-                    if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getWorkingClaim())) {
-                        // if working & original claim are same, don't remove twice; we only want to remove the original
-                        // if it's different from the working. Otherwise, we remove two claimant counts. This causes
-                        // an issue if we only updated the FlowFile attributes.
-                        decrementClaimCount(record.getOriginalClaim());
-                    }
-
-                    if (LOG.isInfoEnabled()) {
+            if (LOG.isInfoEnabled()) {
+                for (final RepositoryRecord record : checkpoint.records.values()) {
+                    if (record.isMarkedForAbort()) {
                         final FlowFileRecord flowFile = record.getCurrent();
                         final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
                         final Connectable connectable = context.getConnectable();
                         final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
                         LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
                     }
-                } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
-                    // records which have been updated - remove original if exists
-                    decrementClaimCount(record.getOriginalClaim());
                 }
             }
 
-            final long claimRemovalFinishNanos = System.nanoTime();
-            final long claimRemovalNanos = claimRemovalFinishNanos - claimRemovalStart;
-
             updateEventRepository(checkpoint);
 
             final long updateEventRepositoryFinishNanos = System.nanoTime();
-            final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - claimRemovalFinishNanos;
+            final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - flowFileRepoUpdateFinishNanos;
 
             // transfer the flowfiles to the connections' queues.
             final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>();
@@ -480,8 +451,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 formatNanos(commitNanos, timingInfo);
                 timingInfo.append("; FlowFile Repository Update took ");
                 formatNanos(flowFileRepoUpdateNanos, timingInfo);
-                timingInfo.append("; Claim Removal took ");
-                formatNanos(claimRemovalNanos, timingInfo);
                 timingInfo.append("; FlowFile Event Update took ");
                 formatNanos(updateEventRepositoryNanos, timingInfo);
                 timingInfo.append("; Enqueuing FlowFiles took ");
@@ -1010,12 +979,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         for (final StandardRepositoryRecord record : recordsToHandle) {
             if (record.isMarkedForAbort()) {
                 decrementClaimCount(record.getWorkingClaim());
-                if (record.getCurrentClaim() != null && !record.getCurrentClaim().equals(record.getWorkingClaim())) {
-                    // if working & original claim are same, don't remove twice; we only want to remove the original
-                    // if it's different from the working. Otherwise, we remove two claimant counts. This causes
-                    // an issue if we only updated the flowfile attributes.
-                    decrementClaimCount(record.getCurrentClaim());
-                }
                 abortedRecords.add(record);
             } else {
                 transferRecords.add(record);
@@ -2104,7 +2067,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             record.markForDelete();
             expiredRecords.add(record);
             expiredReporter.expire(flowFile, "Expiration Threshold = " + connection.getFlowFileQueue().getFlowFileExpiration());
-            decrementClaimCount(flowFile.getContentClaim());
 
             final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
             final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 2fc2855..4215633 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -40,6 +40,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.SortedSet;
@@ -307,7 +308,10 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
 
         // update the repository.
         final int partitionIndex = wal.update(recordsForWal, sync);
+        updateContentClaims(records, partitionIndex);
+    }
 
+    private void updateContentClaims(Collection<RepositoryRecord> repositoryRecords, final int partitionIndex) {
         // The below code is not entirely thread-safe, but we are OK with that because the results aren't really harmful.
         // Specifically, if two different threads call updateRepository with DELETE records for the same Content Claim,
         // it's quite possible for claimant count to be 0 below, which results in two different threads adding the Content
@@ -321,7 +325,9 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         final Set<String> swapLocationsAdded = new HashSet<>();
         final Set<String> swapLocationsRemoved = new HashSet<>();
 
-        for (final RepositoryRecord record : records) {
+        for (final RepositoryRecord record : repositoryRecords) {
+            updateClaimCounts(record);
+
             if (record.getType() == RepositoryRecordType.DELETE) {
                 // For any DELETE record that we have, if claim is destructible, mark it so
                 if (record.getCurrentClaim() != null && isDestructable(record.getCurrentClaim())) {
@@ -381,6 +387,30 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         }
     }
 
+    private void updateClaimCounts(final RepositoryRecord record) {
+        final ContentClaim currentClaim = record.getCurrentClaim();
+        final ContentClaim originalClaim = record.getOriginalClaim();
+        final boolean claimChanged = !Objects.equals(currentClaim, originalClaim);
+
+        if (record.getType() == RepositoryRecordType.DELETE || record.getType() == RepositoryRecordType.CONTENTMISSING) {
+            decrementClaimCount(currentClaim);
+        }
+
+        if (claimChanged) {
+            // records which have been updated - remove original if exists
+            decrementClaimCount(originalClaim);
+        }
+    }
+
+    private void decrementClaimCount(final ContentClaim claim) {
+        if (claim == null) {
+            return;
+        }
+
+        claimManager.decrementClaimantCount(claim.getResourceClaim());
+    }
+
+
     protected static String getLocationSuffix(final String swapLocation) {
         if (swapLocation == null) {
             return null;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index b001e79..a1fa975 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -79,6 +79,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -191,7 +192,7 @@ public class TestStandardProcessSession {
 
         contentRepo = new MockContentRepository();
         contentRepo.initialize(new StandardResourceClaimManager());
-        flowFileRepo = new MockFlowFileRepository();
+        flowFileRepo = new MockFlowFileRepository(contentRepo);
 
         context = new RepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo);
         session = new StandardProcessSession(context, () -> false);
@@ -2149,6 +2150,11 @@ public class TestStandardProcessSession {
         private boolean failOnUpdate = false;
         private final AtomicLong idGenerator = new AtomicLong(0L);
         private final List<RepositoryRecord> updates = new ArrayList<>();
+        private final ContentRepository contentRepo;
+
+        public MockFlowFileRepository(final ContentRepository contentRepo) {
+            this.contentRepo = contentRepo;
+        }
 
         public void setFailOnUpdate(final boolean fail) {
             this.failOnUpdate = fail;
@@ -2178,6 +2184,16 @@ public class TestStandardProcessSession {
                 throw new IOException("FlowFile Repository told to fail on update for unit test");
             }
             updates.addAll(records);
+
+            for (final RepositoryRecord record : records) {
+                if (record.getType() == RepositoryRecordType.DELETE) {
+                    contentRepo.decrementClaimantCount(record.getCurrentClaim());
+                }
+
+                if (!Objects.equals(record.getOriginalClaim(), record.getCurrentClaim())) {
+                    contentRepo.decrementClaimantCount(record.getOriginalClaim());
+                }
+            }
         }
 
         public List<RepositoryRecord> getUpdates() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
index 890d799..1c7f187 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java
@@ -35,7 +35,6 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
-import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.events.VolatileBulletinRepository;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.nar.ExtensionDiscoveringManager;
@@ -75,7 +74,6 @@ import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 /**
@@ -487,33 +485,6 @@ public class ProcessorLifecycleIT {
 
 
     /**
-     * Validate that processor will not be validated on failing
-     * ControllerService validation (not enabled).
-     */
-    @Test(expected = IllegalStateException.class)
-    public void validateStartFailsOnInvalidProcessorWithDisabledService() throws Exception {
-        final FlowManagerAndSystemBundle fcsb = this.buildFlowControllerForTest();
-        flowManager = fcsb.getFlowManager();
-
-        ProcessGroup testGroup = flowManager.createProcessGroup(UUID.randomUUID().toString());
-
-        ControllerServiceNode testServiceNode = flowManager.createControllerService(TestService.class.getName(), "serv",
-                fcsb.getSystemBundle().getBundleDetails().getCoordinate(), null, true, true);
-        ProcessorNode testProcNode = flowManager.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(),
-                fcsb.getSystemBundle().getBundleDetails().getCoordinate());
-
-        properties.put("S", testServiceNode.getIdentifier());
-        testProcNode.setProperties(properties);
-
-        TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor();
-        testProcessor.withService = true;
-
-        processScheduler.startProcessor(testProcNode, true);
-        fail();
-    }
-
-
-    /**
      * Scenario where onTrigger() is executed with random delay limited to
      * 'delayLimit', yet with guaranteed exit from onTrigger().
      */