Posted to commits@nifi.apache.org by ze...@apache.org on 2018/08/08 16:37:37 UTC
nifi git commit: NIFI-4434 Fixed recursive listing with a custom regex filter. Filter modes are now supported to perform listings based on directory and file names, file names only, and full path.
Repository: nifi
Updated Branches:
refs/heads/master 9a79c94f8 -> 451084e11
NIFI-4434 Fixed recursive listing with a custom regex filter.
Filter modes are now supported to perform listings based on directory and file names, file names only, and full path.
This closes #2937
Signed-off-by: zenfenan <ze...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/451084e1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/451084e1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/451084e1
Branch: refs/heads/master
Commit: 451084e11f8c3dc2a0c68ffb739dff58de479cff
Parents: 9a79c94
Author: Jeff Storck <jt...@gmail.com>
Authored: Wed Aug 1 13:13:40 2018 -0400
Committer: zenfenan <si...@gmail.com>
Committed: Wed Aug 8 22:06:50 2018 +0530
----------------------------------------------------------------------
.../apache/nifi/processors/hadoop/ListHDFS.java | 83 +++++++++--
.../additionalDetails.html | 104 ++++++++++++++
.../nifi/processors/hadoop/TestListHDFS.java | 139 ++++++++++++++++++-
3 files changed, 316 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/451084e1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 612247d..a147730 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -33,6 +33,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -119,6 +120,35 @@ public class ListHDFS extends AbstractHadoopProcessor {
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
+ private static final String FILTER_MODE_DIRECTORIES_AND_FILES = "filter-mode-directories-and-files";
+ private static final String FILTER_MODE_FILES_ONLY = "filter-mode-files-only";
+ private static final String FILTER_MODE_FULL_PATH = "filter-mode-full-path";
+ static final AllowableValue FILTER_DIRECTORIES_AND_FILES_VALUE = new AllowableValue(FILTER_MODE_DIRECTORIES_AND_FILES,
+ "Directories and Files",
+ "Filtering will be applied to the names of directories and files. If " + RECURSE_SUBDIRS.getName()
+ + " is set to true, only subdirectories with a matching name will be searched for files that match "
+ + "the regular expression defined in " + FILE_FILTER.getName() + ".");
+ static final AllowableValue FILTER_FILES_ONLY_VALUE = new AllowableValue(FILTER_MODE_FILES_ONLY,
+ "Files Only",
+ "Filtering will only be applied to the names of files. If " + RECURSE_SUBDIRS.getName()
+ + " is set to true, the entire subdirectory tree will be searched for files that match "
+ + "the regular expression defined in " + FILE_FILTER.getName() + ".");
+ static final AllowableValue FILTER_FULL_PATH_VALUE = new AllowableValue(FILTER_MODE_FULL_PATH,
+ "Full Path",
+ "Filtering will be applied to the full path of files. If " + RECURSE_SUBDIRS.getName()
+ + " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
+ + "the file matches the regular expression defined in " + FILE_FILTER.getName() + ".");
+
+ public static final PropertyDescriptor FILE_FILTER_MODE = new PropertyDescriptor.Builder()
+ .name("file-filter-mode")
+ .displayName("File Filter Mode")
+ .description("Determines how the regular expression in " + FILE_FILTER.getName() + " will be used when retrieving listings.")
+ .required(true)
+ .allowableValues(FILTER_DIRECTORIES_AND_FILES_VALUE, FILTER_FILES_ONLY_VALUE, FILTER_FULL_PATH_VALUE)
+ .defaultValue(FILTER_DIRECTORIES_AND_FILES_VALUE.getValue())
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .build();
+
public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
.name("minimum-file-age")
.displayName("Minimum File Age")
@@ -167,6 +197,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
props.add(DIRECTORY);
props.add(RECURSE_SUBDIRS);
props.add(FILE_FILTER);
+ props.add(FILE_FILTER_MODE);
props.add(MIN_AGE);
props.add(MAX_AGE);
return props;
@@ -340,11 +371,12 @@ public class ListHDFS extends AbstractHadoopProcessor {
// Pull in any file that is newer than the timestamp that we have.
final FileSystem hdfs = getFileSystem();
final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+ final String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue();
final Set<FileStatus> statuses;
try {
final Path rootPath = new Path(directory);
- statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context));
+ statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context), fileFilterMode);
getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
} catch (final IOException | IllegalArgumentException e) {
getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
@@ -391,29 +423,58 @@ public class ListHDFS extends AbstractHadoopProcessor {
}
}
- private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter) throws IOException, InterruptedException {
+ private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter, String filterMode) throws IOException, InterruptedException {
final Set<FileStatus> statusSet = new HashSet<>();
getLogger().debug("Fetching listing for {}", new Object[] {path});
- final FileStatus[] statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path, filter));
+ final FileStatus[] statuses;
+ if (isPostListingFilterNeeded(filterMode)) {
+ // For this filter mode, the filter is not passed to listStatus, so that directory names will not be
+ // filtered out when the listing is recursive.
+ statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path));
+ } else {
+ statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path, filter));
+ }
for ( final FileStatus status : statuses ) {
if ( status.isDirectory() ) {
if ( recursive ) {
try {
- statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs, filter));
+ statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs, filter, filterMode));
} catch (final IOException ioe) {
getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe});
}
}
} else {
- statusSet.add(status);
+ if (isPostListingFilterNeeded(filterMode)) {
+ // Filtering is performed explicitly here, since it could not be done when calling listStatus.
+ if (filter.accept(status.getPath())) {
+ statusSet.add(status);
+ }
+ } else {
+ statusSet.add(status);
+ }
}
}
return statusSet;
}
+ /**
+ * Determines if filtering needs to be applied, after calling {@link FileSystem#listStatus(Path)}, based on the
+ * given filter mode.
+ * Filter modes that need to be able to search directories regardless of the given filter should return true.
+ * FILTER_MODE_FILES_ONLY and FILTER_MODE_FULL_PATH require that {@link FileSystem#listStatus(Path)} be invoked
+ * without a filter so that all directories can be traversed when filtering with these modes.
+ * FILTER_MODE_DIRECTORIES_AND_FILES should return false, since filtering can be applied directly with
+ * {@link FileSystem#listStatus(Path, PathFilter)} regardless of a recursive listing.
+ * @param filterMode the value of one of the defined AllowableValues representing filter modes
+ * @return true if results need to be filtered, false otherwise
+ */
+ private boolean isPostListingFilterNeeded(String filterMode) {
+ return filterMode.equals(FILTER_MODE_FILES_ONLY) || filterMode.equals(FILTER_MODE_FULL_PATH);
+ }
+
private String getAbsolutePath(final Path path) {
final Path parent = path.getParent();
final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
@@ -462,11 +523,15 @@ public class ListHDFS extends AbstractHadoopProcessor {
private PathFilter createPathFilter(final ProcessContext context) {
final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
- return new PathFilter() {
- @Override
- public boolean accept(Path path) {
- return filePattern.matcher(path.getName()).matches();
+ final String filterMode = context.getProperty(FILE_FILTER_MODE).getValue();
+ return path -> {
+ final boolean accepted;
+ if (FILTER_FULL_PATH_VALUE.getValue().equals(filterMode)) {
+ accepted = filePattern.matcher(path.toString()).matches();
+ } else {
+ accepted = filePattern.matcher(path.getName()).matches();
}
+ return accepted;
};
}
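
For illustration, the following is a minimal standalone sketch of how the three
filter modes evaluate a path. It uses only java.util.regex and hypothetical
sample paths; the matching rules mirror the createPathFilter logic above, but
this is not the processor code, and the difference between "Directories and
Files" and "Files Only" (whether directory names are also filtered while
recursing) is not modeled here.

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Standalone illustration of the three ListHDFS filter modes.
public class FilterModeDemo {

    enum FilterMode { DIRECTORIES_AND_FILES, FILES_ONLY, FULL_PATH }

    // Mirrors the PathFilter lambda above: FULL_PATH matches the regex
    // against the whole path, the other two modes match only the final
    // path component (the file or directory name).
    static boolean accept(final FilterMode mode, final Pattern pattern, final String path) {
        if (mode == FilterMode.FULL_PATH) {
            return pattern.matcher(path).matches();
        }
        final String name = path.substring(path.lastIndexOf('/') + 1);
        return pattern.matcher(name).matches();
    }

    public static void main(String[] args) {
        final Pattern pattern = Pattern.compile(".*txt.*");
        final List<String> paths = Arrays.asList(
                "/data/readme.txt", "/data/txt/1.txt", "/data/bin/1.bin");
        for (final FilterMode mode : FilterMode.values()) {
            for (final String path : paths) {
                System.out.printf("%s %s -> %b%n", mode, path, accept(mode, pattern, path));
            }
        }
    }
}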
http://git-wip-us.apache.org/repos/asf/nifi/blob/451084e1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html
new file mode 100644
index 0000000..2fd0deb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html
@@ -0,0 +1,104 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<head>
+ <meta charset="utf-8"/>
+ <title>ListHDFS</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+</head>
+
+<body>
+<!-- Processor Documentation ================================================== -->
+<h1>ListHDFS Filter Modes</h1>
+<p>
+There are three filter modes available for ListHDFS that determine how the regular expression in the <b><code>File Filter</code></b> property will be applied to listings in HDFS.
+<ul>
+ <li><b><code>Directories and Files</code></b><br>
+Filtering will be applied to the names of directories and files. If <b><code>Recurse Subdirectories</code></b> is set to true, only subdirectories with a matching name will be searched for files that match the regular expression defined in <b><code>File Filter</code></b>.</li>
+ <li><b><code>Files Only</code></b><br>
+Filtering will only be applied to the names of files. If <b><code>Recurse Subdirectories</code></b> is set to true, the entire subdirectory tree will be searched for files that match the regular expression defined in <b><code>File Filter</code></b>.</li>
+ <li><b><code>Full Path</code></b><br>
+Filtering will be applied to the full path of files. If <b><code>Recurse Subdirectories</code></b> is set to true, the entire subdirectory tree will be searched for files in which the full path of the file matches the regular expression defined in <b><code>File Filter</code></b>.</li>
+</ul>
+<p>
+<h2>Examples:</h2>
+For the given examples, the following directory structure is used:
+<br>
+<br>
+ data<br>
+ ├── readme.txt<br>
+ ├── bin<br>
+ │ ├── readme.txt<br>
+ │ ├── 1.bin<br>
+ │ ├── 2.bin<br>
+ │ └── 3.bin<br>
+ ├── csv<br>
+ │ ├── readme.txt<br>
+ │ ├── 1.csv<br>
+ │ ├── 2.csv<br>
+ │ └── 3.csv<br>
+ └── txt<br>
+ ├── readme.txt<br>
+ ├── 1.txt<br>
+ ├── 2.txt<br>
+ └── 3.txt<br>
+ <br><br>
+<h3><b>Directories and Files</b></h3>
+This mode is useful when the listing should match the names of directories and files with the regular expression defined in <b><code>File Filter</code></b>. When <b><code>Recurse Subdirectories</code></b> is true, this mode allows the user to filter for files in subdirectories with names that match the regular expression defined in <b><code>File Filter</code></b>.
+<br>
+<br>
+ListHDFS configuration:
+<table><tr><th><b><code>Property</code></b></th><th><b><code>Value</code></b></th></tr><tr><td><b><code>Directory</code></b></td><td><code>/data</code></td></tr><tr><td><b><code>Recurse Subdirectories</code></b></td><td><code>true</code></td></tr><tr><td><b><code>File Filter</code></b></td><td><code>.*txt.*</code></td></tr><tr><td><b><code>File Filter Mode</code></b></td><td><code>Directories and Files</code></td></tr></table>
+<p>ListHDFS results:
+<ul>
+ <li>/data/readme.txt</li>
+ <li>/data/txt/readme.txt</li>
+ <li>/data/txt/1.txt</li>
+ <li>/data/txt/2.txt</li>
+ <li>/data/txt/3.txt</li>
+</ul>
+<h3><b>Files Only</b></h3>
+This mode is useful when the listing should match only the names of files with the regular expression defined in <b><code>File Filter</code></b>. Directory names will not be matched against the regular expression defined in <b><code>File Filter</code></b>. When <b><code>Recurse Subdirectories</code></b> is true, this mode allows the user to filter for files in the entire subdirectory tree of the directory specified in the <b><code>Directory</code></b> property.
+<br>
+<br>
+ListHDFS configuration:
+<table><tr><th><b><code>Property</code></b></th><th><b><code>Value</code></b></th></tr><tr><td><b><code>Directory</code></b></td><td><code>/data</code></td></tr><tr><td><b><code>Recurse Subdirectories</code></b></td><td><code>true</code></td></tr><tr><td><b><code>File Filter</code></b></td><td><code>[^\.].*\.txt</code></td></tr><tr><td><b><code>File Filter Mode</code></b></td><td><code>Files Only</code></td></tr></table>
+<p>ListHDFS results:
+<ul>
+ <li>/data/readme.txt</li>
+ <li>/data/bin/readme.txt</li>
+ <li>/data/csv/readme.txt</li>
+ <li>/data/txt/readme.txt</li>
+ <li>/data/txt/1.txt</li>
+ <li>/data/txt/2.txt</li>
+ <li>/data/txt/3.txt</li>
+</ul>
+<h3><b>Full Path</b></h3>
+This mode is useful when the listing should match the entire path of a file with the regular expression defined in <b><code>File Filter</code></b>. When <b><code>Recurse Subdirectories</code></b> is true, this mode allows the user to filter for files in the entire subdirectory tree of the directory specified in the <b><code>Directory</code></b> property while allowing filtering based on the full path of each file.
+<br>
+<br>
+ListHDFS configuration:
+<table><tr><th><b><code>Property</code></b></th><th><b><code>Value</code></b></th></tr><tr><td><b><code>Directory</code></b></td><td><code>/data</code></td></tr><tr><td><b><code>Recurse Subdirectories</code></b></td><td><code>true</code></td></tr><tr><td><b><code>File Filter</code></b></td><td><code>(/.*/)*csv/.*</code></td></tr><tr><td><b><code>File Filter Mode</code></b></td><td><code>Full Path</code></td></tr></table>
+<p>ListHDFS results:
+<ul>
+ <li>/data/csv/readme.txt</li>
+ <li>/data/csv/1.csv</li>
+ <li>/data/csv/2.csv</li>
+ <li>/data/csv/3.csv</li>
+</ul>
+</body>
+</html>
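
As a quick sanity check of the examples above, here is a minimal sketch that
applies the documented File Filter regular expressions to the sample /data tree
with plain java.util.regex. It illustrates only the regex semantics of the
"Files Only" and "Full Path" modes; it does not reproduce the ListHDFS listing
logic, and the paths are the hypothetical ones from the examples.

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class FilterExampleCheck {

    public static void main(String[] args) {
        final List<String> tree = Arrays.asList(
                "/data/readme.txt",
                "/data/bin/readme.txt", "/data/bin/1.bin",
                "/data/csv/readme.txt", "/data/csv/1.csv",
                "/data/txt/readme.txt", "/data/txt/1.txt");

        // Files Only: the regex is applied to the file name alone, so
        // [^\.].*\.txt keeps every readme.txt and *.txt in the tree.
        final Pattern filesOnly = Pattern.compile("[^\\.].*\\.txt");
        tree.stream()
                .filter(p -> filesOnly.matcher(p.substring(p.lastIndexOf('/') + 1)).matches())
                .forEach(p -> System.out.println("Files Only matched: " + p));

        // Full Path: the regex is applied to the entire path, so
        // (/.*/)*csv/.* keeps only files under the csv directory.
        final Pattern fullPath = Pattern.compile("(/.*/)*csv/.*");
        tree.stream()
                .filter(p -> fullPath.matcher(p).matches())
                .forEach(p -> System.out.println("Full Path matched: " + p));
    }
}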
http://git-wip-us.apache.org/repos/asf/nifi/blob/451084e1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index bfa9851..b349962 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -16,6 +16,9 @@
*/
package org.apache.nifi.processors.hadoop;
+import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_DIRECTORIES_AND_FILES_VALUE;
+import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FILES_ONLY_VALUE;
+import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FULL_PATH_VALUE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -162,7 +165,8 @@ public class TestListHDFS {
@Test
- public void testRecursive() throws InterruptedException {
+ public void testRecursiveWithDefaultFilterAndFilterMode() throws InterruptedException {
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/.testFile.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
@@ -193,6 +197,139 @@ public class TestListHDFS {
}
@Test
+ public void testRecursiveWithCustomFilterDirectoriesAndFiles() throws InterruptedException, IOException {
+ // set custom regex filter and filter mode
+ runner.setProperty(ListHDFS.FILE_FILTER, ".*txt.*");
+ runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_DIRECTORIES_AND_FILES_VALUE.getValue());
+
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out")));
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+ proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+
+ proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir")));
+ proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out")));
+ proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt")));
+
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir")));
+ proc.fileSystem.addFileStatus(new Path("/test/txtDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir/3.out")));
+ proc.fileSystem.addFileStatus(new Path("/test/txtDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/txtDir/3.txt")));
+
+ // first iteration will not pick up files because it has to instead check timestamps.
+ // We must then wait long enough to ensure that the listing can be performed safely and
+ // run the Processor again.
+ runner.run();
+ Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
+ for (int i = 0; i < 2; i++) {
+ final MockFlowFile ff = flowFiles.get(i);
+ final String filename = ff.getAttribute("filename");
+
+ if (filename.equals("testFile.txt")) {
+ ff.assertAttributeEquals("path", "/test");
+ } else if (filename.equals("3.txt")) {
+ ff.assertAttributeEquals("path", "/test/txtDir");
+ } else {
+ Assert.fail("filename was " + filename);
+ }
+ }
+ }
+
+ @Test
+ public void testRecursiveWithCustomFilterFilesOnly() throws InterruptedException, IOException {
+ // set custom regex filter and filter mode
+ runner.setProperty(ListHDFS.FILE_FILTER, "[^\\.].*\\.txt");
+ runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FILES_ONLY_VALUE.getValue());
+
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out")));
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/.partfile.txt")));
+
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+ proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+
+ proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir")));
+ proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/.txt")));
+ proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out")));
+ proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt")));
+
+ // first iteration will not pick up files because it has to instead check timestamps.
+ // We must then wait long enough to ensure that the listing can be performed safely and
+ // run the Processor again.
+ runner.run();
+ Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
+
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
+ for (int i = 0; i < 3; i++) {
+ final MockFlowFile ff = flowFiles.get(i);
+ final String filename = ff.getAttribute("filename");
+
+ if (filename.equals("testFile.txt")) {
+ ff.assertAttributeEquals("path", "/test");
+ } else if (filename.equals("1.txt")) {
+ ff.assertAttributeEquals("path", "/test/testDir");
+ } else if (filename.equals("2.txt")) {
+ ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
+ } else {
+ Assert.fail("filename was " + filename);
+ }
+ }
+ }
+
+ @Test
+ public void testRecursiveWithCustomFilterFullPath() throws InterruptedException, IOException {
+ // set custom regex filter and filter mode
+ runner.setProperty(ListHDFS.FILE_FILTER, "(/.*/)*anotherDir/1\\..*");
+ runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FULL_PATH_VALUE.getValue());
+
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out")));
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+ proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
+ proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+
+ proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir")));
+ proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/1.out")));
+ proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/1.txt")));
+ proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out")));
+ proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt")));
+
+ proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/someDir")));
+ proc.fileSystem.addFileStatus(new Path("/test/testDir/someDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/someDir/1.out")));
+
+ // first iteration will not pick up files because it has to instead check timestamps.
+ // We must then wait long enough to ensure that the listing can be performed safely and
+ // run the Processor again.
+ runner.run();
+ Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
+ for (int i = 0; i < 2; i++) {
+ final MockFlowFile ff = flowFiles.get(i);
+ final String filename = ff.getAttribute("filename");
+
+ if (filename.equals("1.out")) {
+ ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
+ } else if (filename.equals("1.txt")) {
+ ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
+ } else {
+ Assert.fail("filename was " + filename);
+ }
+ }
+ }
+
+ @Test
public void testNotRecursive() throws InterruptedException {
runner.setProperty(ListHDFS.RECURSE_SUBDIRS, "false");
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));