You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/04/25 16:03:56 UTC
nifi git commit: NIFI-5000 - ListHDFS properly lists files from updated directory path
Repository: nifi
Updated Branches:
refs/heads/master ac9944cce -> c118e9623
NIFI-5000 - ListHDFS properly lists files from updated directory path
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #2576.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c118e962
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c118e962
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c118e962
Branch: refs/heads/master
Commit: c118e9623899a34613f0ac8a6f02eafe76df12ae
Parents: ac9944c
Author: zenfenan <si...@gmail.com>
Authored: Thu Mar 22 22:43:52 2018 +0530
Committer: Pierre Villard <pi...@gmail.com>
Committed: Wed Apr 25 18:03:44 2018 +0200
----------------------------------------------------------------------
.../apache/nifi/processors/hadoop/ListHDFS.java | 14 +++++++----
.../nifi/processors/hadoop/TestListHDFS.java | 25 ++++++++++++++++++++
2 files changed, 34 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/c118e962/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index d33fc2e..bd30ca1 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -143,7 +143,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
private volatile long latestTimestampListed = -1L;
private volatile long latestTimestampEmitted = -1L;
private volatile long lastRunTimestamp = -1L;
-
+ private volatile boolean resetState = false;
static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
static final String EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
@@ -202,8 +202,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
super.onPropertyModified(descriptor, oldValue, newValue);
if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
- latestTimestampEmitted = -1L;
- latestTimestampListed = -1L;
+ this.resetState = true;
}
}
@@ -283,8 +282,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
return toList;
}
-
-
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// We have to ensure that we don't continually perform listings, because if we perform two listings within
@@ -302,6 +299,12 @@ public class ListHDFS extends AbstractHadoopProcessor {
// Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
try {
+ if (resetState) {
+ getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+ context.getStateManager().clear(Scope.CLUSTER);
+ this.resetState = false;
+ }
+
final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
if (stateMap.getVersion() == -1L) {
latestTimestampEmitted = -1L;
@@ -464,4 +467,5 @@ public class ListHDFS extends AbstractHadoopProcessor {
}
};
}
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c118e962/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index f176a5f..fab28f3 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -381,6 +381,31 @@ public class TestListHDFS {
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
}
+ @Test
+ public void testListAfterDirectoryChange() throws InterruptedException {
+ proc.fileSystem.addFileStatus(new Path("/test1"), new FileStatus(1L, false, 1, 1L, 100L,0L, create777(), "owner", "group", new Path("/test1/testFile-1_1.txt")));
+ proc.fileSystem.addFileStatus(new Path("/test2"), new FileStatus(1L, false, 1, 1L, 150L,0L, create777(), "owner", "group", new Path("/test2/testFile-2_1.txt")));
+ proc.fileSystem.addFileStatus(new Path("/test1"), new FileStatus(1L, false, 1, 1L, 200L,0L, create777(), "owner", "group", new Path("/test1/testFile-1_2.txt")));
+
+ runner.setProperty(ListHDFS.DIRECTORY, "/test1");
+
+ runner.run(); // Initial run, latest file from /test1 will be ignored
+
+ Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+ runner.run(); // Latest file i.e. testFile-1_2.txt from /test1 should also be picked up now
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+
+ runner.setProperty(ListHDFS.DIRECTORY, "/test2"); // Changing directory should reset the state
+
+ Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+ runner.run(); // Will ignore the files for this cycle
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+
+ Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+ runner.run(); // Since state has been reset, testFile-2_1.txt from /test2 should be picked up
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
+ }
+
private FsPermission create777() {
return new FsPermission((short) 0777);