You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2022/02/24 14:13:36 UTC

[nifi] branch main updated: NIFI-9716, NIFI-9577: Addressed issue in the PathFilter for GetFile / ListFile. For any file that is found in the Input Directory directly, it was previously being listed/fetched even if it didn't match the PathFilter. Additionally, updated the code to create a new File Filter for every invocation of onTrigger. This was necessary for NIFI-9577 because the directory to monitor supports Expression Language and as a result may change from invocation to invocation, if using a function such as [...]

This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 39483b9  NIFI-9716, NIFI-9577: Addressed issue in the PathFilter for GetFile / ListFile. For any file that is found in the Input Directory directly, it was previously being listed/fetched even if it didn't match the PathFilter. Additionally, updated the code to create a new File Filter for every invocation of onTrigger. This was necessary for NIFI-9577 because the directory to monitor supports Expression Language and as a result may change from invocation to invocation, if using  [...]
39483b9 is described below

commit 39483b9c121b11abbc00f12232075ef4328a17ee
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Feb 22 14:23:53 2022 -0500

    NIFI-9716, NIFI-9577: Addressed issue in the PathFilter for GetFile / ListFile. For any file that is found in the Input Directory directly, it was previously being listed/fetched even if it didn't match the PathFilter. Additionally, updated the code to create a new File Filter for every invocation of onTrigger. This was necessary for NIFI-9577 because the directory to monitor supports Expression Language and as a result may change from invocation to invocation, if using a function suc [...]
    
    Signed-off-by: Joe Gresock <jg...@gmail.com>
    
    This closes #5788.
---
 .../apache/nifi/processors/standard/GetFile.java   | 22 ++++++++--------
 .../apache/nifi/processors/standard/ListFile.java  | 24 ++++++++----------
 .../nifi/processors/standard/TestListFile.java     | 29 ++++++++++++++++++++--
 3 files changed, 47 insertions(+), 28 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
index 318a635..3c37be8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java
@@ -48,7 +48,6 @@ import java.io.IOException;
 import java.nio.file.FileStore;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.nio.file.attribute.BasicFileAttributeView;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.nio.file.attribute.FileOwnerAttributeView;
