You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/12 11:50:00 UTC
[pulsar] branch master updated: [cleanup][function] refine file io connector (#15250)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cbefe3ed9b9 [cleanup][function] refine file io connector (#15250)
cbefe3ed9b9 is described below
commit cbefe3ed9b907e0cf1bed2a16f26055dc23026b0
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Sun Jun 12 04:49:53 2022 -0700
[cleanup][function] refine file io connector (#15250)
---
.../apache/pulsar/io/file/FileListingThread.java | 6 +--
.../apache/pulsar/io/file/FileSourceConfig.java | 4 +-
.../pulsar/io/file/FileSourceConfigTests.java | 44 ++++++++++++++++++----
3 files changed, 42 insertions(+), 12 deletions(-)
diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java
index 4d35682a0cd..dac8f45754c 100644
--- a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java
@@ -107,10 +107,10 @@ public class FileListingThread extends Thread {
private Set<File> performListing(final File directory, final FileFilter filter,
final boolean recurseSubdirectories) {
Path p = directory.toPath();
- if (!Files.isWritable(p) || !Files.isReadable(p)) {
- throw new IllegalStateException("Directory '" + directory
- + "' does not have sufficient permissions (i.e., not writable and readable)");
+ if (!Files.isReadable(p)) {
+ throw new IllegalStateException("Cannot read directory: '" + directory);
}
+
final Set<File> queue = new HashSet<>();
if (!directory.exists()) {
return queue;
diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java
index 92d791dac8a..5290c87a783 100644
--- a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java
@@ -133,7 +133,7 @@ public class FileSourceConfig implements Serializable {
throw new IllegalArgumentException("Specified input directory does not exist");
} else if (!Files.isReadable(Paths.get(inputDirectory))) {
throw new IllegalArgumentException("Specified input directory is not readable");
- } else if (Optional.ofNullable(keepFile).orElse(false) && !Files.isWritable(Paths.get(inputDirectory))) {
+ } else if (!Optional.ofNullable(keepFile).orElse(false) && !Files.isWritable(Paths.get(inputDirectory))) {
throw new IllegalArgumentException("You have requested the consumed files to be deleted, but the "
+ "source directory is not writeable.");
}
@@ -175,4 +175,4 @@ public class FileSourceConfig implements Serializable {
"The property keepFile must be false if the property processedFileSuffix is set");
}
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java
index 64144e667ad..4a4d8d2a867 100644
--- a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java
+++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.io.file;
+import static org.junit.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
@@ -29,6 +31,8 @@ import org.testng.annotations.Test;
public class FileSourceConfigTests {
+ private final static String INPUT_DIRECTORY = "/dev/null";
+
@Test
public final void loadFromYamlFileTest() throws IOException {
File yamlFile = getFile("sinkConfig.yaml");
@@ -39,7 +43,7 @@ public class FileSourceConfigTests {
@Test
public final void loadFromMapTest() throws IOException {
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", "/tmp");
+ map.put("inputDirectory", INPUT_DIRECTORY);
map.put("keepFile", false);
FileSourceConfig config = FileSourceConfig.load(map);
@@ -49,7 +53,7 @@ public class FileSourceConfigTests {
@Test
public final void validValidateTest() throws IOException {
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", "/tmp");
+ map.put("inputDirectory", INPUT_DIRECTORY);
FileSourceConfig config = FileSourceConfig.load(map);
assertNotNull(config);
@@ -70,7 +74,7 @@ public class FileSourceConfigTests {
@Test(expectedExceptions = com.fasterxml.jackson.databind.exc.InvalidFormatException.class)
public final void InvalidBooleanPropertyTest() throws IOException {
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", "/");
+ map.put("inputDirectory", INPUT_DIRECTORY);
map.put("recurse", "not a boolean");
FileSourceConfig config = FileSourceConfig.load(map);
@@ -82,7 +86,7 @@ public class FileSourceConfigTests {
expectedExceptionsMessageRegExp = "The property pollingInterval must be greater than zero")
public final void ZeroValueTest() throws IOException {
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", "/");
+ map.put("inputDirectory", INPUT_DIRECTORY);
map.put("pollingInterval", 0);
FileSourceConfig config = FileSourceConfig.load(map);
@@ -94,7 +98,7 @@ public class FileSourceConfigTests {
expectedExceptionsMessageRegExp = "The property minimumFileAge must be non-negative")
public final void NegativeValueTest() throws IOException {
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", "/");
+ map.put("inputDirectory", INPUT_DIRECTORY);
map.put("minimumFileAge", "-50");
FileSourceConfig config = FileSourceConfig.load(map);
@@ -106,14 +110,40 @@ public class FileSourceConfigTests {
expectedExceptionsMessageRegExp = "Invalid Regex pattern provided for fileFilter")
public final void invalidFileFilterTest() throws IOException {
Map<String, Object> map = new HashMap<String, Object> ();
- map.put("inputDirectory", "/");
+ map.put("inputDirectory", INPUT_DIRECTORY);
map.put("fileFilter", "\\"); // Results in a single '\' being sent.
FileSourceConfig config = FileSourceConfig.load(map);
assertNotNull(config);
config.validate();
}
-
+
+ @Test
+ public final void keepFileTest() throws IOException {
+ Map<String, Object> map = new HashMap<String, Object> ();
+ map.put("inputDirectory", "/"); // root directory that we cannot write to
+ map.put("keepFile", "true"); // even though no write permission on "/", we should still be able to read
+
+ FileSourceConfig config = FileSourceConfig.load(map);
+ assertNotNull(config);
+ assertTrue(config.getKeepFile());
+ config.validate();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "You have requested the consumed files to be deleted, " +
+ "but the source directory is not writeable.")
+ public final void invalidKeepFileTest() throws IOException {
+ Map<String, Object> map = new HashMap<String, Object> ();
+ map.put("inputDirectory", "/"); // root directory that we cannot write to
+ map.put("keepFile", "false");
+
+ FileSourceConfig config = FileSourceConfig.load(map);
+ assertNotNull(config);
+ assertFalse(config.getKeepFile());
+ config.validate();
+ }
+
private File getFile(String name) {
ClassLoader classLoader = getClass().getClassLoader();
return new File(classLoader.getResource(name).getFile());