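You are viewing a plain-text version of this content. The canonical (hyperlinked) version is available in the Apache mailing-list archive; the link itself was lost in this plain-text rendering.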
Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/20 05:39:23 UTC
[02/17] flink git commit: [FLINK-5432] recursively scan nested files in ContinuousFileMonitoringFunction
[FLINK-5432] recursively scan nested files in ContinuousFileMonitoringFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28c18e22
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28c18e22
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28c18e22
Branch: refs/heads/release-1.2
Commit: 28c18e22127a85f773e7504a0e9d188bad9334e2
Parents: 79b6826
Author: Yassine Marzougui <y....@mindlytix.com>
Authored: Wed Jan 11 01:43:19 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 19 21:41:59 2017 +0100
----------------------------------------------------------------------
.../flink/api/common/io/FileInputFormat.java | 2 +-
.../hdfstests/ContinuousFileProcessingTest.java | 53 ++++++++++++++++++++
.../ContinuousFileMonitoringFunction.java | 18 ++++---
3 files changed, 65 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/28c18e22/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 1d092af..785fb3b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -637,7 +637,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
* @param fileStatus The file status to check.
* @return true, if the given file or directory is accepted
*/
- protected boolean acceptFile(FileStatus fileStatus) {
+ public boolean acceptFile(FileStatus fileStatus) {
final String name = fileStatus.getPath().getName();
return !name.startsWith("_")
&& !name.startsWith(".")
http://git-wip-us.apache.org/repos/asf/flink/blob/28c18e22/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 0cb1bad..c29dd27 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -588,6 +588,59 @@ public class ContinuousFileProcessingTest {
}
@Test
+ public void testNestedFilesProcessing() throws Exception {
+ final Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+ final Set<String> filesToBeRead = new TreeSet<>();
+
+ // create two nested directories
+ org.apache.hadoop.fs.Path firstLevelDir = new org.apache.hadoop.fs.Path(hdfsURI + "/" + "firstLevelDir");
+ org.apache.hadoop.fs.Path secondLevelDir = new org.apache.hadoop.fs.Path(hdfsURI + "/" + "firstLevelDir" + "/" + "secondLevelDir");
+ Assert.assertFalse(hdfs.exists(firstLevelDir));
+ hdfs.mkdirs(firstLevelDir);
+ hdfs.mkdirs(secondLevelDir);
+
+ // create files in the base dir, the first level dir and the second level dir
+ for (int i = 0; i < NO_OF_FILES; i++) {
+ Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "firstLevelFile", i, "This is test line.");
+ filesCreated.add(file.f0);
+ filesToBeRead.add(file.f0.getName());
+ }
+ for (int i = 0; i < NO_OF_FILES; i++) {
+ Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(firstLevelDir.toString(), "secondLevelFile", i, "This is test line.");
+ filesCreated.add(file.f0);
+ filesToBeRead.add(file.f0.getName());
+ }
+ for (int i = 0; i < NO_OF_FILES; i++) {
+ Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(secondLevelDir.toString(), "thirdLevelFile", i, "This is test line.");
+ filesCreated.add(file.f0);
+ filesToBeRead.add(file.f0.getName());
+ }
+
+ TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+ format.setFilesFilter(FilePathFilter.createDefaultFilter());
+ format.setNestedFileEnumeration(true);
+
+ ContinuousFileMonitoringFunction<String> monitoringFunction =
+ new ContinuousFileMonitoringFunction<>(format,
+ FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+
+ final FileVerifyingSourceContext context =
+ new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction);
+
+ monitoringFunction.open(new Configuration());
+ monitoringFunction.run(context);
+
+ Assert.assertArrayEquals(filesToBeRead.toArray(), context.getSeenFiles().toArray());
+
+ // finally delete the dirs and the files created for the test.
+ for (org.apache.hadoop.fs.Path file: filesCreated) {
+ hdfs.delete(file, false);
+ }
+ hdfs.delete(secondLevelDir, false);
+ hdfs.delete(firstLevelDir, false);
+ }
+
+ @Test
public void testSortingOnModTime() throws Exception {
final long[] modTimes = new long[NO_OF_FILES];
final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];
http://git-wip-us.apache.org/repos/asf/flink/blob/28c18e22/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index e0a042a..589e285 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -232,7 +232,7 @@ public class ContinuousFileMonitoringFunction<OUT>
SourceContext<TimestampedFileInputSplit> context) throws IOException {
assert (Thread.holdsLock(checkpointLock));
- Map<Path, FileStatus> eligibleFiles = listEligibleFiles(fs);
+ Map<Path, FileStatus> eligibleFiles = listEligibleFiles(fs, new Path(path));
Map<Long, List<TimestampedFileInputSplit>> splitsSortedByModTime = getInputSplitsSortedByModTime(eligibleFiles);
for (Map.Entry<Long, List<TimestampedFileInputSplit>> splits: splitsSortedByModTime.entrySet()) {
@@ -282,11 +282,11 @@ public class ContinuousFileMonitoringFunction<OUT>
* Returns the paths of the files not yet processed.
* @param fileSystem The filesystem where the monitored directory resides.
*/
- private Map<Path, FileStatus> listEligibleFiles(FileSystem fileSystem) throws IOException {
+ private Map<Path, FileStatus> listEligibleFiles(FileSystem fileSystem, Path path) throws IOException {
final FileStatus[] statuses;
try {
- statuses = fileSystem.listStatus(new Path(path));
+ statuses = fileSystem.listStatus(path);
} catch (IOException e) {
// we may run into an IOException if files are moved while listing their status
// delay the check for eligible files in this case
@@ -300,10 +300,14 @@ public class ContinuousFileMonitoringFunction<OUT>
Map<Path, FileStatus> files = new HashMap<>();
// handle the new files
for (FileStatus status : statuses) {
- Path filePath = status.getPath();
- long modificationTime = status.getModificationTime();
- if (!shouldIgnore(filePath, modificationTime)) {
- files.put(filePath, status);
+ if (!status.isDir()) {
+ Path filePath = status.getPath();
+ long modificationTime = status.getModificationTime();
+ if (!shouldIgnore(filePath, modificationTime)) {
+ files.put(filePath, status);
+ }
+ } else if (format.getNestedFileEnumeration() && format.acceptFile(status)){
+ files.putAll(listEligibleFiles(fileSystem, status.getPath()));
}
}
return files;