You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2022/02/24 14:13:36 UTC
[nifi] branch main updated: NIFI-9716, NIFI-9577: Addressed issue in the PathFilter for GetFile / ListFile. For any file that is found in the Input Directory directly, it was previously being listed/fetched even if it didn't match the PathFilter. Additionally, updated the code to create a new File Filter for every invocation of onTrigger. This was necessary for NIFI-9577 because the directory to monitor supports Expression Language and as a result may change from invocation to invocation, if using a function such as [...]
This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 39483b9 NIFI-9716, NIFI-9577: Addressed issue in the PathFilter for GetFile / ListFile. For any file that is found in the Input Directory directly, it was previously being listed/fetched even if it didn't match the PathFilter. Additionally, updated the code to create a new File Filter for every invocation of onTrigger. This was necessary for NIFI-9577 because the directory to monitor supports Expression Language and as a result may change from invocation to invocation, if using [...]
39483b9 is described below
commit 39483b9c121b11abbc00f12232075ef4328a17ee
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Feb 22 14:23:53 2022 -0500
NIFI-9716, NIFI-9577: Addressed issue in the PathFilter for GetFile / ListFile. For any file that is found in the Input Directory directly, it was previously being listed/fetched even if it didn't match the PathFilter. Additionally, updated the code to create a new File Filter for every invocation of onTrigger. This was necessary for NIFI-9577 because the directory to monitor supports Expression Language and as a result may change from invocation to invocation, if using a function suc [...]
Signed-off-by: Joe Gresock <jg...@gmail.com>
This closes #5788.
---
.../apache/nifi/processors/standard/GetFile.java | 22 ++++++++--------
.../apache/nifi/processors/standard/ListFile.java | 24 ++++++++----------
.../nifi/processors/standard/TestListFile.java | 29 ++++++++++++++++++++--
3 files changed, 47 insertions(+), 28 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
index 318a635..3c37be8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
@@ -48,7 +48,6 @@ import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributeView;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileOwnerAttributeView;
@@ -70,7 +69,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
@@ -205,7 +203,6 @@ public class GetFile extends AbstractProcessor {
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
- private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
private final BlockingQueue<File> fileQueue = new LinkedBlockingQueue<>();
private final Set<File> inProcess = new HashSet<>(); // guarded by queueLock
@@ -250,18 +247,16 @@ public class GetFile extends AbstractProcessor {
@OnScheduled
public void onScheduled(final ProcessContext context) {
- fileFilterRef.set(createFileFilter(context));
fileQueue.clear();
}
- private FileFilter createFileFilter(final ProcessContext context) {
+ private FileFilter createFileFilter(final ProcessContext context, final Path inputDirectory) {
final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final Long maxAge = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final boolean ignoreHidden = context.getProperty(IGNORE_HIDDEN_FILES).asBoolean();
final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
- final String indir = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
final boolean recurseDirs = context.getProperty(RECURSE).asBoolean();
final String pathPatternStr = context.getProperty(PATH_FILTER).getValue();
final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);
@@ -287,11 +282,13 @@ public class GetFile extends AbstractProcessor {
return false;
}
if (pathPattern != null) {
- Path reldir = Paths.get(indir).relativize(file.toPath()).getParent();
- if (reldir != null && !reldir.toString().isEmpty()) {
- if (!pathPattern.matcher(reldir.toString()).matches()) {
- return false;
- }
+ final Path relativePath = inputDirectory.relativize(file.toPath()).getParent();
+ if (relativePath == null || relativePath.toString().isEmpty()) {
+ return false;
+ }
+
+ if (!pathPattern.matcher(relativePath.toString()).matches()) {
+ return false;
}
}
//Verify that we have at least read permissions on the file we're considering grabbing
@@ -378,12 +375,13 @@ public class GetFile extends AbstractProcessor {
final File directory = new File(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
final ComponentLog logger = getLogger();
+ final FileFilter fileFilter = createFileFilter(context, directory.toPath());
if (fileQueue.size() < 100) {
final long pollingMillis = context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
if ((queueLastUpdated.get() < System.currentTimeMillis() - pollingMillis) && listingLock.tryLock()) {
try {
- final Set<File> listing = performListing(directory, fileFilterRef.get(), context.getProperty(RECURSE).asBoolean().booleanValue());
+ final Set<File> listing = performListing(directory, fileFilter, context.getProperty(RECURSE).asBoolean());
queueLock.lock();
try {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
index c0af210..9cae320 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
@@ -56,7 +56,6 @@ import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributeView;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileOwnerAttributeView;
@@ -81,7 +80,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
@@ -275,7 +273,6 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
private volatile boolean includeFileAttributes;
private volatile PerformanceTracker performanceTracker;
private volatile long performanceLoggingTimestamp = System.currentTimeMillis();
- private final AtomicReference<BiPredicate<Path, BasicFileAttributes>> fileFilterRef = new AtomicReference<>();
public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
@@ -350,7 +347,6 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
} else {
performanceTracker = new UntrackedPerformanceTracker(getLogger(), maxDiskOperationMillis);
}
- fileFilterRef.set(createFileFilter(context, performanceTracker, true));
final long millisToKeepStats = TimeUnit.MINUTES.toMillis(15);
final MonitorActiveTasks monitorTask = new MonitorActiveTasks(performanceTracker, getLogger(), maxDiskOperationMillis, maxListingMillis, millisToKeepStats);
@@ -520,13 +516,14 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
final BiPredicate<Path, BasicFileAttributes> fileFilter;
final PerformanceTracker performanceTracker;
if (listingMode == ListingMode.EXECUTION) {
- fileFilter = fileFilterRef.get();
performanceTracker = this.performanceTracker;
+ fileFilter = createFileFilter(context, performanceTracker, applyFilters, basePath);
} else {
final long maxDiskOperationMillis = context.getProperty(MAX_DISK_OPERATION_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
performanceTracker = new UntrackedPerformanceTracker(getLogger(), maxDiskOperationMillis);
- fileFilter = createFileFilter(context, performanceTracker, applyFilters);
+ fileFilter = createFileFilter(context, performanceTracker, applyFilters, basePath);
}
+
int maxDepth = recurse ? Integer.MAX_VALUE : 1;
final BiPredicate<Path, BasicFileAttributes> matcher = new BiPredicate<Path, BasicFileAttributes>() {
@@ -664,7 +661,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
}
private BiPredicate<Path, BasicFileAttributes> createFileFilter(final ProcessContext context, final PerformanceTracker performanceTracker,
- final boolean applyFilters) {
+ final boolean applyFilters, final Path basePath) {
final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
@@ -672,13 +669,10 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
final boolean ignoreHidden = context.getProperty(IGNORE_HIDDEN_FILES).asBoolean();
final String fileFilter = context.getProperty(FILE_FILTER).getValue();
final Pattern filePattern = Pattern.compile(fileFilter);
- final String indir = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
final boolean recurseDirs = context.getProperty(RECURSE).asBoolean();
final String pathPatternStr = context.getProperty(PATH_FILTER).getValue();
final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);
- final Path basePath = Paths.get(indir);
-
return (path, attributes) -> {
if (!applyFilters) {
return true;
@@ -706,10 +700,12 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
final File file = path.toFile();
if (pathPattern != null) {
- if (relativePath != null && !relativePath.toString().isEmpty()) {
- if (!pathPattern.matcher(relativePath.toString()).matches()) {
- return false;
- }
+ if (relativePath == null || relativePath.toString().isEmpty()) {
+ return false;
+ }
+
+ if (!pathPattern.matcher(relativePath.toString()).matches()) {
+ return false;
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index 7c3caa2..88a7189 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -285,6 +285,31 @@ public class TestListFile {
}
@Test
+ public void testPathFilterOnlyPicksUpMatchingFiles() throws IOException {
+ final File aaa = new File(TESTDIR, "aaa");
+ assertTrue(aaa.mkdirs() || aaa.exists());
+
+ final File bbb = new File(TESTDIR, "bbb");
+ assertTrue(bbb.mkdirs() || bbb.exists());
+
+ final File file1 = new File(aaa, "1.txt");
+ final File file2 = new File(bbb, "2.txt");
+ final File file3 = new File(TESTDIR, "3.txt");
+
+ final long tenSecondsAgo = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(10L);
+ for (final File file : Arrays.asList(file1, file2, file3)) {
+ assertTrue(file.createNewFile() || file.exists());
+ assertTrue(file.setLastModified(tenSecondsAgo));
+ }
+
+ runner.setProperty(ListFile.DIRECTORY, TESTDIR);
+ runner.setProperty(ListFile.PATH_FILTER, ".+");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 2);
+ }
+
+ @Test
public void testFilterAge() throws Exception {
final File file1 = new File(TESTDIR + "/age1.txt");
@@ -651,7 +676,7 @@ public class TestListFile {
assertEquals(4, successFiles1.size());
// filter path on pattern subdir1
- runner.setProperty(ListFile.PATH_FILTER, "subdir1");
+ runner.setProperty(ListFile.PATH_FILTER, "subdir1.*");
runner.setProperty(ListFile.RECURSE, "true");
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects. Of those, 3 match the filter.");
runNext();
@@ -660,7 +685,7 @@ public class TestListFile {
assertEquals(3, successFiles2.size());
// filter path on pattern subdir2
- runner.setProperty(ListFile.PATH_FILTER, "subdir2");
+ runner.setProperty(ListFile.PATH_FILTER, ".*/subdir2");
runner.setProperty(ListFile.RECURSE, "true");
assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects. Of those, 1 matches the filter.");
runNext();