You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ze...@apache.org on 2018/08/08 16:37:37 UTC

nifi git commit: NIFI-4434 Fixed recursive listing with a custom regex filter. Filter modes are now supported to perform listings based on directory and file names, file-names only, and full path.

Repository: nifi
Updated Branches:
  refs/heads/master 9a79c94f8 -> 451084e11


NIFI-4434 Fixed recursive listing with a custom regex filter.
Filter modes are now supported to perform listings based on directory and file names, file-names only, and full path.

This closes #2937

Signed-off-by: zenfenan <ze...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/451084e1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/451084e1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/451084e1

Branch: refs/heads/master
Commit: 451084e11f8c3dc2a0c68ffb739dff58de479cff
Parents: 9a79c94
Author: Jeff Storck <jt...@gmail.com>
Authored: Wed Aug 1 13:13:40 2018 -0400
Committer: zenfenan <si...@gmail.com>
Committed: Wed Aug 8 22:06:50 2018 +0530

----------------------------------------------------------------------
 .../apache/nifi/processors/hadoop/ListHDFS.java |  83 +++++++++--
 .../additionalDetails.html                      | 104 ++++++++++++++
 .../nifi/processors/hadoop/TestListHDFS.java    | 139 ++++++++++++++++++-
 3 files changed, 316 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/451084e1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 612247d..a147730 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -33,6 +33,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -119,6 +120,35 @@ public class ListHDFS extends AbstractHadoopProcessor {
         .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
         .build();
 
+    private static final String FILTER_MODE_DIRECTORIES_AND_FILES = "filter-mode-directories-and-files"; // regex applies to directory names and file names
+    private static final String FILTER_MODE_FILES_ONLY = "filter-mode-files-only"; // regex applies to file names only
+    private static final String FILTER_MODE_FULL_PATH = "filter-mode-full-path"; // regex applies to the full path of each file
+    static final AllowableValue FILTER_DIRECTORIES_AND_FILES_VALUE = new AllowableValue(FILTER_MODE_DIRECTORIES_AND_FILES, // package-private: statically imported by TestListHDFS
+        "Directories and Files",
+        "Filtering will be applied to the names of directories and files.  If " + RECURSE_SUBDIRS.getName()
+                + " is set to true, only subdirectories with a matching name will be searched for files that match "
+                + "the regular expression defined in " + FILE_FILTER.getName() + ".");
+    static final AllowableValue FILTER_FILES_ONLY_VALUE = new AllowableValue(FILTER_MODE_FILES_ONLY, // package-private: statically imported by TestListHDFS
+        "Files Only",
+        "Filtering will only be applied to the names of files.  If " + RECURSE_SUBDIRS.getName()
+                + " is set to true, the entire subdirectory tree will be searched for files that match "
+                + "the regular expression defined in " + FILE_FILTER.getName() + ".");
+    static final AllowableValue FILTER_FULL_PATH_VALUE = new AllowableValue(FILTER_MODE_FULL_PATH, // package-private: statically imported by TestListHDFS
+        "Full Path",
+        "Filtering will be applied to the full path of files.  If " + RECURSE_SUBDIRS.getName()
+                + " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
+                + "the file matches the regular expression defined in " + FILE_FILTER.getName() + ".");
+
+    // Selects how the FILE_FILTER regex is applied.  The allowable values alone constrain this property, so no regex validator is attached.
+    public static final PropertyDescriptor FILE_FILTER_MODE = new PropertyDescriptor.Builder()
+        .name("file-filter-mode")
+        .displayName("File Filter Mode")
+        .description("Determines how the regular expression in " + FILE_FILTER.getName() + " will be used when retrieving listings.")
+        .required(true)
+        .allowableValues(FILTER_DIRECTORIES_AND_FILES_VALUE, FILTER_FILES_ONLY_VALUE, FILTER_FULL_PATH_VALUE)
+        .defaultValue(FILTER_DIRECTORIES_AND_FILES_VALUE.getValue())
+        .build();
+
     public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
         .name("minimum-file-age")
         .displayName("Minimum File Age")
@@ -167,6 +197,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
         props.add(DIRECTORY);
         props.add(RECURSE_SUBDIRS);
         props.add(FILE_FILTER);
+        props.add(FILE_FILTER_MODE);
         props.add(MIN_AGE);
         props.add(MAX_AGE);
         return props;
@@ -340,11 +371,12 @@ public class ListHDFS extends AbstractHadoopProcessor {
         // Pull in any file that is newer than the timestamp that we have.
         final FileSystem hdfs = getFileSystem();
         final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+        String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue();
 
         final Set<FileStatus> statuses;
         try {
             final Path rootPath = new Path(directory);
-            statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context));
+            statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context), fileFilterMode);
             getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
         } catch (final IOException | IllegalArgumentException e) {
             getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
@@ -391,29 +423,58 @@ public class ListHDFS extends AbstractHadoopProcessor {
         }
     }
 
