You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2019/09/17 14:02:15 UTC

[nifi] branch master updated: NIFI-6677: Update ListHDFS to clear state (when appropriate) in an @OnScheduled method, just as AbstractListProcessor does, instead of doing it in onTrigger. Doing it in onTrigger is problematic because in a cluster, the Primary Node may run for some period of time, perhaps days or months. Then, when the Primary Node changes, onTrigger gets called for the first time on the new Primary Node, and this triggers the processor to clear state.

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new fd3f070  NIFI-6677: Update ListHDFS to clear state (when appropriate) in an @OnScheduled method, just as AbstractListProcessor does, instead of doing it in onTrigger. Doing it in onTrigger is problematic because in a cluster, the Primary Node may run for some period of time, perhaps days or months. Then, when the Primary Node changes, onTrigger gets called for the first time on the new Primary Node, and this triggers the processor to clear state.
fd3f070 is described below

commit fd3f0707c66b78622ed4e7f5aeaff63580c46d44
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Sep 16 13:46:53 2019 -0400

    NIFI-6677: Update ListHDFS to clear state (when appropriate) in an @OnScheduled method, just as AbstractListProcessor does, instead of doing it in onTrigger. Doing it in onTrigger is problematic because in a cluster, the Primary Node may run for some period of time, perhaps days or months. Then, when the Primary Node changes, onTrigger gets called for the first time on the new Primary Node, and this triggers the processor to clear state.
---
 .../java/org/apache/nifi/processors/hadoop/ListHDFS.java | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 27f58ef..15ed4b1 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -33,6 +33,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -315,6 +316,15 @@ public class ListHDFS extends AbstractHadoopProcessor {
         return toList;
     }
 
+    @OnScheduled
+    public void resetStateIfNecessary(final ProcessContext context) throws IOException {
+        if (resetState) {
+            getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+            context.getStateManager().clear(Scope.CLUSTER);
+            this.resetState = false;
+        }
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         // We have to ensure that we don't continually perform listings, because if we perform two listings within
@@ -332,12 +342,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
 
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
         try {
-            if (resetState) {
-                getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
-                context.getStateManager().clear(Scope.CLUSTER);
-                this.resetState = false;
-            }
-
             final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
             if (stateMap.getVersion() == -1L) {
                 latestTimestampEmitted = -1L;