You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/12 11:50:00 UTC

[pulsar] branch master updated: [cleanup][function] refine file io connector (#15250)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cbefe3ed9b9 [cleanup][function] refine file io connector (#15250)
cbefe3ed9b9 is described below

commit cbefe3ed9b907e0cf1bed2a16f26055dc23026b0
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Sun Jun 12 04:49:53 2022 -0700

    [cleanup][function] refine file io connector (#15250)
---
 .../apache/pulsar/io/file/FileListingThread.java   |  6 +--
 .../apache/pulsar/io/file/FileSourceConfig.java    |  4 +-
 .../pulsar/io/file/FileSourceConfigTests.java      | 44 ++++++++++++++++++----
 3 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java
index 4d35682a0cd..dac8f45754c 100644
--- a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java
@@ -107,10 +107,10 @@ public class FileListingThread extends Thread {
     private Set<File> performListing(final File directory, final FileFilter filter,
             final boolean recurseSubdirectories) {
         Path p = directory.toPath();
-        if (!Files.isWritable(p) || !Files.isReadable(p)) {
-            throw new IllegalStateException("Directory '" + directory
-                    + "' does not have sufficient permissions (i.e., not writable and readable)");
+        if (!Files.isReadable(p)) {
+            throw new IllegalStateException("Cannot read directory: '" + directory);
         }
+
         final Set<File> queue = new HashSet<>();
         if (!directory.exists()) {
             return queue;
diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java
index 92d791dac8a..5290c87a783 100644
--- a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java
@@ -133,7 +133,7 @@ public class FileSourceConfig implements Serializable {
             throw new IllegalArgumentException("Specified input directory does not exist");
         } else if (!Files.isReadable(Paths.get(inputDirectory))) {
             throw new IllegalArgumentException("Specified input directory is not readable");
-        } else if (Optional.ofNullable(keepFile).orElse(false) && !Files.isWritable(Paths.get(inputDirectory))) {
+        } else if (!Optional.ofNullable(keepFile).orElse(false) && !Files.isWritable(Paths.get(inputDirectory))) {
             throw new IllegalArgumentException("You have requested the consumed files to be deleted, but the "
                     + "source directory is not writeable.");
         }
@@ -175,4 +175,4 @@ public class FileSourceConfig implements Serializable {
                     "The property keepFile must be false if the property processedFileSuffix is set");
         }
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java
index 64144e667ad..4a4d8d2a867 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.io.file;
 
+import static org.junit.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -29,6 +31,8 @@ import org.testng.annotations.Test;
 
 public class FileSourceConfigTests {
 
+    private final static String INPUT_DIRECTORY = "/dev/null";
+
     @Test
     public final void loadFromYamlFileTest() throws IOException {
         File yamlFile = getFile("sinkConfig.yaml");
@@ -39,7 +43,7 @@ public class FileSourceConfigTests {
     @Test
     public final void loadFromMapTest() throws IOException {
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", "/tmp");
+        map.put("inputDirectory", INPUT_DIRECTORY);
         map.put("keepFile", false);
         
         FileSourceConfig config = FileSourceConfig.load(map);
@@ -49,7 +53,7 @@ public class FileSourceConfigTests {
     @Test
     public final void validValidateTest() throws IOException {
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", "/tmp");
+        map.put("inputDirectory", INPUT_DIRECTORY);
         
         FileSourceConfig config = FileSourceConfig.load(map);
         assertNotNull(config);
@@ -70,7 +74,7 @@ public class FileSourceConfigTests {
     @Test(expectedExceptions = com.fasterxml.jackson.databind.exc.InvalidFormatException.class)
     public final void InvalidBooleanPropertyTest() throws IOException {
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", "/");
+        map.put("inputDirectory", INPUT_DIRECTORY);
         map.put("recurse", "not a boolean");
         
         FileSourceConfig config = FileSourceConfig.load(map);
@@ -82,7 +86,7 @@ public class FileSourceConfigTests {
             expectedExceptionsMessageRegExp = "The property pollingInterval must be greater than zero")
     public final void ZeroValueTest() throws IOException {
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", "/");
+        map.put("inputDirectory", INPUT_DIRECTORY);
         map.put("pollingInterval", 0);
         
         FileSourceConfig config = FileSourceConfig.load(map);
@@ -94,7 +98,7 @@ public class FileSourceConfigTests {
             expectedExceptionsMessageRegExp = "The property minimumFileAge must be non-negative")
     public final void NegativeValueTest() throws IOException {
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", "/");
+        map.put("inputDirectory", INPUT_DIRECTORY);
         map.put("minimumFileAge", "-50");
         
         FileSourceConfig config = FileSourceConfig.load(map);
@@ -106,14 +110,40 @@ public class FileSourceConfigTests {
             expectedExceptionsMessageRegExp = "Invalid Regex pattern provided for fileFilter")
     public final void invalidFileFilterTest() throws IOException {
         Map<String, Object> map = new HashMap<String, Object> ();
-        map.put("inputDirectory", "/");
+        map.put("inputDirectory", INPUT_DIRECTORY);
         map.put("fileFilter", "\\");  // Results in a single '\' being sent.
         
         FileSourceConfig config = FileSourceConfig.load(map);
         assertNotNull(config);
         config.validate();
     }
-    
+
+    @Test
+    public final void keepFileTest() throws IOException {
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", "/"); // root directory that we cannot write to
+        map.put("keepFile", "true"); // even though no write permission on "/", we should still be able to read
+
+        FileSourceConfig config = FileSourceConfig.load(map);
+        assertNotNull(config);
+        assertTrue(config.getKeepFile());
+        config.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "You have requested the consumed files to be deleted, " +
+                    "but the source directory is not writeable.")
+    public final void invalidKeepFileTest() throws  IOException {
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", "/"); // root directory that we cannot write to
+        map.put("keepFile", "false");
+
+        FileSourceConfig config = FileSourceConfig.load(map);
+        assertNotNull(config);
+        assertFalse(config.getKeepFile());
+        config.validate();
+    }
+
     private File getFile(String name) {
         ClassLoader classLoader = getClass().getClassLoader();
         return new File(classLoader.getResource(name).getFile());