You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/10/02 08:09:44 UTC

[nifi] branch master updated: NIFI-6275 ListHDFS now ignores scheme and authority when uses "Full Path" filter mode Updated description for "Full Path" filter mode to state that it will ignore scheme and authority Added tests to TestListHDFS for listing an empty and nonexistent dirs Updated TestListHDFS' mock file system to track state properly when FileStatus instances are added, and updated listStatus to work properly with the underlying Map that contains FileStatus instances Updated ListHDFS' additional details to d [...]

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d74822  NIFI-6275 ListHDFS now ignores scheme and authority when uses "Full Path" filter mode Updated description for "Full Path" filter mode to state that it will ignore scheme and authority Added tests to TestListHDFS for listing an empty and nonexistent dirs Updated TestListHDFS' mock file system to track state properly when FileStatus instances are added, and updated listStatus to work properly with the underlying Map that contains FileStatus instances Updated ListHDFS' addi [...]
8d74822 is described below

commit 8d748223ff8f80c7a85fc38013ecf0b221adc2da
Author: Jeff Storck <jt...@gmail.com>
AuthorDate: Mon May 13 16:26:36 2019 -0400

    NIFI-6275 ListHDFS now ignores scheme and authority when uses "Full Path" filter mode
    Updated description for "Full Path" filter mode to state that it will ignore scheme and authority
    Added tests to TestListHDFS for listing an empty and nonexistent dirs
    Updated TestListHDFS' mock file system to track state properly when FileStatus instances are added, and updated listStatus to work properly with the underlying Map that contains FileStatus instances
    Updated ListHDFS' additional details to document "Full Path" filter mode ignoring scheme and authority, with an example
    Updated TestRunners, StandardProcessorTestRunner, MockProcessorInitializationContext to support passing in a logger.
    
    NIFI-6275 Updated the "Full Path" filter mode to check the full path of a file with and without its scheme and authority against the filter regex
    Added additional documentation for how ListHDFS handles scheme and authority when "Full Path" filter mode is used
    Added test case for "Full Path" filter mode with a regular expression that includes scheme and authority
    
    This closes #3483.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../util/MockProcessorInitializationContext.java   |   6 +-
 .../nifi/util/StandardProcessorTestRunner.java     |   8 +-
 .../java/org/apache/nifi/util/TestRunners.java     |  50 +++++
 .../apache/nifi/processors/hadoop/ListHDFS.java    |  35 ++-
 .../additionalDetails.html                         |  11 +-
 .../nifi/processors/hadoop/TestListHDFS.java       | 243 +++++++++++++++++++--
 6 files changed, 316 insertions(+), 37 deletions(-)

diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
index e44e731..6a26371 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
@@ -33,8 +33,12 @@ public class MockProcessorInitializationContext implements ProcessorInitializati
     private final MockProcessContext context;
 
     public MockProcessorInitializationContext(final Processor processor, final MockProcessContext context) {
+        this(processor, context, null);
+    }
+
+    public MockProcessorInitializationContext(final Processor processor, final MockProcessContext context, final MockComponentLog logger) {
         processorId = UUID.randomUUID().toString();
-        logger = new MockComponentLog(processorId, processor);
+        this.logger = logger == null ? new MockComponentLog(processorId, processor) : logger;
         this.context = context;
     }
 
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 89af696..d995f8e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -99,6 +99,10 @@ public class StandardProcessorTestRunner implements TestRunner {
     }
 
     StandardProcessorTestRunner(final Processor processor, String processorName) {
+        this(processor, processorName, null);
+    }
+
+    StandardProcessorTestRunner(final Processor processor, String processorName, MockComponentLog logger) {
         this.processor = processor;
         this.idGenerator = new AtomicLong(0L);
         this.sharedState = new SharedSessionState(processor, idGenerator);
@@ -108,9 +112,9 @@ public class StandardProcessorTestRunner implements TestRunner {
         this.variableRegistry = new MockVariableRegistry();
         this.context = new MockProcessContext(processor, processorName, processorStateManager, variableRegistry);
 
-        final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context);
+        final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context, logger);
         processor.initialize(mockInitContext);