-    private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter) throws IOException, InterruptedException {
+    private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter, String filterMode) throws IOException, InterruptedException {
         final Set<FileStatus> statusSet = new HashSet<>();
 
         getLogger().debug("Fetching listing for {}", new Object[] {path});
-        final FileStatus[] statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path, filter));
+        final FileStatus[] statuses;
+        if (isPostListingFilterNeeded(filterMode)) {
+            // For this filter mode, the filter is not passed to listStatus, so that directory names will not be
+            // filtered out when the listing is recursive.
+            statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path));
+        } else {
+            statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path, filter));
+        }
 
         for ( final FileStatus status : statuses ) {
             if ( status.isDirectory() ) {
                 if ( recursive ) {
                     try {
-                        statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs, filter));
+                        statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs, filter, filterMode));
                     } catch (final IOException ioe) {
                         getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe});
                     }
                 }
             } else {
-                statusSet.add(status);
+                if (isPostListingFilterNeeded(filterMode)) {
+                    // Filtering explicitly performed here, since it was not able to be done when calling listStatus.
+                    if (filter.accept(status.getPath())) {
+                        statusSet.add(status);
+                    }
+                } else {
+                    statusSet.add(status);
+                }
             }
         }
 
         return statusSet;
     }
 
+    /**
+     * Determines whether the path filter must be applied after calling {@link FileSystem#listStatus(Path)} rather
+     * than being passed to {@link FileSystem#listStatus(Path, PathFilter)}.
+     * FILTER_MODE_FILES_ONLY and FILTER_MODE_FULL_PATH return true: the listing must be unfiltered so that every
+     * directory can be traversed, and the filter is then applied to the files that were found.
+     * FILTER_MODE_DIRECTORIES_AND_FILES returns false, since the filter applies to directory names as well.
+     * A null or unrecognized mode also returns false, because the constants are the receivers of equals.
+     *
+     * @param filterMode the value of one of the defined AllowableValues representing filter modes; may be null
+     * @return true if results need to be filtered after listing, false otherwise
+     */
+    private boolean isPostListingFilterNeeded(final String filterMode) {
+        return FILTER_MODE_FILES_ONLY.equals(filterMode) || FILTER_MODE_FULL_PATH.equals(filterMode);
+    }
+
     private String getAbsolutePath(final Path path) {
         final Path parent = path.getParent();
         final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
@@ -462,11 +523,15 @@ public class ListHDFS extends AbstractHadoopProcessor {
 
     private PathFilter createPathFilter(final ProcessContext context) {
         final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
-        return new PathFilter() {
-            @Override
-            public boolean accept(Path path) {
-                return filePattern.matcher(path.getName()).matches();
+        final boolean matchFullPath = FILTER_MODE_FULL_PATH.equals(context.getProperty(FILE_FILTER_MODE).getValue());
+        return path -> {
+            if (matchFullPath) {
+                // Listed paths may be fully qualified (scheme and authority included), which a regex written against
+                // the plain path would never match; accept a match on either form of the path.
+                return filePattern.matcher(path.toString()).matches()
+                        || filePattern.matcher(Path.getPathWithoutSchemeAndAuthority(path).toString()).matches();
             }
+            return filePattern.matcher(path.getName()).matches();
         };
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/451084e1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html
new file mode 100644
index 0000000..2fd0deb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html
@@ -0,0 +1,104 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+
+<head>
+    <meta charset="utf-8"/>
+    <title>ListHDFS</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+</head>
+
+<body>
+<!-- Processor Documentation ================================================== -->
+<h1>ListHDFS Filter Modes</h1>
+<p>
+There are three filter modes available for ListHDFS that determine how the regular expression in the <b><code>File Filter</code></b> property will be applied to listings in HDFS.
+<ul>
+    <li><b><code>Directories and Files</code></b><br>
+Filtering will be applied to the names of directories and files.  If <b><code>Recurse Subdirectories</code></b> is set to true, only subdirectories with a matching name will be searched for files that match the regular expression defined in <b><code>File Filter</code></b>.</li>
+    <li><b><code>Files Only</code></b><br>
+Filtering will only be applied to the names of files.  If <b><code>Recurse Subdirectories</code></b> is set to true, the entire subdirectory tree will be searched for files that match the regular expression defined in <b><code>File Filter</code></b>.</li>
+    <li><b><code>Full Path</code></b><br>
+Filtering will be applied to the full path of files.  If <b><code>Recurse Subdirectories</code></b> is set to true, the entire subdirectory tree will be searched for files in which the full path of the file matches the regular expression defined in <b><code>File Filter</code></b>.</li>
+</ul>
+<p>
+<h2>Examples:</h2>
+For the given examples, the following directory structure is used:
+<br>
+<br>
+    data<br>
+    ├── readme.txt<br>
+    ├── bin<br>
+    │   ├── readme.txt<br>
+    │   ├── 1.bin<br>
+    │   ├── 2.bin<br>
+    │   └── 3.bin<br>
+    ├── csv<br>
+    │   ├── readme.txt<br>
+    │   ├── 1.csv<br>
+    │   ├── 2.csv<br>
+    │   └── 3.csv<br>
+    └── txt<br>
+    &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ├── readme.txt<br>
+    &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ├── 1.txt<br>
+    &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ├── 2.txt<br>
+    &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; └── 3.txt<br>
+    <br><br>
+<h3><b>Directories and Files</b></h3>
+This mode is useful when the listing should match the names of directories and files with the regular expression defined in <b><code>File Filter</code></b>.  When <b><code>Recurse Subdirectories</code></b> is true, this mode allows the user to filter for files in subdirectories with names that match the regular expression defined in <b><code>File Filter</code></b>.
+<br>
+<br>
+ListHDFS configuration:
+<table><tr><th><b><code>Property</code></b></th><th><b><code>Value</code></b></th></tr><tr><td><b><code>Directory</code></b></td><td><code>/data</code></td></tr><tr><td><b><code>Recurse Subdirectories</code></b></td><td>true</td></tr><tr><td><b><code>File Filter</code></b></td><td><code>.*txt.*</code></td></tr><tr><td><code><b>File Filter Mode</b></code></td><td><code>Directories and Files</code></td></tr></table>
+<p>ListHDFS results:
+<ul>
+    <li>/data/readme.txt</li>
+    <li>/data/txt/readme.txt</li>
+    <li>/data/txt/1.txt</li>
+    <li>/data/txt/2.txt</li>
+    <li>/data/txt/3.txt</li>
+</ul>
+<h3><b>Files Only</b></h3>
+This mode is useful when the listing should match only the names of files with the regular expression defined in <b><code>File Filter</code></b>.  Directory names will not be matched against the regular expression defined in <b><code>File Filter</code></b>.  When <b><code>Recurse Subdirectories</code></b> is true, this mode allows the user to filter for files in the entire subdirectory tree of the directory specified in the <b><code>Directory</code></b> property.
+<br>
+<br>
+ListHDFS configuration:
+<table><tr><th><b><code>Property</code></b></th><th><b><code>Value</code></b></th></tr><tr><td><b><code>Directory</code></b></td><td><code>/data</code></td></tr><tr><td><b><code>Recurse Subdirectories</code></b></td><td>true</td></tr><tr><td><b><code>File Filter</code></b></td><td><code>[^\.].*\.txt</code></td></tr><tr><td><code><b>File Filter Mode</b></code></td><td><code>Files Only</code></td></tr></table>
+<p>ListHDFS results:
+<ul>
+    <li>/data/readme.txt</li>
+    <li>/data/bin/readme.txt</li>
+    <li>/data/csv/readme.txt</li>
+    <li>/data/txt/readme.txt</li>
+    <li>/data/txt/1.txt</li>
+    <li>/data/txt/2.txt</li>
+    <li>/data/txt/3.txt</li>
+</ul>
+<h3><b>Full Path</b></h3>
+This mode is useful when the listing should match the entire path of a file with the regular expression defined in <b><code>File Filter</code></b>.  When <b><code>Recurse Subdirectories</code></b> is true, this mode allows the user to filter for files in the entire subdirectory tree of the directory specified in the <b><code>Directory</code></b> property while allowing filtering based on the full path of each file.
+<br>
+<br>
+ListHDFS configuration:
+<table><tr><th><b><code>Property</code></b></th><th><b><code>Value</code></b></th></tr><tr><td><b><code>Directory</code></b></td><td><code>/data</code></td></tr><tr><td><b><code>Recurse Subdirectories</code></b></td><td>true</td></tr><tr><td><b><code>File Filter</code></b></td><td><code>(/.*/)*csv/.*</code></td></tr><tr><td><code><b>File Filter Mode</b></code></td><td><code>Full Path</code></td></tr></table>
+<p>ListHDFS results:
+<ul>
+    <li>/data/csv/readme.txt</li>
+    <li>/data/csv/1.csv</li>
+    <li>/data/csv/2.csv</li>
+    <li>/data/csv/3.csv</li>
+</ul>
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/451084e1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index bfa9851..b349962 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -16,6 +16,9 @@
  */
 package org.apache.nifi.processors.hadoop;
 
+import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_DIRECTORIES_AND_FILES_VALUE;
+import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FILES_ONLY_VALUE;
+import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FULL_PATH_VALUE;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -162,7 +165,8 @@ public class TestListHDFS {
 
 
     @Test
-    public void testRecursive() throws InterruptedException {
+    public void testRecursiveWithDefaultFilterAndFilterMode() throws InterruptedException {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/.testFile.txt")));
         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
 
         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
@@ -193,6 +197,139 @@ public class TestListHDFS {
     }
 
     @Test
+    public void testRecursiveWithCustomFilterDirectoriesAndFiles() throws InterruptedException, IOException {
+        // set custom regex filter and filter mode
+        runner.setProperty(ListHDFS.FILE_FILTER, ".*txt.*");
+        runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_DIRECTORIES_AND_FILES_VALUE.getValue());
+
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out")));
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir")));
+        proc.fileSystem.addFileStatus(new Path("/test/txtDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir/3.out")));
+        proc.fileSystem.addFileStatus(new Path("/test/txtDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir/3.txt")));
+
+        // first iteration will not pick up files because it has to instead check timestamps.
+        // We must then wait long enough to ensure that the listing can be performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
+        for (int i = 0; i < 2; i++) {
+            final MockFlowFile ff = flowFiles.get(i);
+            final String filename = ff.getAttribute("filename");
+
+            if (filename.equals("testFile.txt")) {
+                ff.assertAttributeEquals("path", "/test");
+            } else if (filename.equals("3.txt")) {
+                ff.assertAttributeEquals("path", "/test/txtDir");
+            } else {
+                Assert.fail("filename was " + filename);
+            }
+        }
+    }
+
+    @Test
+    public void testRecursiveWithCustomFilterFilesOnly() throws InterruptedException, IOException {
+        // set custom regex filter and filter mode
+        runner.setProperty(ListHDFS.FILE_FILTER, "[^\\.].*\\.txt");
+        runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FILES_ONLY_VALUE.getValue());
+
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out")));
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/.partfile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/.txt")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt")));
+
+        // first iteration will not pick up files because it has to instead check timestamps.
+        // We must then wait long enough to ensure that the listing can be performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
+        // verify the path of every transferred flow file (three are expected), not just the first two
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
+        for (int i = 0; i < flowFiles.size(); i++) {
+            final MockFlowFile ff = flowFiles.get(i);
+            final String filename = ff.getAttribute("filename");
+
+            if (filename.equals("testFile.txt")) {
+                ff.assertAttributeEquals("path", "/test");
+            } else if (filename.equals("1.txt")) {
+                ff.assertAttributeEquals("path", "/test/testDir");
+            } else if (filename.equals("2.txt")) {
+                ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
+            } else {
+                Assert.fail("filename was " + filename);
+            }
+        }
+    }
+
+    @Test
+    public void testRecursiveWithCustomFilterFullPath() throws InterruptedException, IOException {
+        // set custom regex filter and filter mode
+        runner.setProperty(ListHDFS.FILE_FILTER, "(/.*/)*anotherDir/1\\..*");
+        runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FULL_PATH_VALUE.getValue());
+
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out")));
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/1.out")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/1.txt")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/someDir")));
+        proc.fileSystem.addFileStatus(new Path("/test/testDir/someDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/someDir/1.out")));
+
+        // first iteration will not pick up files because it has to instead check timestamps.
+        // We must then wait long enough to ensure that the listing can be performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
+        for (int i = 0; i < 2; i++) {
+            final MockFlowFile ff = flowFiles.get(i);
+            final String filename = ff.getAttribute("filename");
+
+            if (filename.equals("1.out")) {
+                ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
+            } else if (filename.equals("1.txt")) {
+                ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
+            } else {
+                Assert.fail("filename was " + filename);
+            }
+        }
+    }
+
+    @Test
     public void testNotRecursive() throws InterruptedException {
         runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");
         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));