Posted to commits@nifi.apache.org by je...@apache.org on 2018/01/31 17:03:34 UTC

nifi git commit: NIFI-4747 - Removed directory existence check in GetHDFS

Repository: nifi
Updated Branches:
  refs/heads/master 6e7dfb993 -> dc67bd2fd


NIFI-4747 - Removed directory existence check in GetHDFS

This closes #2391

Signed-off-by: Jeremy Dyer <je...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/dc67bd2f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/dc67bd2f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/dc67bd2f

Branch: refs/heads/master
Commit: dc67bd2fdd762e48075dfa5edbe1a427025c6576
Parents: 6e7dfb9
Author: Pierre Villard <pi...@gmail.com>
Authored: Tue Jan 9 17:45:51 2018 +0100
Committer: Jeremy Dyer <je...@apache.org>
Committed: Wed Jan 31 12:01:17 2018 -0500

----------------------------------------------------------------------
 .../apache/nifi/processors/hadoop/GetHDFS.java  | 21 ++++++++++----------
 .../nifi/processors/hadoop/GetHDFSTest.java     | 11 ++++++++++
 2 files changed, 21 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
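What changed, in short: GetHDFS previously threw an IOException from
onScheduled when the configured Directory did not exist, so the processor
could not be started at all. Because the Directory property supports
expression language (the new test below uses a date-stamped path), the
target directory may legitimately not exist yet; the check therefore moves
into the polling loop, where a missing directory logs a warning and yields
the processor instead of failing it. A minimal sketch of the new polling
behavior, using the same calls as the diff below (FileSystem and Path are
org.apache.hadoop.fs; DIRECTORY, getFileSystem(), selectFiles() and the
listing variable are GetHDFS members; context.yield() and getLogger() come
from the NiFi processor API):

    final FileSystem hdfs = getFileSystem();
    final Path dir = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
    if (!hdfs.exists(dir)) {
        // Back off and retry on a later trigger; the directory may appear
        // after the processor starts (e.g. a date-based path created daily).
        context.yield();
        getLogger().warn("The directory {} does not exist.", new Object[]{dir});
    } else {
        // Directory is present: snapshot its contents as before.
        listing = selectFiles(hdfs, dir, null);
    }

The new unit test exercises exactly this case: it points Directory at a
path that cannot exist (does/not/exist/${now():format('yyyyMMdd')}), runs
the processor once, and asserts that zero flowfiles reach success, where
previously scheduling would have failed with an IOException.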


http://git-wip-us.apache.org/repos/asf/nifi/blob/dc67bd2f/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 1aefc75..27b7375 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -238,11 +238,6 @@ public class GetHDFS extends AbstractHadoopProcessor {
         abstractOnScheduled(context);
         // copy configuration values to pass them around cleanly
         processorConfig = new ProcessorConfiguration(context);
-        final FileSystem fs = getFileSystem();
-        final Path dir = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
-        if (!fs.exists(dir)) {
-            throw new IOException("PropertyDescriptor " + DIRECTORY + " has invalid value " + dir + ". The directory does not exist.");
-        }
 
         // forget the state of the queue in case HDFS contents changed while this processor was turned off
         queueLock.lock();
@@ -422,8 +417,16 @@ public class GetHDFS extends AbstractHadoopProcessor {
         if (System.currentTimeMillis() >= nextPollTime && listingLock.tryLock()) {
             try {
                 final FileSystem hdfs = getFileSystem();
-                // get listing
-                listing = selectFiles(hdfs, new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue()), null);
+                final Path directoryPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
+
+                if (!hdfs.exists(directoryPath)) {
+                    context.yield();
+                    getLogger().warn("The directory {} does not exist.", new Object[]{directoryPath});
+                } else {
+                    // get listing
+                    listing = selectFiles(hdfs, directoryPath, null);
+                }
+
                 lastPollTime.set(System.currentTimeMillis());
             } finally {
                 listingLock.unlock();
@@ -447,10 +450,6 @@ public class GetHDFS extends AbstractHadoopProcessor {
             filesVisited = new HashSet<>();
         }
 
-        if (!hdfs.exists(dir)) {
-            throw new IOException("Selection directory " + dir.toString() + " doesn't appear to exist!");
-        }
-
         final Set<Path> files = new HashSet<>();
 
         FileStatus[] fileStatuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(dir));

http://git-wip-us.apache.org/repos/asf/nifi/blob/dc67bd2f/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
index 40666d9..d3837a8 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
@@ -143,6 +143,17 @@ public class GetHDFSTest {
     }
 
     @Test
+    public void testDirectoryDoesNotExist() {
+        GetHDFS proc = new TestableGetHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, "does/not/exist/${now():format('yyyyMMdd')}");
+        runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.run();
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
+        assertEquals(0, flowFiles.size());
+    }
+
+    @Test
     public void testAutomaticDecompression() throws IOException {
         GetHDFS proc = new TestableGetHDFS(kerberosProperties);
         TestRunner runner = TestRunners.newTestRunner(proc);