-        logger =  mockInitContext.getLogger();
+        this.logger =  mockInitContext.getLogger();
 
         try {
             ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java
index cff907c..9792473 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java
@@ -32,6 +32,17 @@ public class TestRunners {
 
     /**
      * Returns a {@code TestRunner} for the given {@code Processor}.
+     * The processor name available from {@code TestRunner.getProcessContext().getName()} will have the default name of {@code processor.getClass().getName()}
+     * @param processor the {@code Processor} under test
+     * @param logger the {@code ComponentLog} used for logging
+     * @return a {@code TestRunner}
+     */
+    public static TestRunner newTestRunner(final Processor processor, MockComponentLog logger) {
+        return newTestRunner(processor,processor.getClass().getName(), logger);
+    }
+
+    /**
+     * Returns a {@code TestRunner} for the given {@code Processor}.
      * The processor name available from {@code TestRunner.getProcessContext().getName()} will be the passed name.
      * @param processor the {@code Processor} under test
      * @param name the name to give the {@code Processor}
@@ -42,6 +53,18 @@ public class TestRunners {
     }
 
     /**
+     * Returns a {@code TestRunner} for the given {@code Processor}.
+     * The processor name available from {@code TestRunner.getProcessContext().getName()} will be the passed name.
+     * @param processor the {@code Processor} under test
+     * @param name the name to give the {@code Processor}
+     * @param logger the {@code ComponentLog} used for logging
+     * @return a {@code TestRunner}
+     */
+    public static TestRunner newTestRunner(final Processor processor, String name, MockComponentLog logger) {
+        return new StandardProcessorTestRunner(processor, name, logger);
+    }
+
+    /**
      * Returns a {@code TestRunner} for the given {@code Processor} class.
      * The processor name available from {@code TestRunner.getProcessContext().getName()} will have the default name of {@code processor.getClass().getName()}
      * @param processorClass the {@code Processor} class
@@ -55,6 +78,17 @@ public class TestRunners {
      * Returns a {@code TestRunner} for the given {@code Processor} class.
      * The processor name available from {@code TestRunner.getProcessContext().getName()} will have the default name of {@code processor.getClass().getName()}
      * @param processorClass the {@code Processor} class
+     * @param logger the {@code ComponentLog} used for logging
+     * @return a {@code TestRunner}
+     */
+    public static TestRunner newTestRunner(final Class<? extends Processor> processorClass, MockComponentLog logger) {
+        return newTestRunner(processorClass, processorClass.getName(), logger);
+    }
+
+    /**
+     * Returns a {@code TestRunner} for the given {@code Processor} class.
+     * The processor name available from {@code TestRunner.getProcessContext().getName()} will have the default name of {@code processor.getClass().getName()}
+     * @param processorClass the {@code Processor} class
      * @param name the name to give the {@code Processor}
      * @return a {@code TestRunner}
      */
@@ -67,4 +101,20 @@ public class TestRunners {
         }
     }
 
+    /**
+     * Returns a {@code TestRunner} for the given {@code Processor} class.
+     * The processor name available from {@code TestRunner.getProcessContext().getName()} will have the default name of {@code processor.getClass().getName()}
+     * @param processorClass the {@code Processor} class
+     * @param name the name to give the {@code Processor}
+     * @param logger the {@code ComponentLog} used for logging
+     * @return a {@code TestRunner}
+     */
+    public static TestRunner newTestRunner(final Class<? extends Processor> processorClass, String name, MockComponentLog logger) {
+        try {
+            return newTestRunner(processorClass.newInstance(), name, logger);
+        } catch (final Exception e) {
+            System.err.println("Could not instantiate instance of class " + processorClass.getName() + " due to: " + e);
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 15ed4b1..31f582f 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.hadoop;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -126,24 +127,25 @@ public class ListHDFS extends AbstractHadoopProcessor {
     private static final String FILTER_MODE_FULL_PATH = "filter-mode-full-path";
     static final AllowableValue FILTER_DIRECTORIES_AND_FILES_VALUE = new AllowableValue(FILTER_MODE_DIRECTORIES_AND_FILES,
         "Directories and Files",
-        "Filtering will be applied to the names of directories and files.  If " + RECURSE_SUBDIRS.getName()
+        "Filtering will be applied to the names of directories and files.  If " + RECURSE_SUBDIRS.getDisplayName()
                 + " is set to true, only subdirectories with a matching name will be searched for files that match "
-                + "the regular expression defined in " + FILE_FILTER.getName() + ".");
+                + "the regular expression defined in " + FILE_FILTER.getDisplayName() + ".");
     static final AllowableValue FILTER_FILES_ONLY_VALUE = new AllowableValue(FILTER_MODE_FILES_ONLY,
         "Files Only",
-        "Filtering will only be applied to the names of files.  If " + RECURSE_SUBDIRS.getName()
+        "Filtering will only be applied to the names of files.  If " + RECURSE_SUBDIRS.getDisplayName()
                 + " is set to true, the entire subdirectory tree will be searched for files that match "
-                + "the regular expression defined in " + FILE_FILTER.getName() + ".");
+                + "the regular expression defined in " + FILE_FILTER.getDisplayName() + ".");
     static final AllowableValue FILTER_FULL_PATH_VALUE = new AllowableValue(FILTER_MODE_FULL_PATH,
         "Full Path",
-        "Filtering will be applied to the full path of files.  If " + RECURSE_SUBDIRS.getName()
-                + " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
-                + "the file matches the regular expression defined in " + FILE_FILTER.getName() + ".");
+        "Filtering will be applied by evaluating the regular expression defined in " + FILE_FILTER.getDisplayName()
+                + " against the full path of files with and without the scheme and authority.  If "
+                + RECURSE_SUBDIRS.getDisplayName() + " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
+                + "the file matches the regular expression defined in " + FILE_FILTER.getDisplayName() + ".  See 'Additional Details' for more information.");
 
     public static final PropertyDescriptor FILE_FILTER_MODE = new PropertyDescriptor.Builder()
         .name("file-filter-mode")
         .displayName("File Filter Mode")
-        .description("Determines how the regular expression in  " + FILE_FILTER.getName() + " will be used when retrieving listings.")
+        .description("Determines how the regular expression in  " + FILE_FILTER.getDisplayName() + " will be used when retrieving listings.")
         .required(true)
         .allowableValues(FILTER_DIRECTORIES_AND_FILES_VALUE, FILTER_FILES_ONLY_VALUE, FILTER_FULL_PATH_VALUE)
         .defaultValue(FILTER_DIRECTORIES_AND_FILES_VALUE.getValue())
@@ -181,12 +183,21 @@ public class ListHDFS extends AbstractHadoopProcessor {
     static final String EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
 
     static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
+    private Pattern fileFilterRegexPattern;
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
         super.init(context);
     }
 
+    @Override
+    protected void preProcessConfiguration(Configuration config, ProcessContext context) {
+        super.preProcessConfiguration(config, context);
+        // Since this processor is marked as INPUT_FORBIDDEN, the FILE_FILTER regex can be compiled here rather than during onTrigger processing
+        fileFilterRegexPattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
+
+    }
+
     protected File getPersistenceFile() {
         return new File("conf/state/" + getIdentifier());
     }
@@ -222,7 +233,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
 
         if (minimumAge > maximumAge) {
             problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
-                    .explanation(MIN_AGE.getName() + " cannot be greater than " + MAX_AGE.getName()).build());
+                    .explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName()).build());
         }
 
         return problems;
