You are viewing a plain text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/12/13 18:43:21 UTC
[nifi] 17/22: NIFI-9362: Ensure that we update the StateMap in AbstractListProcessor to hold any files whose date matches the latest before setting cluster-wide state
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit cad07ab75c012e8834c9cfb7feea28147ad5eb0d
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Nov 3 15:37:54 2021 -0400
NIFI-9362: Ensure that we update the StateMap in AbstractListProcessor to hold any files whose date matches the latest before setting cluster-wide state
---
.../processor/util/list/AbstractListProcessor.java | 25 +++++++------
.../util/list/TestAbstractListProcessor.java | 41 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 11 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index 4fcb862..dce4c24 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -851,6 +851,20 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
if (latestListedEntryTimestampThisCycleMillis != null) {
final boolean processedNewFiles = entitiesListed > 0;
+ if (processedNewFiles) {
+ // If there have been files created, update the last timestamp we processed.
+ // Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
+ // because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough.
+ if (!orderedEntries.lastKey().equals(lastProcessedLatestEntryTimestampMillis)) {
+ // If the latest timestamp at this cycle becomes different than the previous one, we need to clear identifiers.
+ // If it didn't change, we need to add identifiers.
+ latestIdentifiersProcessed.clear();
+ }
+ // Capture latestIdentifierProcessed.
+ latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue().stream().map(T::getIdentifier).collect(Collectors.toList()));
+ lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey();
+ }
+
if (!latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis) || processedNewFiles) {
// We have performed a listing and pushed any FlowFiles out that may have been generated
// Now, we need to persist state about the Last Modified timestamp of the newest file
@@ -870,17 +884,6 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
if (processedNewFiles) {
- // If there have been files created, update the last timestamp we processed.
- // Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
- // because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough.
- if (!orderedEntries.lastKey().equals(lastProcessedLatestEntryTimestampMillis)) {
- // If the latest timestamp at this cycle becomes different than the previous one, we need to clear identifiers.
- // If it didn't change, we need to add identifiers.
- latestIdentifiersProcessed.clear();
- }
- // Capture latestIdentifierProcessed.
- latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue().stream().map(T::getIdentifier).collect(Collectors.toList()));
- lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey();
getLogger().info("Successfully created listing with {} new objects", new Object[]{entitiesListed});
session.commitAsync();
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
index 4f78e8c..75ada70 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
@@ -24,6 +24,7 @@ import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
@@ -116,6 +117,46 @@ public class TestAbstractListProcessor {
public final TemporaryFolder testFolder = new TemporaryFolder();
@Test
+ public void testStateMigratedWhenPrimaryNodeSwitch() throws IOException {
+ // Simulates a Primary Node switch. First, add five entities that all share an older timestamp (88888).
+ for (int i=0; i < 5; i++) {
+ proc.addEntity(String.valueOf(i), String.valueOf(i), 88888L);
+ }
+
+ // Add one entity with a later timestamp; this becomes the latest listed timestamp.
+ proc.addEntity("10", "10", 99999999L);
+
+ // Run the first processor. All 6 entities should be listed.
+ runner.run();
+ runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 6);
+
+ // Now mimic the Primary Node changing: capture the Cluster State from the first runner's State Manager,
+ // create a new Processor that produces the same listing, and give its runner that same Cluster State.
+ final ConcreteListProcessor secondProc = new ConcreteListProcessor();
+ // Give the new processor the same six entities, with the same timestamps, as the first one.
+ for (int i=0; i < 5; i++) {
+ secondProc.addEntity(String.valueOf(i), String.valueOf(i), 88888L);
+ }
+ secondProc.addEntity("10", "10", 99999999L);
+
+ // Replace the shared runner with one for the second processor, then copy the first runner's CLUSTER state into it.
+ final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER);
+ runner = TestRunners.newTestRunner(secondProc);
+ runner.getStateManager().setState(stateMap.toMap(), Scope.CLUSTER);
+
+ // Run several times; nothing should be emitted, because every entity was already listed before the switch.
+ for (int i=0; i < 10; i++) {
+ runner.run();
+ runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 0);
+ }
+
+ // Add one more entry with a newer timestamp and ensure that only that entry is emitted.
+ secondProc.addEntity("new", "new", 999999990L);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 1);
+ }
+
+ @Test
public void testStateMigratedFromCacheService() throws InitializationException {
final DistributedCache cache = new DistributedCache();