Posted to commits@nifi.apache.org by pv...@apache.org on 2018/04/25 16:03:56 UTC

nifi git commit: NIFI-5000 - ListHDFS properly lists files from updated directory path

Repository: nifi
Updated Branches:
  refs/heads/master ac9944cce -> c118e9623


NIFI-5000 - ListHDFS properly lists files from updated directory path

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #2576.
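
As context for the diff below, the fix defers the state reset: changing the Directory or File Filter property now only flips a flag in onPropertyModified(), and the cluster-scoped state (listing.timestamp / emitted.timestamp) is cleared at the start of the next onTrigger() run. A minimal standalone sketch of that deferred-reset pattern (plain Java, no NiFi dependencies; class and member names other than those appearing in the diff are illustrative):

    public class DeferredResetSketch {

        private volatile boolean resetState = false;
        private volatile long latestTimestampListed = -1L;

        // Analogous to onPropertyModified(): a directory or filter change only records
        // that a reset is needed; it does not touch the stored listing state directly.
        void propertyChanged(final String oldValue, final String newValue) {
            if (oldValue != null && !oldValue.equals(newValue)) {
                this.resetState = true;
            }
        }

        // Analogous to onTrigger(): the deferred reset is applied at the start of the
        // next listing cycle, so the new directory is listed from scratch.
        void runListingCycle(final long newestFileTimestamp) {
            if (resetState) {
                latestTimestampListed = -1L;  // ListHDFS achieves this by clearing the CLUSTER-scoped state
                this.resetState = false;
            }
            if (newestFileTimestamp > latestTimestampListed) {
                latestTimestampListed = newestFileTimestamp;  // accept anything newer than the stored timestamp
            }
        }

        public static void main(final String[] args) {
            final DeferredResetSketch sketch = new DeferredResetSketch();
            sketch.runListingCycle(100L);                // first directory: remembers timestamp 100
            sketch.propertyChanged("/test1", "/test2");  // directory changed -> reset is deferred
            sketch.runListingCycle(50L);                 // state was reset, so the "older" file is still picked up
            System.out.println(sketch.latestTimestampListed);  // prints 50
        }
    }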


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c118e962
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c118e962
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c118e962

Branch: refs/heads/master
Commit: c118e9623899a34613f0ac8a6f02eafe76df12ae
Parents: ac9944c
Author: zenfenan <si...@gmail.com>
Authored: Thu Mar 22 22:43:52 2018 +0530
Committer: Pierre Villard <pi...@gmail.com>
Committed: Wed Apr 25 18:03:44 2018 +0200

----------------------------------------------------------------------
 .../apache/nifi/processors/hadoop/ListHDFS.java | 14 +++++++----
 .../nifi/processors/hadoop/TestListHDFS.java    | 25 ++++++++++++++++++++
 2 files changed, 34 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c118e962/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index d33fc2e..bd30ca1 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -143,7 +143,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
     private volatile long latestTimestampListed = -1L;
     private volatile long latestTimestampEmitted = -1L;
     private volatile long lastRunTimestamp = -1L;
-
+    private volatile boolean resetState = false;
     static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
     static final String EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
 
@@ -202,8 +202,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         super.onPropertyModified(descriptor, oldValue, newValue);
         if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
-            latestTimestampEmitted = -1L;
-            latestTimestampListed = -1L;
+            this.resetState = true;
         }
     }
 
@@ -283,8 +282,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
         return toList;
     }
 
-
-
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         // We have to ensure that we don't continually perform listings, because if we perform two listings within
@@ -302,6 +299,12 @@ public class ListHDFS extends AbstractHadoopProcessor {
 
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
         try {
+            if (resetState) {
+                getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+                context.getStateManager().clear(Scope.CLUSTER);
+                this.resetState = false;
+            }
+
             final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
             if (stateMap.getVersion() == -1L) {
                 latestTimestampEmitted = -1L;
@@ -464,4 +467,5 @@ public class ListHDFS extends AbstractHadoopProcessor {
             }
         };
     }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c118e962/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index f176a5f..fab28f3 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -381,6 +381,31 @@ public class TestListHDFS {
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
     }
 
+    @Test
+    public void testListAfterDirectoryChange() throws InterruptedException {
+        proc.fileSystem.addFileStatus(new Path("/test1"), new FileStatus(1L, false, 1, 1L, 100L,0L, create777(), "owner", "group", new Path("/test1/testFile-1_1.txt")));
+        proc.fileSystem.addFileStatus(new Path("/test2"), new FileStatus(1L, false, 1, 1L, 150L,0L, create777(), "owner", "group", new Path("/test2/testFile-2_1.txt")));
+        proc.fileSystem.addFileStatus(new Path("/test1"), new FileStatus(1L, false, 1, 1L, 200L,0L, create777(), "owner", "group", new Path("/test1/testFile-1_2.txt")));
+
+        runner.setProperty(ListHDFS.DIRECTORY, "/test1");
+
+        runner.run(); // Initial run, latest file from /test1 will be ignored
+
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run(); // Latest file i.e. testFile-1_2.txt from /test1 should also be picked up now
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+
+        runner.setProperty(ListHDFS.DIRECTORY, "/test2"); // Changing directory should reset the state
+
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run(); // Will ignore the files for this cycle
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run(); // Since state has been reset, testFile-2_1.txt from /test2 should be picked up
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
+    }
+
 
     private FsPermission create777() {
         return new FsPermission((short) 0777);