@@ -70,7 +69,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
@@ -205,7 +203,6 @@ public class GetFile extends AbstractProcessor {
 
     private List<PropertyDescriptor> properties;
     private Set<Relationship> relationships;
-    private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
 
     private final BlockingQueue<File> fileQueue = new LinkedBlockingQueue<>();
     private final Set<File> inProcess = new HashSet<>();    // guarded by queueLock
@@ -250,18 +247,16 @@ public class GetFile extends AbstractProcessor {
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
-        fileFilterRef.set(createFileFilter(context));
         fileQueue.clear();
     }
 
-    private FileFilter createFileFilter(final ProcessContext context) {
+    private FileFilter createFileFilter(final ProcessContext context, final Path inputDirectory) {
         final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
         final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
         final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
         final Long maxAge = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
         final boolean ignoreHidden = context.getProperty(IGNORE_HIDDEN_FILES).asBoolean();
         final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
-        final String indir = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
         final boolean recurseDirs = context.getProperty(RECURSE).asBoolean();
         final String pathPatternStr = context.getProperty(PATH_FILTER).getValue();
         final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);
@@ -287,11 +282,13 @@ public class GetFile extends AbstractProcessor {
                     return false;
                 }
                 if (pathPattern != null) {
-                    Path reldir = Paths.get(indir).relativize(file.toPath()).getParent();
-                    if (reldir != null && !reldir.toString().isEmpty()) {
-                        if (!pathPattern.matcher(reldir.toString()).matches()) {
-                            return false;
-                        }
+                    final Path relativePath = inputDirectory.relativize(file.toPath()).getParent();
+                    if (relativePath == null || relativePath.toString().isEmpty()) {
+                        return false;
+                    }
+
+                    if (!pathPattern.matcher(relativePath.toString()).matches()) {
+                        return false;
                     }
                 }
                 //Verify that we have at least read permissions on the file we're considering grabbing
@@ -378,12 +375,13 @@ public class GetFile extends AbstractProcessor {
         final File directory = new File(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
         final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
         final ComponentLog logger = getLogger();
+        final FileFilter fileFilter = createFileFilter(context, directory.toPath());
 
         if (fileQueue.size() < 100) {
             final long pollingMillis = context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
             if ((queueLastUpdated.get() < System.currentTimeMillis() - pollingMillis) && listingLock.tryLock()) {
                 try {
-                    final Set<File> listing = performListing(directory, fileFilterRef.get(), context.getProperty(RECURSE).asBoolean().booleanValue());
+                    final Set<File> listing = performListing(directory, fileFilter, context.getProperty(RECURSE).asBoolean());
 
                     queueLock.lock();
                     try {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
index c0af210..9cae320 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
@@ -56,7 +56,6 @@ import java.nio.file.FileVisitResult;
 import java.nio.file.FileVisitor;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.nio.file.attribute.BasicFileAttributeView;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.nio.file.attribute.FileOwnerAttributeView;
@@ -81,7 +80,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiPredicate;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
@@ -275,7 +273,6 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
     private volatile boolean includeFileAttributes;
     private volatile PerformanceTracker performanceTracker;
     private volatile long performanceLoggingTimestamp = System.currentTimeMillis();
-    private final AtomicReference<BiPredicate<Path, BasicFileAttributes>> fileFilterRef = new AtomicReference<>();
 
     public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
     public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
@@ -350,7 +347,6 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
         } else {
             performanceTracker = new UntrackedPerformanceTracker(getLogger(), maxDiskOperationMillis);
         }
-        fileFilterRef.set(createFileFilter(context, performanceTracker, true));
 
         final long millisToKeepStats = TimeUnit.MINUTES.toMillis(15);
         final MonitorActiveTasks monitorTask = new MonitorActiveTasks(performanceTracker, getLogger(), maxDiskOperationMillis, maxListingMillis, millisToKeepStats);
@@ -520,13 +516,14 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
         final BiPredicate<Path, BasicFileAttributes> fileFilter;
         final PerformanceTracker performanceTracker;
         if (listingMode == ListingMode.EXECUTION) {
-            fileFilter = fileFilterRef.get();
             performanceTracker = this.performanceTracker;
+            fileFilter = createFileFilter(context, performanceTracker, applyFilters, basePath);
         } else {
             final long maxDiskOperationMillis = context.getProperty(MAX_DISK_OPERATION_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
             performanceTracker = new UntrackedPerformanceTracker(getLogger(), maxDiskOperationMillis);
-            fileFilter = createFileFilter(context, performanceTracker, applyFilters);
+            fileFilter = createFileFilter(context, performanceTracker, applyFilters, basePath);
         }
+
         int maxDepth = recurse ? Integer.MAX_VALUE : 1;
 
         final BiPredicate<Path, BasicFileAttributes> matcher = new BiPredicate<Path, BasicFileAttributes>() {
@@ -664,7 +661,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
     }
 
     private BiPredicate<Path, BasicFileAttributes> createFileFilter(final ProcessContext context, final PerformanceTracker performanceTracker,
-                                                                    final boolean applyFilters) {
+                                                                    final boolean applyFilters, final Path basePath) {
         final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
         final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
         final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
@@ -672,13 +669,10 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
         final boolean ignoreHidden = context.getProperty(IGNORE_HIDDEN_FILES).asBoolean();
         final String fileFilter = context.getProperty(FILE_FILTER).getValue();
         final Pattern filePattern = Pattern.compile(fileFilter);
-        final String indir = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
         final boolean recurseDirs = context.getProperty(RECURSE).asBoolean();
         final String pathPatternStr = context.getProperty(PATH_FILTER).getValue();
         final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);
 
-        final Path basePath = Paths.get(indir);
-
         return (path, attributes) -> {
             if (!applyFilters) {
                 return true;
@@ -706,10 +700,12 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
             final File file = path.toFile();
 
             if (pathPattern != null) {
-                if (relativePath != null && !relativePath.toString().isEmpty()) {
-                    if (!pathPattern.matcher(relativePath.toString()).matches()) {
-                        return false;
-                    }
+                if (relativePath == null || relativePath.toString().isEmpty()) {
+                    return false;
+                }
+
+                if (!pathPattern.matcher(relativePath.toString()).matches()) {
+                    return false;
                 }
             }
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index 7c3caa2..88a7189 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -285,6 +285,31 @@ public class TestListFile {
     }
 
     @Test
+    public void testPathFilterOnlyPicksUpMatchingFiles() throws IOException {
+        final File aaa = new File(TESTDIR, "aaa");
+        assertTrue(aaa.mkdirs() || aaa.exists());
+
+        final File bbb = new File(TESTDIR, "bbb");
+        assertTrue(bbb.mkdirs() || bbb.exists());
+
+        final File file1 = new File(aaa, "1.txt");
+        final File file2 = new File(bbb, "2.txt");
+        final File file3 = new File(TESTDIR, "3.txt");
+
+        final long tenSecondsAgo = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(10L);
+        for (final File file : Arrays.asList(file1, file2, file3)) {
+            assertTrue(file.createNewFile() || file.exists());
+            assertTrue(file.setLastModified(tenSecondsAgo));
+        }
+
+        runner.setProperty(ListFile.DIRECTORY, TESTDIR);
+        runner.setProperty(ListFile.PATH_FILTER, ".+");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 2);
+    }
+
+    @Test
     public void testFilterAge() throws Exception {
 
         final File file1 = new File(TESTDIR + "/age1.txt");
@@ -651,7 +676,7 @@ public class TestListFile {
         assertEquals(4, successFiles1.size());
 
         // filter path on pattern subdir1
-        runner.setProperty(ListFile.PATH_FILTER, "subdir1");
+        runner.setProperty(ListFile.PATH_FILTER, "subdir1.*");
         runner.setProperty(ListFile.RECURSE, "true");
         assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects.  Of those, 3 match the filter.");
         runNext();
@@ -660,7 +685,7 @@ public class TestListFile {
         assertEquals(3, successFiles2.size());
 
         // filter path on pattern subdir2
-        runner.setProperty(ListFile.PATH_FILTER, "subdir2");
+        runner.setProperty(ListFile.PATH_FILTER, ".*/subdir2");
         runner.setProperty(ListFile.RECURSE, "true");
         assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects.  Of those, 1 matches the filter.");
         runNext();