@@ -526,14 +537,14 @@ public class ListHDFS extends AbstractHadoopProcessor {
     }
 
     private PathFilter createPathFilter(final ProcessContext context) {
-        final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
         final String filterMode = context.getProperty(FILE_FILTER_MODE).getValue();
         return path -> {
             final boolean accepted;
             if (FILTER_FULL_PATH_VALUE.getValue().equals(filterMode)) {
-                accepted = filePattern.matcher(path.toString()).matches();
+                accepted = fileFilterRegexPattern.matcher(path.toString()).matches()
+                        || fileFilterRegexPattern.matcher(Path.getPathWithoutSchemeAndAuthority(path).toString()).matches();
             } else {
-                accepted =  filePattern.matcher(path.getName()).matches();
+                accepted =  fileFilterRegexPattern.matcher(path.getName()).matches();
             }
             return accepted;
         };
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html
index 2fd0deb..804208c 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html
@@ -28,11 +28,16 @@
 There are three filter modes available for ListHDFS that determine how the regular expression in the <b><code>File Filter</code></b> property will be applied to listings in HDFS.
 <ul>
     <li><b><code>Directories and Files</code></b></li>
-Filtering will be applied to the names of directories and files.  If <b><code>Recurse Subdirectories</code></b> is set to true, only subdirectories with a matching name will be searched for files that match the regular expression defined in <b><code>File Filter</code></b>.
+    Filtering will be applied to the names of directories and files.  If <b><code>Recurse Subdirectories</code></b> is set to true, only subdirectories with a matching name will be searched for files that match the regular expression defined in <b><code>File Filter</code></b>.
     <li><b><code>Files Only</code></b></li>
-Filtering will only be applied to the names of files.  If <b><code>Recurse Subdirectories</code></b> is set to true, the entire subdirectory tree will be searched for files that match the regular expression defined in <b><code>File Filter</code></b>.
+    Filtering will only be applied to the names of files.  If <b><code>Recurse Subdirectories</code></b> is set to true, the entire subdirectory tree will be searched for files that match the regular expression defined in <b><code>File Filter</code></b>.
     <li><b><code>Full Path</code></b></li>
-Filtering will be applied to the full path of files.  If <b><code>Recurse Subdirectories</code></b> is set to true, the entire subdirectory tree will be searched for files in which the full path of the file matches the regular expression defined in <b><code>File Filter</code></b>.
+    Filtering will be applied to the full path of files.  If <b><code>Recurse Subdirectories</code></b> is set to true, the entire subdirectory tree will be searched for files in which the full path of the file matches the regular expression defined in <b><code>File Filter</code></b>.<br>
+    Regarding <code>scheme</code> and <code>authority</code>, if a given file has a full path of <code>hdfs://hdfscluster:8020/data/txt/1.txt</code>, the filter will evaluate the regular expression defined in <b><code>File Filter</code></b> against two cases, matching if either is true:<br>
+    <ul>
+        <li>the full path including the scheme (<code>hdfs</code>), authority (<code>hdfscluster:8020</code>), and the remaining path components (<code>/data/txt/1.txt</code>)</li>
+        <li>only the path components (<code>/data/txt/1.txt</code>)</li>
+    </ul>
 </ul>
 <p>
 <h2>Examples:</h2>
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index 8134b4d..18c7e34 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -20,7 +20,15 @@ import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_DIRECTORIES_AND_
 import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FILES_ONLY_VALUE;
 import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FULL_PATH_VALUE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
@@ -34,11 +42,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -54,6 +65,7 @@ import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockComponentLog;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;
@@ -61,6 +73,7 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 public class TestListHDFS {
 
@@ -68,6 +81,7 @@ public class TestListHDFS {
     private ListHDFSWithMockedFileSystem proc;
     private NiFiProperties mockNiFiProperties;
     private KerberosProperties kerberosProperties;
+    private MockComponentLog mockLogger;
 
     @Before
     public void setup() throws InitializationException {
@@ -76,7 +90,8 @@ public class TestListHDFS {
         kerberosProperties = new KerberosProperties(null);
 
         proc = new ListHDFSWithMockedFileSystem(kerberosProperties);
-        runner = TestRunners.newTestRunner(proc);
+        mockLogger = spy(new MockComponentLog(UUID.randomUUID().toString(), proc));
+        runner = TestRunners.newTestRunner(proc, mockLogger);
 
         runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml");
         runner.setProperty(ListHDFS.DIRECTORY, "/test");
@@ -279,25 +294,92 @@ public class TestListHDFS {
     }
 
     @Test
-    public void testRecursiveWithCustomFilterFullPath() throws InterruptedException, IOException {
+    public void testRecursiveWithCustomFilterFullPathWithoutSchemeAndAuthority() throws InterruptedException, IOException {
         // set custom regex filter and filter mode
         runner.setProperty(ListHDFS.FILE_FILTER, "(/.*/)*anotherDir/1\\..*");
         runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FULL_PATH_VALUE.getValue());
 
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out")));
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
 
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+        // first iteration will not pick up files because it has to instead check timestamps.
+        // We must then wait long enough to ensure that the listing can be performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
 
-        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/1.out")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/1.txt")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt")));
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
+        for (int i = 0; i < 2; i++) {
+            final MockFlowFile ff = flowFiles.get(i);
+            final String filename = ff.getAttribute("filename");
+
+            if (filename.equals("1.out")) {
+                ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
+            } else if (filename.equals("1.txt")) {
+                ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
+            } else {
+                Assert.fail("filename was " + filename);
+            }
+        }
+    }
+
+    @Test
+    public void testRecursiveWithCustomFilterFullPathWithSchemeAndAuthority() throws InterruptedException, IOException {
+        // set custom regex filter and filter mode
+        runner.setProperty(ListHDFS.FILE_FILTER, "hdfs://hdfscluster:8020(/.*/)*anotherDir/1\\..*");
+        runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FULL_PATH_VALUE.getValue());
 
-        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/someDir")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir/someDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/someDir/1.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
 
         // first iteration will not pick up files because it has to instead check timestamps.
         // We must then wait long enough to ensure that the listing can be performed safely and
@@ -532,6 +614,115 @@ public class TestListHDFS {
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
     }
 
+    @Test
+    public void testListingEmptyDir() throws InterruptedException, IOException {
+        runner.setProperty(ListHDFS.DIRECTORY, "/test/emptyDir");
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/emptyDir")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
+
+        // first iteration will not pick up files because it has to instead check timestamps.
+        // We must then wait long enough to ensure that the listing can be performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        // verify that no messages were logged at the error level
+        verify(mockLogger, never()).error(anyString());
+        final ArgumentCaptor<Throwable> throwableArgumentCaptor = ArgumentCaptor.forClass(Throwable.class);
+        verify(mockLogger, atLeast(0)).error(anyString(), throwableArgumentCaptor.capture());
+        // if error.(message, throwable) was called, ignore JobConf CNFEs since mapreduce libs are not included as dependencies
+        assertTrue(throwableArgumentCaptor.getAllValues().stream().flatMap(Stream::of)
+                // check that there are no throwables that are not of JobConf CNFE exceptions
+                .noneMatch(throwable -> !(throwable instanceof ClassNotFoundException && throwable.getMessage().contains("JobConf"))));
+        verify(mockLogger, never()).error(anyString(), any(Object[].class));
+        verify(mockLogger, never()).error(anyString(), any(Object[].class), any(Throwable.class));
+
+        // assert that no files were listed
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+        // assert that no files were penalized
+        runner.assertPenalizeCount(0);
+    }
+
+    @Test
+    public void testListingNonExistingDir() throws InterruptedException, IOException {
+        String nonExistingPath = "/test/nonExistingDir";
+        runner.setProperty(ListHDFS.DIRECTORY, nonExistingPath);
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/emptyDir")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
+
+        // first iteration will not pick up files because it has to instead check timestamps.
+        // We must then wait long enough to ensure that the listing can be performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        final ArgumentCaptor<Object[]> loggingArgsCaptor = ArgumentCaptor.forClass(Object[].class);
+        verify(mockLogger, atLeastOnce()).error(anyString(), loggingArgsCaptor.capture());
+        // assert that FNFE exceptions were logged for the Directory property's value.
+        assertTrue(loggingArgsCaptor.getAllValues().stream().flatMap(Stream::of)
+                .anyMatch(o -> o instanceof FileNotFoundException && ((FileNotFoundException)o).getMessage().contains(nonExistingPath)));
+
+        // assert that no files were listed
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+        // assert that no files were penalized
+        runner.assertPenalizeCount(0);
+    }
 
     private FsPermission create777() {
         return new FsPermission((short) 0777);
@@ -578,6 +769,11 @@ public class TestListHDFS {
             }
 
             children.add(child);
+
+            // if the child is directory and a key for it does not exist, create it with an empty set of children
+            if (child.isDirectory() && !fileStatuses.containsKey(child.getPath())) {
+                fileStatuses.put(child.getPath(), new HashSet<>());
+            }
         }
 
         @Override
@@ -625,12 +821,21 @@ public class TestListHDFS {
 
         @Override
         public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException {
-            final Set<FileStatus> statuses = fileStatuses.get(f);
-            if (statuses == null) {
-                return new FileStatus[0];
-            }
-
-            return statuses.toArray(new FileStatus[statuses.size()]);
+            return fileStatuses.keySet().stream()
+                    // find the key in fileStatuses that matches the given Path f
+                    .filter(pathKey -> f.isAbsoluteAndSchemeAuthorityNull()
+                            // f is an absolute path with no scheme and no authority, compare with the keys of fileStatuses without their scheme and authority
+                            ? Path.getPathWithoutSchemeAndAuthority(pathKey).equals(Path.getPathWithoutSchemeAndAuthority(f)) :
+                            // f is absolute, but contains a scheme or authority, compare directly to the keys of fileStatuses
+                            // if f is not absolute, false will be returned;
+                            f.isAbsolute() && pathKey.equals(f))
+                    // get the set of FileStatus objects for the filtered paths in the stream
+                    .map(fileStatuses::get)
+                    // return the first set of FileStatus objects in the stream; there should only be one, since fileStatuses is a Map
+                    .findFirst()
+
+                    // if no set of FileStatus objects was found, throw a FNFE
+                    .orElseThrow(() -> new FileNotFoundException(String.format("%s instance does not contain an key for %s", this.getClass().getSimpleName(), f))).toArray(new FileStatus[0]);
         }
 
         @Override