You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this copy.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/12/13 18:43:21 UTC

[nifi] 17/22: NIFI-9362: Ensure that we update the StateMap in AbstractListProcessor to hold any files whose date matches the latest before setting cluster-wide state

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit cad07ab75c012e8834c9cfb7feea28147ad5eb0d
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Nov 3 15:37:54 2021 -0400

    NIFI-9362: Ensure that we update the StateMap in AbstractListProcessor to hold any files whose date matches the latest before setting cluster-wide state
---
 .../processor/util/list/AbstractListProcessor.java | 25 +++++++------
 .../util/list/TestAbstractListProcessor.java       | 41 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 11 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index 4fcb862..dce4c24 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -851,6 +851,20 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
         if (latestListedEntryTimestampThisCycleMillis != null) {
             final boolean processedNewFiles = entitiesListed > 0;
 
+            if (processedNewFiles) {
+                // If there have been files created, update the last timestamp we processed.
+                // Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
+                // because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough.
+                if (!orderedEntries.lastKey().equals(lastProcessedLatestEntryTimestampMillis)) {
+                    // If the latest timestamp in this cycle differs from the previously processed one, the stored identifiers are stale: clear them.
+                    // If it is unchanged, the identifiers found in this cycle are added to the ones already recorded.
+                    latestIdentifiersProcessed.clear();
+                }
+                // Record the identifiers of the entities at the latest timestamp in latestIdentifiersProcessed.
+                latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue().stream().map(T::getIdentifier).collect(Collectors.toList()));
+                lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey();
+            }
+
             if (!latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis) || processedNewFiles) {
                 // We have performed a listing and pushed any FlowFiles out that may have been generated
                 // Now, we need to persist state about the Last Modified timestamp of the newest file
@@ -870,17 +884,6 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
             }
 
             if (processedNewFiles) {
-                // If there have been files created, update the last timestamp we processed.
-                // Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
-                // because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough.
-                if (!orderedEntries.lastKey().equals(lastProcessedLatestEntryTimestampMillis)) {
-                    // If the latest timestamp at this cycle becomes different than the previous one, we need to clear identifiers.
-                    // If it didn't change, we need to add identifiers.
-                    latestIdentifiersProcessed.clear();
-                }
-                // Capture latestIdentifierProcessed.
-                latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue().stream().map(T::getIdentifier).collect(Collectors.toList()));
-                lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey();
                 getLogger().info("Successfully created listing with {} new objects", new Object[]{entitiesListed});
                 session.commitAsync();
             }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
index 4f78e8c..75ada70 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
@@ -24,6 +24,7 @@ import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.distributed.cache.client.Deserializer;
@@ -116,6 +117,46 @@ public class TestAbstractListProcessor {
     public final TemporaryFolder testFolder = new TemporaryFolder();
 
     @Test
+    public void testStateMigratedWhenPrimaryNodeSwitch() throws IOException {
+        // Add five entities that all share the same timestamp.
+        for (int i=0; i < 5; i++) {
+            proc.addEntity(String.valueOf(i), String.valueOf(i), 88888L);
+        }
+
+        // Add a sixth entity whose timestamp is later than all of the above.
+        proc.addEntity("10", "10", 99999999L);
+
+        // Run the first processor. All 6 entities are new, so all 6 should be listed.
+        runner.run();
+        runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 6);
+
+        // Now mimic a Primary Node change: capture the CLUSTER-scoped state from the current State Manager, create a new
+        // Processor that produces the same listing, and start it from that same state.
+        final ConcreteListProcessor secondProc = new ConcreteListProcessor();
+        // Give the new processor an identical listing (same addEntity arguments as above).
+        for (int i=0; i < 5; i++) {
+            secondProc.addEntity(String.valueOf(i), String.valueOf(i), 88888L);
+        }
+        secondProc.addEntity("10", "10", 99999999L);
+
+        // Create a new runner for the second processor and copy the first runner's CLUSTER-scoped state into it.
+        final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER);
+        runner = TestRunners.newTestRunner(secondProc);
+        runner.getStateManager().setState(stateMap.toMap(), Scope.CLUSTER);
+
+        // Run 10 times; nothing should be re-listed, since the copied state is expected to already cover every entity above.
+        for (int i=0; i < 10; i++) {
+            runner.run();
+            runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 0);
+        }
+
+        // Add an entity with an even later timestamp and ensure that exactly one FlowFile is emitted for it.
+        secondProc.addEntity("new", "new", 999999990L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 1);
+    }
+
+    @Test
     public void testStateMigratedFromCacheService() throws InitializationException {
 
         final DistributedCache cache = new DistributedCache();