Posted to issues@flink.apache.org by "ruanhang1993 (via GitHub)" <gi...@apache.org> on 2023/03/23 09:16:00 UTC

[GitHub] [flink] ruanhang1993 opened a new pull request, #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

ruanhang1993 opened a new pull request, #22249:
URL: https://github.com/apache/flink/pull/22249

   ## What is the purpose of the change
   
   This pull request makes filesystem sources support flexible path reading.
   
   ## Brief change log
   
     - Add two new `FileEnumerator` implementations and a new file filter.
     - Add a new table option, 'source.regex-pattern'.
     - Use the new `FileEnumerator` if 'source.regex-pattern' is set.
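
To make the idea of a regex-driven file filter concrete, here is a minimal sketch in plain Java. It is not the PR's `RegexFileFilter`, whose implementation is not shown in this thread; the class name `RegexPathFilterSketch`, the `select` helper, and the choice to match the whole path string are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical stand-in for a regex-based path filter; not the class added by the PR. */
public class RegexPathFilterSketch {

    private final Pattern pattern;

    public RegexPathFilterSketch(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    /** Assumption: a path is accepted only when the whole path string matches the regex. */
    public boolean accepts(String path) {
        return pattern.matcher(path).matches();
    }

    /** Keeps only the paths accepted by the filter, preserving input order. */
    public static List<String> select(String regex, List<String> paths) {
        RegexPathFilterSketch filter = new RegexPathFilterSketch(regex);
        List<String> selected = new ArrayList<>();
        for (String path : paths) {
            if (filter.accepts(path)) {
                selected.add(path);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> paths =
                Arrays.asList(
                        "/dir/file1",
                        "/dir/nested/file.out",
                        "/dir/anotherNested/nested/file.out");
        // Selects every file.out that sits at least one directory below /dir.
        System.out.println(select("/dir/.*/file.out", paths));
    }
}
```

Under this whole-path assumption, `.*` can span several directory levels, so one pattern can pick up files from differently nested directories.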
   
   ## Verifying this change
   
   This change added tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not documented
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] RamanVerma commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "RamanVerma (via GitHub)" <gi...@apache.org>.
RamanVerma commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1221963740


##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumeratorTest.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumeratorTest.assertSplitsEqual;
+import static org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumeratorTest.toPaths;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the {@link NonSplittingRegexEnumerator}. */
+public class NonSplittingRegexEnumeratorTest {
+    /**
+     * Testing file system reference, to be cleaned up in an @After method. That way it also gets
+     * cleaned up on a test failure, without needing finally clauses in every test.
+     */
+    protected TestingFileSystem testFs;
+
+    @AfterEach
+    void unregisterTestFs() throws Exception {
+        if (testFs != null) {
+            testFs.unregister();
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Test
+    void testIncludeSingleFile() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///dir/file1"),
+                    new Path("testfs:///dir/nested/file.out"),
+                    new Path("testfs:///dir/nested/anotherfile.txt")
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///dir");
+        final NonSplittingRegexEnumerator enumerator =
+                createEnumerator(baseDIr.getPath() + "/nested/file.out");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits)).containsExactlyInAnyOrder(testPaths[1]);
+    }
+
+    @Test
+    void testIncludeFilesFromRegexDirectory() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///dir/file1"),
+                    new Path("testfs:///dir/nested/file.out"),
+                    new Path("testfs:///dir/nested/anotherFile.txt"),
+                    new Path("testfs:///dir/nested/nested/doubleNestedFile.txt")
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///dir");
+        final NonSplittingRegexEnumerator enumerator =
+                createEnumerator(baseDIr.getPath() + "/nest.[a-z]");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits))
+                .containsExactlyInAnyOrder(Arrays.copyOfRange(testPaths, 1, testPaths.length));
+    }
+
+    @Test
+    void testIncludeSingleFileFromMultiDirectory() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///dir/file1"),
+                    new Path("testfs:///dir/nested/file.out"),
+                    new Path("testfs:///dir/nested/anotherFile.txt"),
+                    new Path("testfs:///dir/nested/nested/doubleNestedFile.txt"),
+                    new Path("testfs:///dir/anotherNested/file.out"),
+                    new Path("testfs:///dir/anotherNested/nested/file.out"),
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///dir");
+        final NonSplittingRegexEnumerator enumerator =
+                createEnumerator(baseDIr.getPath() + "/.*/file.out");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits))
+                .containsExactlyInAnyOrder(
+                        Arrays.stream(testPaths)
+                                .filter(p -> p.getPath().endsWith("file.out"))
+                                .toArray(Path[]::new));
+    }
+
+    @Test
+    void testDefaultHiddenFilesFilter() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///visiblefile"),
+                    new Path("testfs:///.hiddenfile1"),
+                    new Path("testfs:///_hiddenfile2")
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///");
+        final NonSplittingRegexEnumerator enumerator = createEnumerator("/.*");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits)).isEqualTo(Collections.singletonList(testPaths[0]));
+    }
+
+    @Test
+    void testHiddenDirectories() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///dir/visiblefile"),
+                    new Path("testfs:///dir/.hiddendir/file"),
+                    new Path("testfs:///_notvisible/afile")
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///");
+        final NonSplittingRegexEnumerator enumerator = createEnumerator("/.*");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits)).isEqualTo(Collections.singletonList(testPaths[0]));
+    }
+
+    @Test
+    void testFilesWithNoBlockInfo() throws Exception {
+        final Path testPath = new Path("testfs:///dir/file1");
+        testFs =
+                TestingFileSystem.createForFileStatus(
+                        "testfs",
+                        TestingFileSystem.TestFileStatus.forFileWithBlocks(testPath, 12345L));
+        testFs.register();
+
+        final NonSplittingRegexEnumerator enumerator = createEnumerator("/.*/file.");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {new Path("testfs:///dir")}, 0);
+
+        assertThat(splits).hasSize(1);
+        assertSplitsEqual(
+                new FileSourceSplit("ignoredId", testPath, 0L, 12345L, 0, 12345L),
+                splits.iterator().next());
+    }
+
+    @Test
+    void testFileWithIncorrectBlocks() throws Exception {

Review Comment:
   Hmm, shouldn't this and the next test be part of `BlockSplittingRegexEnumeratorTest`?



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java:
##########
@@ -269,10 +271,35 @@ private SourceProvider createSourceProvider(BulkFormat<RowData, FileSourceSplit>
         tableOptions
                 .getOptional(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL)
                 .ifPresent(fileSourceBuilder::monitorContinuously);
+        tableOptions
+                .getOptional(FileSystemConnectorOptions.SOURCE_REGEX_PATTERN)
+                .ifPresent(
+                        s -> {
+                            String regexPath = connectBasePathAndRegex(path.getPath(), s);
+                            fileSourceBuilder.setFileEnumerator(
+                                    bulkFormat.isSplittable()
+                                            ? () -> new BlockSplittingRegexEnumerator(regexPath)
+                                            : () -> new NonSplittingRegexEnumerator(regexPath));
+                        });
 
         return SourceProvider.of(fileSourceBuilder.build());
     }
 
+    private String connectBasePathAndRegex(String basePath, String regex) {
+        StringBuilder result = new StringBuilder();
+        result.append(basePath);
+        if (!basePath.endsWith(Path.SEPARATOR)) {
+            result.append(Path.SEPARATOR);
+        }
+        int startIndex = 0;
+        while (startIndex < regex.length()

Review Comment:
   I think this loop removes multiple path separators at the start of the regex, for example `////.*`.
   Can there also be a regex like `////.*///.*`?
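
The concern can be illustrated with a small stand-alone sketch. The loop in the quoted diff is cut off, so this is not the PR's code; `LeadingSeparatorSketch` and its `join` method are hypothetical and only mimic the behaviour described in the comment, namely dropping separators at the start of the regex and nothing else.

```java
/** Hypothetical helper mimicking the behaviour described in the review comment. */
public class LeadingSeparatorSketch {

    private static final String SEPARATOR = "/";

    /** Joins a base path and a regex, skipping separators only at the start of the regex. */
    public static String join(String basePath, String regex) {
        StringBuilder result = new StringBuilder(basePath);
        if (!basePath.endsWith(SEPARATOR)) {
            result.append(SEPARATOR);
        }
        int startIndex = 0;
        // Assumption: only leading separators are skipped; the rest is copied verbatim.
        while (startIndex < regex.length() && regex.startsWith(SEPARATOR, startIndex)) {
            startIndex++;
        }
        result.append(regex, startIndex, regex.length());
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(join("/dir", "////.*"));
        System.out.println(join("/dir", "////.*///.*"));
    }
}
```

With such a helper the first input collapses to `/dir/.*`, while the second keeps its inner `///` untouched, which is the case the question above is about.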



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumeratorTest.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumeratorTest.assertSplitsEqual;
+import static org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumeratorTest.toPaths;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the {@link NonSplittingRegexEnumerator}. */
+public class NonSplittingRegexEnumeratorTest {
+    /**
+     * Testing file system reference, to be cleaned up in an @After method. That way it also gets
+     * cleaned up on a test failure, without needing finally clauses in every test.
+     */
+    protected TestingFileSystem testFs;
+
+    @AfterEach
+    void unregisterTestFs() throws Exception {
+        if (testFs != null) {
+            testFs.unregister();
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Test
+    void testIncludeSingleFile() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///dir/file1"),
+                    new Path("testfs:///dir/nested/file.out"),
+                    new Path("testfs:///dir/nested/anotherfile.txt")
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///dir");

Review Comment:
   nit: `baseDIr` -> `baseDir`





[GitHub] [flink] luoyuxia commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "luoyuxia (via GitHub)" <gi...@apache.org>.
luoyuxia commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1236243710


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.compression.StandardDeCompressors;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories, and creates a separate split for each file block.
+ *
+ * <p>Please note that file blocks are only exposed by some file systems, such as HDFS. File systems
+ * that do not expose block information will not create multiple file splits per file, but keep the
+ * files as one source split.
+ *
+ * <p>Files with suffixes corresponding to known compression formats (for example '.gzip', '.bz2',
+ * ...) will not be split. See {@link StandardDeCompressors} for a list of known formats and
+ * suffixes.
+ *
+ * <p>Compared to {@link BlockSplittingRecursiveEnumerator}, this enumerator will enumerate all
+ * files even through its parent directory is filtered out by the file filter.
+ */
+public class BlockSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveAllDirEnumerator {

Review Comment:
   If we reuse `addSplitsForPath` from `NonSplittingRecursiveAllDirEnumerator`, we can't reuse the code in `BlockSplittingRecursiveEnumerator`.
   So the question is whether to reuse the code in `BlockSplittingRecursiveEnumerator` or the code in `NonSplittingRecursiveAllDirEnumerator`.
   Why not reuse the code in `BlockSplittingRecursiveEnumerator`? Otherwise, we need to rewrite the code from `BlockSplittingRecursiveEnumerator`, which seems a little complex to me.





[GitHub] [flink] ruanhang1993 commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "ruanhang1993 (via GitHub)" <gi...@apache.org>.
ruanhang1993 commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1222666297


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java:
##########
@@ -269,10 +271,35 @@ private SourceProvider createSourceProvider(BulkFormat<RowData, FileSourceSplit>
         tableOptions
                 .getOptional(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL)
                 .ifPresent(fileSourceBuilder::monitorContinuously);
+        tableOptions
+                .getOptional(FileSystemConnectorOptions.SOURCE_REGEX_PATTERN)
+                .ifPresent(
+                        s -> {
+                            String regexPath = connectBasePathAndRegex(path.getPath(), s);
+                            fileSourceBuilder.setFileEnumerator(
+                                    bulkFormat.isSplittable()
+                                            ? () -> new BlockSplittingRegexEnumerator(regexPath)
+                                            : () -> new NonSplittingRegexEnumerator(regexPath));
+                        });
 
         return SourceProvider.of(fileSourceBuilder.build());
     }
 
+    private String connectBasePathAndRegex(String basePath, String regex) {
+        StringBuilder result = new StringBuilder();
+        result.append(basePath);
+        if (!basePath.endsWith(Path.SEPARATOR)) {
+            result.append(Path.SEPARATOR);
+        }
+        int startIndex = 0;
+        while (startIndex < regex.length()

Review Comment:
   I think we could remove the duplicate `/`s at the beginning, but it is hard to handle duplicate `/`s in the middle.
   We could just treat the regex as a plain string and handle the files whose paths match it.
   Actually, I am not sure how to handle this part. @luoyuxia, what do you think?





[GitHub] [flink] ruanhang1993 commented on pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "ruanhang1993 (via GitHub)" <gi...@apache.org>.
ruanhang1993 commented on PR #22249:
URL: https://github.com/apache/flink/pull/22249#issuecomment-1587022797

   Hi @luoyuxia & @RamanVerma,
   I have refactored this PR and removed `connectBasePathAndRegex`. The previous version was hard to understand and use. The new code uses the actual regex string to match all files in the base dir.
   
   Please take a look. Thanks!




[GitHub] [flink] ruanhang1993 commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "ruanhang1993 (via GitHub)" <gi...@apache.org>.
ruanhang1993 commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1223913567


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumerator.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively. Each file
+ * matched the given regex pattern becomes one split; this enumerator does not split files into
+ * smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file
+ * prefixes '.' and '_'. A custom file filter can be specified.
+ */
+public class NonSplittingRegexEnumerator extends NonSplittingRecursiveEnumerator {
+
+    /** The custom filter predicate to filter out unwanted files. */
+    private final Predicate<Path> fileFilter;
+
+    private final RegexFileFilter regexFileFilter;

Review Comment:
   I think it is not suitable when the regex is complex. I should remove `connectBasePathAndRegex` and use the actual value passed via `source.regex-pattern`.
   
   But I still think we should keep the table path as the base dir from which to start searching for files.





[GitHub] [flink] ruanhang1993 commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "ruanhang1993 (via GitHub)" <gi...@apache.org>.
ruanhang1993 commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1222615155


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumerator.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively. Each file
+ * matched the given regex pattern becomes one split; this enumerator does not split files into
+ * smaller "block" units.
+ *
+ * <p>This enumerator filters out files with the common hidden file prefixes '.' and '_'.
+ */
+public class NonSplittingRegexEnumerator extends NonSplittingRecursiveEnumerator {
+
+    private final Predicate<Path> fileFilter;
+    private final RegexFileFilter regexFileFilter;
+
+    public NonSplittingRegexEnumerator(String pathPattern) {
+        this(pathPattern, new DefaultFileFilter());
+    }
+
+    public NonSplittingRegexEnumerator(String pathPattern, Predicate<Path> fileFilter) {
+        this.regexFileFilter = new RegexFileFilter(pathPattern);
+        this.fileFilter = fileFilter;
+    }
+
+    @Override
+    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
+            throws IOException {
+        final ArrayList<FileSourceSplit> splits = new ArrayList<>();
+        for (Path path : paths) {
+            final FileSystem fs = path.getFileSystem();
+            final FileStatus status = fs.getFileStatus(path);
+            addSplitsForPath(status, fs, splits, regexFileFilter);

Review Comment:
   The method `addSplitsForPath` invokes itself recursively at line 83, and the last parameter may be null.
   When it is null, we know the dir/file comes from a matched path, and we handle it unless the custom `fileFilter` filters it out.
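
The recursion described here can be sketched outside of Flink. The following is an illustrative, simplified model rather than the PR's `addSplitsForPath`: the in-memory `TREE`, the `VISIBLE` filter, and the `enumerate`/`collect` methods are hypothetical names, and plain strings stand in for Flink's `Path` and `FileStatus`. A null pattern signals that an ancestor has already matched, so everything below is taken unless the file filter rejects it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Pattern;

/** Illustrative model of "match a directory, then take everything below it". */
public class MatchedDirectorySketch {

    /** Hypothetical in-memory tree: directory -> children. Paths without an entry are files. */
    private static final Map<String, List<String>> TREE = new HashMap<>();

    static {
        TREE.put("/dir", Arrays.asList("/dir/file1", "/dir/nested"));
        TREE.put(
                "/dir/nested",
                Arrays.asList("/dir/nested/file.out", "/dir/nested/.hidden", "/dir/nested/nested"));
        TREE.put(
                "/dir/nested/nested",
                Arrays.asList("/dir/nested/nested/doubleNestedFile.txt"));
    }

    /** Stand-in for the custom file filter: rejects names starting with '.' or '_'. */
    private static final Predicate<String> VISIBLE =
            path -> {
                String name = path.substring(path.lastIndexOf('/') + 1);
                return !name.startsWith(".") && !name.startsWith("_");
            };

    public static List<String> enumerate(String root, String regex) {
        List<String> files = new ArrayList<>();
        collect(root, Pattern.compile(regex), files);
        return files;
    }

    private static void collect(String path, Pattern regexOrNull, List<String> files) {
        // A null pattern means an ancestor directory already matched the regex.
        boolean matched = regexOrNull == null || regexOrNull.matcher(path).matches();
        if (matched && !VISIBLE.test(path)) {
            return; // matched paths are still subject to the file filter
        }
        List<String> children = TREE.get(path);
        if (children == null) {
            if (matched) {
                files.add(path); // a file is emitted only on a matched path
            }
            return;
        }
        for (String child : children) {
            // Once matched, pass null so the whole subtree is accepted.
            collect(child, matched ? null : regexOrNull, files);
        }
    }

    public static void main(String[] args) {
        System.out.println(enumerate("/dir", "/dir/nest.[a-z]"));
    }
}
```

In this model the pattern `/dir/nest.[a-z]` matches the directory `/dir/nested`, so the visible files beneath it are collected at any depth, while `/dir/file1` and the hidden file are skipped.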
   
   





[GitHub] [flink] ruanhang1993 commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "ruanhang1993 (via GitHub)" <gi...@apache.org>.
ruanhang1993 commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1222616580


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumerator.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively. Each file
+ * matched the given regex pattern becomes one split; this enumerator does not split files into
+ * smaller "block" units.
+ *
+ * <p>This enumerator filters out files with the common hidden file prefixes '.' and '_'.
+ */
+public class NonSplittingRegexEnumerator extends NonSplittingRecursiveEnumerator {
+
+    private final Predicate<Path> fileFilter;
+    private final RegexFileFilter regexFileFilter;
+
+    public NonSplittingRegexEnumerator(String pathPattern) {
+        this(pathPattern, new DefaultFileFilter());
+    }
+
+    public NonSplittingRegexEnumerator(String pathPattern, Predicate<Path> fileFilter) {
+        this.regexFileFilter = new RegexFileFilter(pathPattern);
+        this.fileFilter = fileFilter;
+    }
+
+    @Override
+    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
+            throws IOException {
+        final ArrayList<FileSourceSplit> splits = new ArrayList<>();
+        for (Path path : paths) {
+            final FileSystem fs = path.getFileSystem();
+            final FileStatus status = fs.getFileStatus(path);
+            addSplitsForPath(status, fs, splits, regexFileFilter);

Review Comment:
   It aims to match a directory and handle all files under the matched directory.





[GitHub] [flink] luoyuxia commented on pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "luoyuxia (via GitHub)" <gi...@apache.org>.
luoyuxia commented on PR #22249:
URL: https://github.com/apache/flink/pull/22249#issuecomment-1612381479

   Thanks all. Merging it now. Feel free to leave a comment if you have other concerns.




[GitHub] [flink] JuliaBogdan commented on pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "JuliaBogdan (via GitHub)" <gi...@apache.org>.
JuliaBogdan commented on PR #22249:
URL: https://github.com/apache/flink/pull/22249#issuecomment-1574414838

   Hi @ruanhang1993, these changes are very much needed, thank you!
   Are there any blockers for the PR to be reviewed and merged?
   
   Also, is there a workaround in the meantime, e.g. a way to pass a custom enumerator to `FileSource` or a custom file filter to the existing enumerators?




[GitHub] [flink] ruanhang1993 commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "ruanhang1993 (via GitHub)" <gi...@apache.org>.
ruanhang1993 commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1236234583


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.compression.StandardDeCompressors;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories, and creates a separate split for each file block.
+ *
+ * <p>Please note that file blocks are only exposed by some file systems, such as HDFS. File systems
+ * that do not expose block information will not create multiple file splits per file, but keep the
+ * files as one source split.
+ *
+ * <p>Files with suffixes corresponding to known compression formats (for example '.gzip', '.bz2',
+ * ...) will not be split. See {@link StandardDeCompressors} for a list of known formats and
+ * suffixes.
+ *
+ * <p>Compared to {@link BlockSplittingRecursiveEnumerator}, this enumerator will enumerate all
+ * files even through its parent directory is filtered out by the file filter.
+ */
+public class BlockSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveAllDirEnumerator {

Review Comment:
   `BlockSplittingRecursiveAllDirEnumerator` needs to reuse the `addSplitsForPath` method of `NonSplittingRecursiveAllDirEnumerator`, not the one in `BlockSplittingRecursiveEnumerator`.





[GitHub] [flink] ruanhang1993 commented on pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "ruanhang1993 (via GitHub)" <gi...@apache.org>.
ruanhang1993 commented on PR #22249:
URL: https://github.com/apache/flink/pull/22249#issuecomment-1600273842

   > Also, don't forget to update the docs for the newly added options. I'm fine with adding them in this PR or another PR.
   
   Thanks for reviewing. I will add docs in this PR.




[GitHub] [flink] luoyuxia merged pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "luoyuxia (via GitHub)" <gi...@apache.org>.
luoyuxia merged PR #22249:
URL: https://github.com/apache/flink/pull/22249




[GitHub] [flink] MartijnVisser commented on pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "MartijnVisser (via GitHub)" <gi...@apache.org>.
MartijnVisser commented on PR #22249:
URL: https://github.com/apache/flink/pull/22249#issuecomment-1576405479

   I'll see if I can find a reviewer




[GitHub] [flink] luoyuxia commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "luoyuxia (via GitHub)" <gi...@apache.org>.
luoyuxia commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1230407539


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java:
##########
@@ -68,6 +68,17 @@ public class FileSystemConnectorOptions {
                                     + "The statistics reporting is a heavy operation in some cases,"
                                     + "this config allows users to choose the statistics type according to different situations.");
 
+    public static final ConfigOption<String> SOURCE_REGEX_PATTERN =
+            key("source.regex-pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The regex pattern to filter files or directories. "
+                                    + "This regex pattern should be a relative path for the `path` option."

Review Comment:
   Please remember to update the description since, IIUC, the pattern will be matched against the absolute path.
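To make the relative-versus-absolute distinction concrete, here is a minimal sketch using only `java.util.regex`. It assumes full-match semantics against the absolute path string, which this thread does not confirm. The class `AbsolutePatternSketch` and the helper `toAbsolutePattern` are hypothetical names introduced for illustration and are not part of the PR.

```java
import java.util.regex.Pattern;

/** Hypothetical sketch of matching a regex against absolute path strings. */
final class AbsolutePatternSketch {

    /** True only if the whole path string matches the regex. */
    static boolean fullMatch(String regex, String path) {
        return Pattern.matches(regex, path);
    }

    /**
     * Builds an absolute pattern from a base directory and a regex written
     * relative to it. The base is quoted so that its characters are taken
     * literally rather than as regex syntax.
     */
    static String toAbsolutePattern(String basePath, String relativeRegex) {
        return Pattern.quote(basePath) + "/" + relativeRegex;
    }
}
```

Under these assumptions, a pattern written relative to `path`, such as `2023-.*/.*\.csv`, does not match `/tmp/input/2023-06/a.csv` on its own, whereas the pattern produced by `toAbsolutePattern("/tmp/input", ...)` does.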



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories. Each file matched the given regex pattern becomes one split; this enumerator
+ * does not split files into smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file
+ * prefixes '.' and '_'. A custom file filter can be specified.
+ *
+ * <p>Compared to {@link NonSplittingRecursiveEnumerator}, this enumerator will enumerate all files
+ * even through its parent directory is filtered out by the file filter.
+ */
+public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {

Review Comment:
   nit:
   mark it as internal.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.compression.StandardDeCompressors;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories, and creates a separate split for each file block.
+ *
+ * <p>Please note that file blocks are only exposed by some file systems, such as HDFS. File systems
+ * that do not expose block information will not create multiple file splits per file, but keep the
+ * files as one source split.
+ *
+ * <p>Files with suffixes corresponding to known compression formats (for example '.gzip', '.bz2',
+ * ...) will not be split. See {@link StandardDeCompressors} for a list of known formats and
+ * suffixes.
+ *
+ * <p>Compared to {@link BlockSplittingRecursiveEnumerator}, this enumerator will enumerate all
+ * files even through its parent directory is filtered out by the file filter.
+ */
+public class BlockSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveAllDirEnumerator {

Review Comment:
   nit: also mark it as internal



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories. Each file matched the given regex pattern becomes one split; this enumerator
+ * does not split files into smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file
+ * prefixes '.' and '_'. A custom file filter can be specified.
+ *
+ * <p>Compared to {@link NonSplittingRecursiveEnumerator}, this enumerator will enumerate all files
+ * even through its parent directory is filtered out by the file filter.
+ */
+public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {
+
+    /** The custom filter predicate to filter out unwanted files. */
+    private final Predicate<Path> fileFilter;

Review Comment:
   Make `fileFilter` protected in `NonSplittingRecursiveEnumerator` so that we don't need to declare it again here.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories. Each file matched the given regex pattern becomes one split; this enumerator
+ * does not split files into smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file
+ * prefixes '.' and '_'. A custom file filter can be specified.
+ *
+ * <p>Compared to {@link NonSplittingRecursiveEnumerator}, this enumerator will enumerate all files
+ * even through its parent directory is filtered out by the file filter.
+ */
+public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {
+
+    /** The custom filter predicate to filter out unwanted files. */
+    private final Predicate<Path> fileFilter;
+
+    /** The filter used to skip recursion in the hidden directories. */

Review Comment:
   ```suggestion
       /** The filter used to skip hidden directories. */
   ```



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories. Each file matched the given regex pattern becomes one split; this enumerator
+ * does not split files into smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file
+ * prefixes '.' and '_'. A custom file filter can be specified.
+ *
+ * <p>Compared to {@link NonSplittingRecursiveEnumerator}, this enumerator will enumerate all files
+ * even through its parent directory is filtered out by the file filter.
+ */
+public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {
+
+    /** The custom filter predicate to filter out unwanted files. */
+    private final Predicate<Path> fileFilter;
+
+    /** The filter used to skip recursion in the hidden directories. */
+    private final DefaultFileFilter hiddenDirFilter = new DefaultFileFilter();
+
+    /**
+     * Creates a NonSplittingRegexEnumerator that enumerates all files whose path or parent path

Review Comment:
   Why parent path?
   ```suggestion
        * Creates a NonSplittingRegexEnumerator that enumerates all files whose path
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableFactoryTest.java:
##########
@@ -49,6 +49,21 @@ public class FileSystemTableFactoryTest {
                     Column.physical("f1", DataTypes.BIGINT()),
                     Column.physical("f2", DataTypes.BIGINT()));
 
+    @Test
+    public void testSourceRegexPattern() {

Review Comment:
   Do we really need this test, given that we already have an IT case for it?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSourceStreamingITCase.java:
##########
@@ -88,4 +88,61 @@ public void testMonitorContinuously() throws Exception {
 
         assertThat(actual).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6);
     }
+
+    @Test
+    public void testRegexPath() throws Exception {

Review Comment:
   nit
   ```suggestion
       public void testSourceWithRegexPattern() throws Exception {
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSourceStreamingITCase.java:
##########
@@ -88,4 +88,61 @@ public void testMonitorContinuously() throws Exception {
 
         assertThat(actual).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6);
     }
+
+    @Test
+    public void testRegexPath() throws Exception {

Review Comment:
   I think the test can't verify whether `source.regex-pattern` takes effect, since the pattern won't filter out any files.
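The point can be shown without Flink: a check is only meaningful if the pattern excludes at least one candidate. The sketch below uses plain strings and `java.util.regex`. The class `RegexFilterCheckSketch` and the method `select` are hypothetical, and full-match semantics are an assumption.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical sketch of selecting paths by regex, for test design only. */
final class RegexFilterCheckSketch {

    /** Returns the paths that fully match the regex, in their original order. */
    static List<String> select(List<String> paths, String regex) {
        Pattern pattern = Pattern.compile(regex);
        return paths.stream()
                .filter(path -> pattern.matcher(path).matches())
                .collect(Collectors.toList());
    }
}
```

A test built on input like `/in/2023-06/a.csv`, `/in/2023-06/b.csv`, and `/in/2022-12/c.csv` with the pattern `/in/2023-.*` can assert that exactly the first two paths are selected. A pattern such as `.*` selects everything and so cannot distinguish a working filter from a missing one.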



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories. Each file matched the given regex pattern becomes one split; this enumerator
+ * does not split files into smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file
+ * prefixes '.' and '_'. A custom file filter can be specified.
+ *
+ * <p>Compared to {@link NonSplittingRecursiveEnumerator}, this enumerator will enumerate all files
+ * even through its parent directory is filtered out by the file filter.
+ */
+public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {
+
+    /** The custom filter predicate to filter out unwanted files. */
+    private final Predicate<Path> fileFilter;
+
+    /** The filter used to skip recursion in the hidden directories. */
+    private final DefaultFileFilter hiddenDirFilter = new DefaultFileFilter();
+
+    /**
+     * Creates a NonSplittingRegexEnumerator that enumerates all files whose path or parent path
+     * matches the regex except hidden files. Hidden files are considered files where the filename
+     * starts with '.' or with '_'.
+     */
+    public NonSplittingRecursiveAllDirEnumerator(String pathPattern) {

Review Comment:
   nit:
   ```suggestion
       public NonSplittingRecursiveAllDirEnumerator(String pathRegexPattern) {
   ```



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.compression.StandardDeCompressors;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories, and creates a separate split for each file block.
+ *
+ * <p>Please note that file blocks are only exposed by some file systems, such as HDFS. File systems
+ * that do not expose block information will not create multiple file splits per file, but keep the
+ * files as one source split.
+ *
+ * <p>Files with suffixes corresponding to known compression formats (for example '.gzip', '.bz2',
+ * ...) will not be split. See {@link StandardDeCompressors} for a list of known formats and
+ * suffixes.
+ *
+ * <p>Compared to {@link BlockSplittingRecursiveEnumerator}, this enumerator will enumerate all
+ * files even through its parent directory is filtered out by the file filter.
+ */
+public class BlockSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveAllDirEnumerator {

Review Comment:
   Why not make it extend `BlockSplittingRecursiveEnumerator`? If we extend `BlockSplittingRecursiveEnumerator`, the class will be simpler, with less code and less complex logic.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java:
##########
@@ -269,6 +271,19 @@ private SourceProvider createSourceProvider(BulkFormat<RowData, FileSourceSplit>
         tableOptions
                 .getOptional(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL)
                 .ifPresent(fileSourceBuilder::monitorContinuously);
+        tableOptions
+                .getOptional(FileSystemConnectorOptions.SOURCE_REGEX_PATTERN)
+                .ifPresent(
+                        regex -> {

Review Comment:
   nit:
   `{` can be removed.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java:
##########
@@ -68,6 +68,17 @@ public class FileSystemConnectorOptions {
                                     + "The statistics reporting is a heavy operation in some cases,"
                                     + "this config allows users to choose the statistics type according to different situations.");
 
+    public static final ConfigOption<String> SOURCE_REGEX_PATTERN =
+            key("source.regex-pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The regex pattern to filter files or directories. "
+                                    + "This regex pattern should be a relative path for the `path` option."

Review Comment:
   use `source.path.regex-pattern`?



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/RegexFileFilter.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.core.fs.Path;
+
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * A file filter that filters out files based on the given path pattern and hidden files, see {@link

Review Comment:
   nit:
   ```suggestion
    * A file filter that filters out hidden files, see {@link DefaultFileFilter} and the files whose path doesn't match the given regex pattern.
   ```
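As a rough illustration of the filter this Javadoc describes, the sketch below combines a hidden-file check (names starting with `.` or `_`, as stated elsewhere in the PR) with a regex match. It is not the PR's `RegexFileFilter`: it works on plain `/`-separated strings instead of Flink's `Path`, the class name `RegexFileFilterSketch` is hypothetical, and treating `true` as "keep the file" with full-match semantics is an assumption.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

/** Hypothetical stand-in for a regex-based file filter; true means "keep". */
final class RegexFileFilterSketch implements Predicate<String> {

    private final Pattern pattern;

    RegexFileFilterSketch(String pathRegexPattern) {
        // Compile once so the pattern is reused for every tested path.
        this.pattern = Pattern.compile(pathRegexPattern);
    }

    @Override
    public boolean test(String path) {
        // The file name is everything after the last '/'.
        String fileName = path.substring(path.lastIndexOf('/') + 1);
        boolean hidden = fileName.startsWith(".") || fileName.startsWith("_");
        // Keep only non-hidden files whose whole path matches the regex.
        return !hidden && pattern.matcher(path).matches();
    }
}
```

With the pattern `/data/.*\.csv`, this keeps `/data/a.csv` but rejects `/data/_tmp.csv`, `/data/.a.csv`, and `/data/a.json`.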





[GitHub] [flink] flinkbot commented on pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22249:
URL: https://github.com/apache/flink/pull/22249#issuecomment-1480851509

   ## CI report:
   
   * b07c442f17aee21c8d3b0a27d5fa72c4659f9ecf UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] rculbertson commented on pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "rculbertson (via GitHub)" <gi...@apache.org>.
rculbertson commented on PR #22249:
URL: https://github.com/apache/flink/pull/22249#issuecomment-1574630405

   I'd also like to see this PR merged, so I gave it a +1 as a vote of support :)




[GitHub] [flink] ruanhang1993 commented on pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "ruanhang1993 (via GitHub)" <gi...@apache.org>.
ruanhang1993 commented on PR #22249:
URL: https://github.com/apache/flink/pull/22249#issuecomment-1582076105

   Hi @RamanVerma, thanks for your review. I will rebase onto master and address the comments later.


[GitHub] [flink] JuliaBogdan commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "JuliaBogdan (via GitHub)" <gi...@apache.org>.
JuliaBogdan commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1222653397


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumerator.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively. Each file
+ * matched the given regex pattern becomes one split; this enumerator does not split files into
+ * smaller "block" units.
+ *
+ * <p>This enumerator filters out files with the common hidden file prefixes '.' and '_'.
+ */
+public class NonSplittingRegexEnumerator extends NonSplittingRecursiveEnumerator {
+
+    private final Predicate<Path> fileFilter;

Review Comment:
   A bit out of scope for the PR, but I wanted to get people's thoughts on this:
   > allows caller to pass in custom filters
   
   If I'm not mistaken, the current implementation offers no way to specify a custom file filter for `FileSystemTableSource` (neither via a custom file enumerator nor via a custom predicate passed to the existing ones).
   Is this by design? If so, what would be an option for passing one in, other than implementing a custom version of `FileSystemTableSource`?
   



[GitHub] [flink] RamanVerma commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "RamanVerma (via GitHub)" <gi...@apache.org>.
RamanVerma commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1221827919


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumerator.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively. Each file
+ * matched the given regex pattern becomes one split; this enumerator does not split files into
+ * smaller "block" units.
+ *
+ * <p>This enumerator filters out files with the common hidden file prefixes '.' and '_'.
+ */
+public class NonSplittingRegexEnumerator extends NonSplittingRecursiveEnumerator {
+
+    private final Predicate<Path> fileFilter;

Review Comment:
   It would help to explain why there are two different predicates. I suppose `fileFilter` either lets the caller pass in a custom filter or, by default, checks for hidden files and directories. A comment describing both predicates would help.
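
   A minimal sketch of how two such predicates could be combined, offered only as an illustration and not as the PR's code. It assumes plain path strings instead of Flink's `Path`; the class `TwoPredicateSketch`, the constant `HIDDEN_FILE_FILTER`, and the helpers `regexFilter` and `select` are hypothetical names. One predicate rejects hidden names (prefix '.' or '_'), the other accepts only paths that fully match the regex.

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.function.Predicate;
   import java.util.regex.Pattern;
   import java.util.stream.Collectors;

   final class TwoPredicateSketch {

       /** Hypothetical stand-in for a default filter: rejects names starting with '.' or '_'. */
       static final Predicate<String> HIDDEN_FILE_FILTER =
               path -> {
                   String name = path.substring(path.lastIndexOf('/') + 1);
                   return !name.startsWith(".") && !name.startsWith("_");
               };

       /** Builds a filter that accepts only paths fully matching the given regex. */
       static Predicate<String> regexFilter(String regex) {
           Pattern pattern = Pattern.compile(regex);
           return path -> pattern.matcher(path).matches();
       }

       /** Keeps the paths that pass both the hidden-file check and the regex check. */
       static List<String> select(List<String> paths, String regex) {
           Predicate<String> combined = HIDDEN_FILE_FILTER.and(regexFilter(regex));
           return paths.stream().filter(combined).collect(Collectors.toList());
       }

       public static void main(String[] args) {
           List<String> paths =
                   Arrays.asList("/dir/a.txt", "/dir/.hidden.txt", "/dir/_tmp.txt", "/dir/b.csv");
           // prints [/dir/a.txt]
           System.out.println(select(paths, "/dir/.*\\.txt"));
       }
   }
   ```

   Keeping the two checks as separate predicates makes it possible to swap the hidden-file rule for a caller-supplied one without touching the regex logic.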



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumerator.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively. Each file
+ * matched the given regex pattern becomes one split; this enumerator does not split files into
+ * smaller "block" units.
+ *
+ * <p>This enumerator filters out files with the common hidden file prefixes '.' and '_'.
+ */
+public class NonSplittingRegexEnumerator extends NonSplittingRecursiveEnumerator {
+
+    private final Predicate<Path> fileFilter;
+    private final RegexFileFilter regexFileFilter;
+
+    public NonSplittingRegexEnumerator(String pathPattern) {
+        this(pathPattern, new DefaultFileFilter());
+    }
+
+    public NonSplittingRegexEnumerator(String pathPattern, Predicate<Path> fileFilter) {
+        this.regexFileFilter = new RegexFileFilter(pathPattern);
+        this.fileFilter = fileFilter;
+    }
+
+    @Override
+    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
+            throws IOException {
+        final ArrayList<FileSourceSplit> splits = new ArrayList<>();
+        for (Path path : paths) {
+            final FileSystem fs = path.getFileSystem();
+            final FileStatus status = fs.getFileStatus(path);
+            addSplitsForPath(status, fs, splits, regexFileFilter);

Review Comment:
   `regexFileFilter` is a field, so `addSplitsForPath` can access it directly instead of receiving it as a parameter.



[GitHub] [flink] luoyuxia commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "luoyuxia (via GitHub)" <gi...@apache.org>.
luoyuxia commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1223891576


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java:
##########
@@ -269,10 +271,35 @@ private SourceProvider createSourceProvider(BulkFormat<RowData, FileSourceSplit>
         tableOptions
                 .getOptional(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL)
                 .ifPresent(fileSourceBuilder::monitorContinuously);
+        tableOptions
+                .getOptional(FileSystemConnectorOptions.SOURCE_REGEX_PATTERN)
+                .ifPresent(
+                        s -> {
+                            String regexPath = connectBasePathAndRegex(path.getPath(), s);
+                            fileSourceBuilder.setFileEnumerator(
+                                    bulkFormat.isSplittable()
+                                            ? () -> new BlockSplittingRegexEnumerator(regexPath)
+                                            : () -> new NonSplittingRegexEnumerator(regexPath));
+                        });
 
         return SourceProvider.of(fileSourceBuilder.build());
     }
 
+    private String connectBasePathAndRegex(String basePath, String regex) {
+        StringBuilder result = new StringBuilder();
+        result.append(basePath);
+        if (!basePath.endsWith(Path.SEPARATOR)) {
+            result.append(Path.SEPARATOR);
+        }
+        int startIndex = 0;
+        while (startIndex < regex.length()

Review Comment:
   No, I mean the user provides the base dir and `source.regex-pattern`, but `source.regex-pattern` should be an absolute path instead of a relative one. We would still only search from the base dir and check each file against the pattern.
   
   For example, `source.regex-pattern` would be `/dir/t/.*/inner/.*`, which looks clearer to me than `.*/inner/.*`.
   I'm wondering whether using `/dir/t/.*/inner/.*` would be clearer.
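
   The two styles can be compared with a small sketch. It is only an illustration under stated assumptions: paths are plain strings, and `PathRegexSketch`, `joinBaseAndRegex`, and `matches` are hypothetical names, not the PR's `connectBasePathAndRegex`. The sketch shows that a relative pattern joined to the base dir yields the same regex as the absolute form, so the difference is mainly in what the user has to write.

   ```java
   import java.util.regex.Pattern;

   final class PathRegexSketch {

       /** Hypothetical helper: prefixes a relative regex with the base directory. */
       static String joinBaseAndRegex(String baseDir, String relativeRegex) {
           String base = baseDir.endsWith("/") ? baseDir : baseDir + "/";
           String regex = relativeRegex;
           // Drop leading separators so the joined pattern has exactly one between the parts.
           while (regex.startsWith("/")) {
               regex = regex.substring(1);
           }
           return base + regex;
       }

       /** Returns true if the whole file path matches the absolute regex. */
       static boolean matches(String absoluteRegex, String filePath) {
           return Pattern.matches(absoluteRegex, filePath);
       }

       public static void main(String[] args) {
           String absolute = "/dir/t/.*/inner/.*";
           String joined = joinBaseAndRegex("/dir/t", ".*/inner/.*");
           System.out.println(joined.equals(absolute)); // prints true
           System.out.println(matches(absolute, "/dir/t/a/inner/f1")); // prints true
           System.out.println(matches(absolute, "/dir/t/a/other/f2")); // prints false
       }
   }
   ```

   Note that in this sketch the base dir is inserted into the regex unquoted, so a base dir containing regex metacharacters would be interpreted as part of the pattern.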
   
   
   



[GitHub] [flink] luoyuxia commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "luoyuxia (via GitHub)" <gi...@apache.org>.
luoyuxia commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1236760683


##########
docs/content/docs/connectors/table/filesystem.md:
##########
@@ -50,6 +50,10 @@ CREATE TABLE MyUserTable (
                                         -- section for more details
   'partition.default-name' = '...',     -- optional: default partition name in case the dynamic partition
                                         -- column value is null/empty string
+  'source.path.regex-pattern' = '...',  -- optional: regex pattern to filter files under the directory

Review Comment:
   ```suggestion
     'source.path.regex-pattern' = '...',  -- optional: regex pattern to filter files to read under the directory
   ```



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.compression.StandardDeCompressors;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories, and creates a separate split for each file block.
+ *
+ * <p>Please note that file blocks are only exposed by some file systems, such as HDFS. File systems
+ * that do not expose block information will not create multiple file splits per file, but keep the
+ * files as one source split.
+ *
+ * <p>Files with suffixes corresponding to known compression formats (for example '.gzip', '.bz2',
+ * ...) will not be split. See {@link StandardDeCompressors} for a list of known formats and
+ * suffixes.
+ *
+ * <p>Compared to {@link BlockSplittingRecursiveEnumerator}, this enumerator will enumerate all
+ * files even through its parent directory is filtered out by the file filter.
+ */
+@Internal
+public class BlockSplittingRecursiveAllDirEnumerator extends BlockSplittingRecursiveEnumerator {
+
+    /** The filter used to skip hidden directories. */
+    private final DefaultFileFilter hiddenDirFilter = new DefaultFileFilter();
+
+    /**
+     * Creates a new enumerator that enumerates all files whose file path matches the regex except
+     * hidden files. Hidden files are considered files where the filename starts with '.' or with
+     * '_'.
+     *
+     * <p>The enumerator does not split files that have a suffix corresponding to a known
+     * compression format (for example '.gzip', '.bz2', '.xy', '.zip', ...). See {@link
+     * StandardDeCompressors} for details.
+     */
+    public BlockSplittingRecursiveAllDirEnumerator(String pathPattern) {
+        this(
+                new RegexFileFilter(pathPattern),
+                StandardDeCompressors.getCommonSuffixes().toArray(new String[0]));
+    }
+
+    /**
+     * Creates a new enumerator that uses the given predicate as a filter for file paths, and avoids
+     * splitting files with the given extension (typically to avoid splitting compressed files).
+     */
+    public BlockSplittingRecursiveAllDirEnumerator(
+            final Predicate<Path> fileFilter, final String[] nonSplittableFileSuffixes) {
+        super(fileFilter, nonSplittableFileSuffixes);
+    }
+
+    @Override
+    protected void addSplitsForPath(
+            FileStatus fileStatus, FileSystem fs, ArrayList<FileSourceSplit> target)
+            throws IOException {
+        if (fileStatus.isDir()) {
+            if (!hiddenDirFilter.test(fileStatus.getPath())) {
+                return;
+            }
+            final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
+            for (FileStatus containedStatus : containedFiles) {
+                // If this dir matches the regex, add all files under it as a split (not include the

Review Comment:
   Please remove these comments since they are out of date.



Review Comment:
   At least, I haven't seen any logic corresponding to `If this dir matches the regex, ...`.
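
   For reference, the behavior described in the class Javadoc (directories are skipped only when hidden, while the file filter is applied to files) can be sketched as follows. This is a simplified illustration, not the PR's implementation: it assumes absolute path strings starting with '/', checks the regex against files only, and uses the hypothetical names `AllDirEnumerationSketch`, `isHidden`, and `enumerate`.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.regex.Pattern;

   final class AllDirEnumerationSketch {

       /** Hidden names are assumed to start with '.' or '_'. */
       static boolean isHidden(String name) {
           return name.startsWith(".") || name.startsWith("_");
       }

       /** Selects files that match the regex and do not sit below a hidden directory. */
       static List<String> enumerate(List<String> absoluteFilePaths, String regex) {
           Pattern pattern = Pattern.compile(regex);
           List<String> selected = new ArrayList<>();
           for (String path : absoluteFilePaths) {
               String[] parts = path.substring(1).split("/");
               boolean underHiddenDir = false;
               // Only directory components are checked here; the last part is the file name.
               for (int i = 0; i < parts.length - 1; i++) {
                   if (isHidden(parts[i])) {
                       underHiddenDir = true;
                       break;
                   }
               }
               // Directories are never tested against the regex, only complete file paths.
               if (!underHiddenDir && pattern.matcher(path).matches()) {
                   selected.add(path);
               }
           }
           return selected;
       }

       public static void main(String[] args) {
           List<String> files =
                   Arrays.asList(
                           "/dir/nested/file.out",
                           "/dir/.hiddendir/file.out",
                           "/dir/other/file.txt");
           // prints [/dir/nested/file.out]
           System.out.println(enumerate(files, "/dir/.*/file\\.out"));
       }
   }
   ```

   In this sketch `/dir/nested` itself does not match the pattern, yet the file below it is still found, which is the point of not applying the file filter to directories.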



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories. Each file matched the given regex pattern becomes one split; this enumerator
+ * does not split files into smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file
+ * prefixes '.' and '_'. A custom file filter can be specified.
+ *
+ * <p>Compared to {@link NonSplittingRecursiveEnumerator}, this enumerator will enumerate all files
+ * even through its parent directory is filtered out by the file filter.
+ */
+@Internal
+public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {
+    /** The filter used to skip hidden directories. */
+    private final DefaultFileFilter hiddenDirFilter = new DefaultFileFilter();
+
+    /**
+     * Creates a NonSplittingRegexEnumerator that enumerates all files whose file path matches the
+     * regex except hidden files. Hidden files are considered files where the filename starts with
+     * '.' or with '_'.
+     */
+    public NonSplittingRecursiveAllDirEnumerator(String pathRegexPattern) {
+        this(new RegexFileFilter(pathRegexPattern));
+    }
+
+    /**
+     * Creates a NonSplittingRegexEnumerator that enumerates all files whose file path matches the
+     * regex. Support to use given custom predicate as a filter for file paths.
+     */
+    public NonSplittingRecursiveAllDirEnumerator(Predicate<Path> fileFilter) {
+        super(fileFilter);
+    }
+
+    @Override
+    protected void addSplitsForPath(
+            FileStatus fileStatus, FileSystem fs, ArrayList<FileSourceSplit> target)
+            throws IOException {
+        if (fileStatus.isDir()) {
+            if (!hiddenDirFilter.test(fileStatus.getPath())) {
+                return;
+            }
+            final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
+            for (FileStatus containedStatus : containedFiles) {
+                // If this dir matches the regex, add all files under it as a split (not include the

Review Comment:
   Ditto.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java:
##########
@@ -68,6 +68,17 @@ public class FileSystemConnectorOptions {
                                     + "The statistics reporting is a heavy operation in some cases,"
                                     + "this config allows users to choose the statistics type according to different situations.");
 
+    public static final ConfigOption<String> SOURCE_REGEX_PATTERN =

Review Comment:
   ```suggestion
       public static final ConfigOption<String> SOURCE_PATH_REGEX_PATTERN =
   ```
   ?



[GitHub] [flink] ruanhang1993 commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "ruanhang1993 (via GitHub)" <gi...@apache.org>.
ruanhang1993 commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1222654083


##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumeratorTest.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumeratorTest.assertSplitsEqual;
+import static org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumeratorTest.toPaths;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the {@link NonSplittingRegexEnumerator}. */
+public class NonSplittingRegexEnumeratorTest {
+    /**
+     * Testing file system reference, to be cleaned up in an @After method. That way it also gets
+     * cleaned up on a test failure, without needing finally clauses in every test.
+     */
+    protected TestingFileSystem testFs;
+
+    @AfterEach
+    void unregisterTestFs() throws Exception {
+        if (testFs != null) {
+            testFs.unregister();
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Test
+    void testIncludeSingleFile() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///dir/file1"),
+                    new Path("testfs:///dir/nested/file.out"),
+                    new Path("testfs:///dir/nested/anotherfile.txt")
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///dir");
+        final NonSplittingRegexEnumerator enumerator =
+                createEnumerator(baseDIr.getPath() + "/nested/file.out");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits)).containsExactlyInAnyOrder(testPaths[1]);
+    }
+
+    @Test
+    void testIncludeFilesFromRegexDirectory() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///dir/file1"),
+                    new Path("testfs:///dir/nested/file.out"),
+                    new Path("testfs:///dir/nested/anotherFile.txt"),
+                    new Path("testfs:///dir/nested/nested/doubleNestedFile.txt")
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///dir");
+        final NonSplittingRegexEnumerator enumerator =
+                createEnumerator(baseDIr.getPath() + "/nest.[a-z]");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits))
+                .containsExactlyInAnyOrder(Arrays.copyOfRange(testPaths, 1, testPaths.length));
+    }
+
+    @Test
+    void testIncludeSingleFileFromMultiDirectory() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///dir/file1"),
+                    new Path("testfs:///dir/nested/file.out"),
+                    new Path("testfs:///dir/nested/anotherFile.txt"),
+                    new Path("testfs:///dir/nested/nested/doubleNestedFile.txt"),
+                    new Path("testfs:///dir/anotherNested/file.out"),
+                    new Path("testfs:///dir/anotherNested/nested/file.out"),
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///dir");
+        final NonSplittingRegexEnumerator enumerator =
+                createEnumerator(baseDIr.getPath() + "/.*/file.out");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits))
+                .containsExactlyInAnyOrder(
+                        Arrays.stream(testPaths)
+                                .filter(p -> p.getPath().endsWith("file.out"))
+                                .toArray(Path[]::new));
+    }
+
+    @Test
+    void testDefaultHiddenFilesFilter() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///visiblefile"),
+                    new Path("testfs:///.hiddenfile1"),
+                    new Path("testfs:///_hiddenfile2")
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///");
+        final NonSplittingRegexEnumerator enumerator = createEnumerator("/.*");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits)).isEqualTo(Collections.singletonList(testPaths[0]));
+    }
+
+    @Test
+    void testHiddenDirectories() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///dir/visiblefile"),
+                    new Path("testfs:///dir/.hiddendir/file"),
+                    new Path("testfs:///_notvisible/afile")
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///");
+        final NonSplittingRegexEnumerator enumerator = createEnumerator("/.*");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits)).isEqualTo(Collections.singletonList(testPaths[0]));
+    }
+
+    @Test
+    void testFilesWithNoBlockInfo() throws Exception {
+        final Path testPath = new Path("testfs:///dir/file1");
+        testFs =
+                TestingFileSystem.createForFileStatus(
+                        "testfs",
+                        TestingFileSystem.TestFileStatus.forFileWithBlocks(testPath, 12345L));
+        testFs.register();
+
+        final NonSplittingRegexEnumerator enumerator = createEnumerator("/.*/file.");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {new Path("testfs:///dir")}, 0);
+
+        assertThat(splits).hasSize(1);
+        assertSplitsEqual(
+                new FileSourceSplit("ignoredId", testPath, 0L, 12345L, 0, 12345L),
+                splits.iterator().next());
+    }
+
+    @Test
+    void testFileWithIncorrectBlocks() throws Exception {

Review Comment:
   This test also exists in `NonSplittingRecursiveEnumeratorTest`.
   I think both `NonSplittingRecursiveEnumerator` and `NonSplittingRegexEnumerator` can also handle files that have block information.





[GitHub] [flink] ruanhang1993 commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "ruanhang1993 (via GitHub)" <gi...@apache.org>.
ruanhang1993 commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1222654083


##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumeratorTest.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumeratorTest.assertSplitsEqual;
+import static org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumeratorTest.toPaths;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the {@link NonSplittingRegexEnumerator}. */
+public class NonSplittingRegexEnumeratorTest {
+    /**
+     * Testing file system reference, to be cleaned up in an @After method. That way it also gets
+     * cleaned up on a test failure, without needing finally clauses in every test.
+     */
+    protected TestingFileSystem testFs;
+
+    @AfterEach
+    void unregisterTestFs() throws Exception {
+        if (testFs != null) {
+            testFs.unregister();
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Test
+    void testIncludeSingleFile() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///dir/file1"),
+                    new Path("testfs:///dir/nested/file.out"),
+                    new Path("testfs:///dir/nested/anotherfile.txt")
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///dir");
+        final NonSplittingRegexEnumerator enumerator =
+                createEnumerator(baseDIr.getPath() + "/nested/file.out");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits)).containsExactlyInAnyOrder(testPaths[1]);
+    }
+
+    @Test
+    void testIncludeFilesFromRegexDirectory() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///dir/file1"),
+                    new Path("testfs:///dir/nested/file.out"),
+                    new Path("testfs:///dir/nested/anotherFile.txt"),
+                    new Path("testfs:///dir/nested/nested/doubleNestedFile.txt")
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///dir");
+        final NonSplittingRegexEnumerator enumerator =
+                createEnumerator(baseDIr.getPath() + "/nest.[a-z]");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits))
+                .containsExactlyInAnyOrder(Arrays.copyOfRange(testPaths, 1, testPaths.length));
+    }
+
+    @Test
+    void testIncludeSingleFileFromMultiDirectory() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///dir/file1"),
+                    new Path("testfs:///dir/nested/file.out"),
+                    new Path("testfs:///dir/nested/anotherFile.txt"),
+                    new Path("testfs:///dir/nested/nested/doubleNestedFile.txt"),
+                    new Path("testfs:///dir/anotherNested/file.out"),
+                    new Path("testfs:///dir/anotherNested/nested/file.out"),
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///dir");
+        final NonSplittingRegexEnumerator enumerator =
+                createEnumerator(baseDIr.getPath() + "/.*/file.out");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits))
+                .containsExactlyInAnyOrder(
+                        Arrays.stream(testPaths)
+                                .filter(p -> p.getPath().endsWith("file.out"))
+                                .toArray(Path[]::new));
+    }
+
+    @Test
+    void testDefaultHiddenFilesFilter() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///visiblefile"),
+                    new Path("testfs:///.hiddenfile1"),
+                    new Path("testfs:///_hiddenfile2")
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///");
+        final NonSplittingRegexEnumerator enumerator = createEnumerator("/.*");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits)).isEqualTo(Collections.singletonList(testPaths[0]));
+    }
+
+    @Test
+    void testHiddenDirectories() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///dir/visiblefile"),
+                    new Path("testfs:///dir/.hiddendir/file"),
+                    new Path("testfs:///_notvisible/afile")
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///");
+        final NonSplittingRegexEnumerator enumerator = createEnumerator("/.*");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits)).isEqualTo(Collections.singletonList(testPaths[0]));
+    }
+
+    @Test
+    void testFilesWithNoBlockInfo() throws Exception {
+        final Path testPath = new Path("testfs:///dir/file1");
+        testFs =
+                TestingFileSystem.createForFileStatus(
+                        "testfs",
+                        TestingFileSystem.TestFileStatus.forFileWithBlocks(testPath, 12345L));
+        testFs.register();
+
+        final NonSplittingRegexEnumerator enumerator = createEnumerator("/.*/file.");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {new Path("testfs:///dir")}, 0);
+
+        assertThat(splits).hasSize(1);
+        assertSplitsEqual(
+                new FileSourceSplit("ignoredId", testPath, 0L, 12345L, 0, 12345L),
+                splits.iterator().next());
+    }
+
+    @Test
+    void testFileWithIncorrectBlocks() throws Exception {

Review Comment:
   This test also exists in `NonSplittingRecursiveEnumeratorTest`.
   I think both `NonSplittingRecursiveEnumerator` and `NonSplittingRegexEnumerator` can also handle files that have block information.





[GitHub] [flink] ruanhang1993 commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "ruanhang1993 (via GitHub)" <gi...@apache.org>.
ruanhang1993 commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1223886235


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java:
##########
@@ -269,10 +271,35 @@ private SourceProvider createSourceProvider(BulkFormat<RowData, FileSourceSplit>
         tableOptions
                 .getOptional(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL)
                 .ifPresent(fileSourceBuilder::monitorContinuously);
+        tableOptions
+                .getOptional(FileSystemConnectorOptions.SOURCE_REGEX_PATTERN)
+                .ifPresent(
+                        s -> {
+                            String regexPath = connectBasePathAndRegex(path.getPath(), s);
+                            fileSourceBuilder.setFileEnumerator(
+                                    bulkFormat.isSplittable()
+                                            ? () -> new BlockSplittingRegexEnumerator(regexPath)
+                                            : () -> new NonSplittingRegexEnumerator(regexPath));
+                        });
 
         return SourceProvider.of(fileSourceBuilder.build());
     }
 
+    private String connectBasePathAndRegex(String basePath, String regex) {
+        StringBuilder result = new StringBuilder();
+        result.append(basePath);
+        if (!basePath.endsWith(Path.SEPARATOR)) {
+            result.append(Path.SEPARATOR);
+        }
+        int startIndex = 0;
+        while (startIndex < regex.length()

Review Comment:
   @luoyuxia Using a base path helps reduce the search time.
   If we only provide `source.regex-pattern`, we have to search from the root path `/`, e.g. when `source.regex-pattern` = `'/dir/t/.*/inner/.*'`.
   If we provide both a base dir and `source.regex-pattern`, we start searching under the base dir `/dir/t/`, e.g. when base dir = `'/dir/t'` and `source.regex-pattern` = `'.*/inner/.*'`.
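
   The idea of joining a base dir with a relative pattern can be sketched in plain Java as follows. This is only an illustration, not the code from the PR: the class and the helper `joinBasePathAndRegex` are hypothetical names, stripping leading slashes from the pattern is an assumption, and the base path is assumed to contain no regex metacharacters.

```java
import java.util.regex.Pattern;

class BasePathRegexSketch {

    // Hypothetical helper: joins a base directory and a relative regex into one
    // absolute pattern, with exactly one '/' between the two parts.
    public static String joinBasePathAndRegex(String basePath, String relativeRegex) {
        String base = basePath.endsWith("/") ? basePath : basePath + "/";
        String regex = relativeRegex;
        // Assumption: leading separators in the pattern are dropped.
        while (regex.startsWith("/")) {
            regex = regex.substring(1);
        }
        return base + regex;
    }

    // Returns true if the whole path matches the joined pattern.
    public static boolean matches(String basePath, String relativeRegex, String path) {
        Pattern pattern = Pattern.compile(joinBasePathAndRegex(basePath, relativeRegex));
        return pattern.matcher(path).matches();
    }

    public static void main(String[] args) {
        System.out.println(joinBasePathAndRegex("/dir/t", ".*/inner/.*"));
        System.out.println(matches("/dir/t", ".*/inner/.*", "/dir/t/a/inner/file.txt"));
        System.out.println(matches("/dir/t", ".*/inner/.*", "/other/a/inner/file.txt"));
    }
}
```

   With a base dir, only entries below `/dir/t` ever need to be listed; the joined pattern is then checked against each candidate path.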





[GitHub] [flink] ruanhang1993 commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "ruanhang1993 (via GitHub)" <gi...@apache.org>.
ruanhang1993 commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1222677339


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumerator.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively. Each file
+ * matched the given regex pattern becomes one split; this enumerator does not split files into
+ * smaller "block" units.
+ *
+ * <p>This enumerator filters out files with the common hidden file prefixes '.' and '_'.
+ */
+public class NonSplittingRegexEnumerator extends NonSplittingRecursiveEnumerator {
+
+    private final Predicate<Path> fileFilter;

Review Comment:
   I think this custom filter may not be exposed to the Table API. It can only be used through the DataStream API.





[GitHub] [flink] ruanhang1993 commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "ruanhang1993 (via GitHub)" <gi...@apache.org>.
ruanhang1993 commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1222617285


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumerator.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively. Each file
+ * matched the given regex pattern becomes one split; this enumerator does not split files into
+ * smaller "block" units.
+ *
+ * <p>This enumerator filters out files with the common hidden file prefixes '.' and '_'.
+ */
+public class NonSplittingRegexEnumerator extends NonSplittingRecursiveEnumerator {
+
+    private final Predicate<Path> fileFilter;

Review Comment:
   I will add some comments about them. 





[GitHub] [flink] ruanhang1993 commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "ruanhang1993 (via GitHub)" <gi...@apache.org>.
ruanhang1993 commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1223912208


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumerator.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively. Each file
+ * matched the given regex pattern becomes one split; this enumerator does not split files into
+ * smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file
+ * prefixes '.' and '_'. A custom file filter can be specified.
+ */
+public class NonSplittingRegexEnumerator extends NonSplittingRecursiveEnumerator {
+
+    /** The custom filter predicate to filter out unwanted files. */
+    private final Predicate<Path> fileFilter;
+
+    private final RegexFileFilter regexFileFilter;

Review Comment:
   Because that does not cover some situations.
   For example, suppose we have the following files.
   ```
   /
    |- dir
        |- test
              |- test.java
              |- innerDir
                    |- test.java
   ```
   We want to read both `test.java` files under the directory `/dir`, so the final regex is `/dir/.*/test.java`.
   If we reuse `NonSplittingRecursiveEnumerator` + `fileFilter`, it will skip the directory `/dir/test` because that path does not match the regex `/dir/.*/test.java`, and the files under it will be lost.
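
   The effect can be reproduced with a small self-contained sketch. It does not use Flink classes: `Node`, `walk` and `enumerate` are hypothetical stand-ins, and the sketch assumes, as described above, that a recursive walk applies the same filter to directories and to files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;

class DirectoryPruningSketch {

    /** Toy stand-in for a file system entry; not a Flink class. */
    static final class Node {
        final String path;
        final boolean isDir;
        final List<Node> children;

        Node(String path, boolean isDir, Node... children) {
            this.path = path;
            this.isDir = isDir;
            this.children = Arrays.asList(children);
        }
    }

    /** Builds the example tree: /dir/test/test.java and /dir/test/innerDir/test.java. */
    static Node exampleTree() {
        Node inner = new Node("/dir/test/innerDir", true,
                new Node("/dir/test/innerDir/test.java", false));
        Node test = new Node("/dir/test", true,
                new Node("/dir/test/test.java", false), inner);
        return new Node("/dir", true, test);
    }

    /** Recursive walk; when filterDirs is true, a non-matching directory is pruned. */
    static void walk(Node node, Predicate<String> filter, boolean filterDirs, List<String> out) {
        if (node.isDir) {
            if (filterDirs && !filter.test(node.path)) {
                return; // the whole subtree is skipped
            }
            for (Node child : node.children) {
                walk(child, filter, filterDirs, out);
            }
        } else if (filter.test(node.path)) {
            out.add(node.path);
        }
    }

    /** Enumerates the example tree with the regex /dir/.*\/test.java applied as the filter. */
    public static List<String> enumerate(boolean filterDirs) {
        Pattern pattern = Pattern.compile("/dir/.*/test.java");
        Predicate<String> filter = p -> pattern.matcher(p).matches();
        List<String> out = new ArrayList<>();
        walk(exampleTree(), filter, filterDirs, out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(enumerate(true));  // regex also applied to directories
        System.out.println(enumerate(false)); // regex applied to files only
    }
}
```

   In this sketch, applying the regex to directories returns no files at all, while applying it to files only returns both `test.java` files. That is the reason for matching the regex against file paths only and letting the traversal descend into every non-hidden directory.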





[GitHub] [flink] luoyuxia commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "luoyuxia (via GitHub)" <gi...@apache.org>.
luoyuxia commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1223819746


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumerator.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively. Each file
+ * matched the given regex pattern becomes one split; this enumerator does not split files into
+ * smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file
+ * prefixes '.' and '_'. A custom file filter can be specified.
+ */
+public class NonSplittingRegexEnumerator extends NonSplittingRecursiveEnumerator {
+
+    /** The custom filter predicate to filter out unwanted files. */
+    private final Predicate<Path> fileFilter;
+
+    private final RegexFileFilter regexFileFilter;

Review Comment:
   Do we really need this here? I think `fileFilter` is enough. We can always construct the `fileFilter` from the regex.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java:
##########
@@ -269,10 +271,35 @@ private SourceProvider createSourceProvider(BulkFormat<RowData, FileSourceSplit>
         tableOptions
                 .getOptional(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL)
                 .ifPresent(fileSourceBuilder::monitorContinuously);
+        tableOptions
+                .getOptional(FileSystemConnectorOptions.SOURCE_REGEX_PATTERN)
+                .ifPresent(
+                        s -> {
+                            String regexPath = connectBasePathAndRegex(path.getPath(), s);
+                            fileSourceBuilder.setFileEnumerator(
+                                    bulkFormat.isSplittable()
+                                            ? () -> new BlockSplittingRegexEnumerator(regexPath)
+                                            : () -> new NonSplittingRegexEnumerator(regexPath));
+                        });
 
         return SourceProvider.of(fileSourceBuilder.build());
     }
 
+    private String connectBasePathAndRegex(String basePath, String regex) {
+        StringBuilder result = new StringBuilder();
+        result.append(basePath);
+        if (!basePath.endsWith(Path.SEPARATOR)) {
+            result.append(Path.SEPARATOR);
+        }
+        int startIndex = 0;
+        while (startIndex < regex.length()

Review Comment:
   Still, I'm trying to understand why we need `connectBasePathAndRegex`. What exactly is the option `source.regex-pattern` for?
   From the source code, it seems that if the table path is `/dir` and `source.regex-pattern` is `t/*`, it will try to match `/dir/t/*`?
   
   TBH, I'm not sure whether that is reasonable; it looks weird or unclear to me.
   Another choice is to have the user specify `/dir/t/*` in `source.regex-pattern`.
   
   I don't have a strong preference between these two choices and don't know which is better. Let's hear what others think.
   
   



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumerator.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively. Each file
+ * matched the given regex pattern becomes one split; this enumerator does not split files into
+ * smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file
+ * prefixes '.' and '_'. A custom file filter can be specified.
+ */
+public class NonSplittingRegexEnumerator extends NonSplittingRecursiveEnumerator {
+
+    /** The custom filter predicate to filter out unwanted files. */
+    private final Predicate<Path> fileFilter;
+
+    private final RegexFileFilter regexFileFilter;

Review Comment:
   Also, I'm now wondering: do we really need to introduce `NonSplittingRegexEnumerator`?
   Why not reuse `NonSplittingRecursiveEnumerator`, with a `fileFilter` that composes the logic of `DefaultFileFilter` and `RegexFileFilter`?
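
   A minimal sketch of what such a composed filter could look like, using `java.util.function.Predicate` on plain path strings. The names `NOT_HIDDEN`, `regexFilter` and `composed` are hypothetical stand-ins for illustration; they are not the Flink `DefaultFileFilter` or `RegexFileFilter` classes.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

class ComposedFilterSketch {

    // Stand-in for a hidden-file check: rejects names starting with '.' or '_'.
    static final Predicate<String> NOT_HIDDEN = path -> {
        String name = path.substring(path.lastIndexOf('/') + 1);
        return name.isEmpty() || (name.charAt(0) != '.' && name.charAt(0) != '_');
    };

    // Stand-in for a regex check: accepts paths that fully match the pattern.
    static Predicate<String> regexFilter(String regex) {
        Pattern pattern = Pattern.compile(regex);
        return path -> pattern.matcher(path).matches();
    }

    // Composition: a path is accepted only if both checks pass.
    public static Predicate<String> composed(String regex) {
        return NOT_HIDDEN.and(regexFilter(regex));
    }

    public static void main(String[] args) {
        Predicate<String> filter = composed("/dir/.*\\.txt");
        System.out.println(filter.test("/dir/a.txt"));
        System.out.println(filter.test("/dir/.hidden.txt"));
        System.out.println(filter.test("/dir/a.csv"));
    }
}
```

   Whether a single composed predicate is enough depends on whether the enumerator applies it to directories as well as files during traversal.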



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumerator.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively. Each file
+ * matched the given regex pattern becomes one split; this enumerator does not split files into
+ * smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file
+ * prefixes '.' and '_'. A custom file filter can be specified.
+ */
+public class NonSplittingRegexEnumerator extends NonSplittingRecursiveEnumerator {
+
+    /** The custom filter predicate to filter out unwanted files. */
+    private final Predicate<Path> fileFilter;
+
+    private final RegexFileFilter regexFileFilter;

Review Comment:
   I mean construct the `fileFilter` in `FileSystemTableSource`.





[GitHub] [flink] luoyuxia commented on pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "luoyuxia (via GitHub)" <gi...@apache.org>.
luoyuxia commented on PR #22249:
URL: https://github.com/apache/flink/pull/22249#issuecomment-1582076511

   Thanks all. I'll review & merge.
   I hope others can also have a look to make sure this PR really meets your needs :)




[GitHub] [flink] ruanhang1993 commented on a diff in pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "ruanhang1993 (via GitHub)" <gi...@apache.org>.
ruanhang1993 commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1222654083


##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRegexEnumeratorTest.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumeratorTest.assertSplitsEqual;
+import static org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumeratorTest.toPaths;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the {@link NonSplittingRegexEnumerator}. */
+public class NonSplittingRegexEnumeratorTest {
+    /**
+     * Testing file system reference, to be cleaned up in an @After method. That way it also gets
+     * cleaned up on a test failure, without needing finally clauses in every test.
+     */
+    protected TestingFileSystem testFs;
+
+    @AfterEach
+    void unregisterTestFs() throws Exception {
+        if (testFs != null) {
+            testFs.unregister();
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Test
+    void testIncludeSingleFile() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///dir/file1"),
+                    new Path("testfs:///dir/nested/file.out"),
+                    new Path("testfs:///dir/nested/anotherfile.txt")
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///dir");
+        final NonSplittingRegexEnumerator enumerator =
+                createEnumerator(baseDIr.getPath() + "/nested/file.out");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits)).containsExactlyInAnyOrder(testPaths[1]);
+    }
+
+    @Test
+    void testIncludeFilesFromRegexDirectory() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///dir/file1"),
+                    new Path("testfs:///dir/nested/file.out"),
+                    new Path("testfs:///dir/nested/anotherFile.txt"),
+                    new Path("testfs:///dir/nested/nested/doubleNestedFile.txt")
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///dir");
+        final NonSplittingRegexEnumerator enumerator =
+                createEnumerator(baseDIr.getPath() + "/nest.[a-z]");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits))
+                .containsExactlyInAnyOrder(Arrays.copyOfRange(testPaths, 1, testPaths.length));
+    }
+
+    @Test
+    void testIncludeSingleFileFromMultiDirectory() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///dir/file1"),
+                    new Path("testfs:///dir/nested/file.out"),
+                    new Path("testfs:///dir/nested/anotherFile.txt"),
+                    new Path("testfs:///dir/nested/nested/doubleNestedFile.txt"),
+                    new Path("testfs:///dir/anotherNested/file.out"),
+                    new Path("testfs:///dir/anotherNested/nested/file.out"),
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///dir");
+        final NonSplittingRegexEnumerator enumerator =
+                createEnumerator(baseDIr.getPath() + "/.*/file.out");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits))
+                .containsExactlyInAnyOrder(
+                        Arrays.stream(testPaths)
+                                .filter(p -> p.getPath().endsWith("file.out"))
+                                .toArray(Path[]::new));
+    }
+
+    @Test
+    void testDefaultHiddenFilesFilter() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///visiblefile"),
+                    new Path("testfs:///.hiddenfile1"),
+                    new Path("testfs:///_hiddenfile2")
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///");
+        final NonSplittingRegexEnumerator enumerator = createEnumerator("/.*");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits)).isEqualTo(Collections.singletonList(testPaths[0]));
+    }
+
+    @Test
+    void testHiddenDirectories() throws Exception {
+        final Path[] testPaths =
+                new Path[] {
+                    new Path("testfs:///dir/visiblefile"),
+                    new Path("testfs:///dir/.hiddendir/file"),
+                    new Path("testfs:///_notvisible/afile")
+                };
+        testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+        testFs.register();
+
+        Path baseDIr = new Path("testfs:///");
+        final NonSplittingRegexEnumerator enumerator = createEnumerator("/.*");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {baseDIr}, 1);
+
+        assertThat(toPaths(splits)).isEqualTo(Collections.singletonList(testPaths[0]));
+    }
+
+    @Test
+    void testFilesWithNoBlockInfo() throws Exception {
+        final Path testPath = new Path("testfs:///dir/file1");
+        testFs =
+                TestingFileSystem.createForFileStatus(
+                        "testfs",
+                        TestingFileSystem.TestFileStatus.forFileWithBlocks(testPath, 12345L));
+        testFs.register();
+
+        final NonSplittingRegexEnumerator enumerator = createEnumerator("/.*/file.");
+        final Collection<FileSourceSplit> splits =
+                enumerator.enumerateSplits(new Path[] {new Path("testfs:///dir")}, 0);
+
+        assertThat(splits).hasSize(1);
+        assertSplitsEqual(
+                new FileSourceSplit("ignoredId", testPath, 0L, 12345L, 0, 12345L),
+                splits.iterator().next());
+    }
+
+    @Test
+    void testFileWithIncorrectBlocks() throws Exception {

Review Comment:
   This test also exists in `NonSplittingRecursiveEnumeratorTest`. I think it is all right.






[GitHub] [flink] luoyuxia commented on pull request #22249: [FLINK-17398][connector/filesystem] Filesystem sources support flexible path reading

Posted by "luoyuxia (via GitHub)" <gi...@apache.org>.
luoyuxia commented on PR #22249:
URL: https://github.com/apache/flink/pull/22249#issuecomment-1592481788

   Also, don't forget to update the docs for the newly added options. I'm fine with adding them in this PR or in another PR.

