You are viewing a plain-text version of this content. The canonical (HTML) version is linked from the original archive page; that link was not preserved in this text copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/20 05:39:23 UTC

[02/17] flink git commit: [FLINK-5432] recursively scan nested files in ContinuousFileMonitoringFunction

[FLINK-5432] recursively scan nested files in ContinuousFileMonitoringFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28c18e22
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28c18e22
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28c18e22

Branch: refs/heads/release-1.2
Commit: 28c18e22127a85f773e7504a0e9d188bad9334e2
Parents: 79b6826
Author: Yassine Marzougui <y....@mindlytix.com>
Authored: Wed Jan 11 01:43:19 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 19 21:41:59 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/io/FileInputFormat.java    |  2 +-
 .../hdfstests/ContinuousFileProcessingTest.java | 53 ++++++++++++++++++++
 .../ContinuousFileMonitoringFunction.java       | 18 ++++---
 3 files changed, 65 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28c18e22/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 1d092af..785fb3b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -637,7 +637,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
 	 * @param fileStatus The file status to check.
 	 * @return true, if the given file or directory is accepted
 	 */
-	protected boolean acceptFile(FileStatus fileStatus) {
+	public boolean acceptFile(FileStatus fileStatus) {
 		final String name = fileStatus.getPath().getName();
 		return !name.startsWith("_")
 			&& !name.startsWith(".")

http://git-wip-us.apache.org/repos/asf/flink/blob/28c18e22/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 0cb1bad..c29dd27 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -588,6 +588,59 @@ public class ContinuousFileProcessingTest {
 	}
 
 	@Test
+	public void testNestedFilesProcessing() throws Exception {
+		final Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+		final Set<String> filesToBeRead = new TreeSet<>();
+
+		// create two nested directories
+		org.apache.hadoop.fs.Path firstLevelDir = new org.apache.hadoop.fs.Path(hdfsURI + "/" + "firstLevelDir");
+		org.apache.hadoop.fs.Path secondLevelDir = new org.apache.hadoop.fs.Path(hdfsURI + "/" + "firstLevelDir" + "/" + "secondLevelDir");
+		Assert.assertFalse(hdfs.exists(firstLevelDir));
+		hdfs.mkdirs(firstLevelDir);
+		hdfs.mkdirs(secondLevelDir);
+
+		// create files in the base dir, the first level dir and the second level dir
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "firstLevelFile", i, "This is test line.");
+			filesCreated.add(file.f0);
+			filesToBeRead.add(file.f0.getName());
+		}
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(firstLevelDir.toString(), "secondLevelFile", i, "This is test line.");
+			filesCreated.add(file.f0);
+			filesToBeRead.add(file.f0.getName());
+		}
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(secondLevelDir.toString(), "thirdLevelFile", i, "This is test line.");
+			filesCreated.add(file.f0);
+			filesToBeRead.add(file.f0.getName());
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		format.setFilesFilter(FilePathFilter.createDefaultFilter());
+		format.setNestedFileEnumeration(true);
+
+		ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format,
+				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+
+		final FileVerifyingSourceContext context =
+			new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction);
+
+		monitoringFunction.open(new Configuration());
+		monitoringFunction.run(context);
+
+		Assert.assertArrayEquals(filesToBeRead.toArray(), context.getSeenFiles().toArray());
+
+		// finally delete the dirs and the files created for the test.
+		for (org.apache.hadoop.fs.Path file: filesCreated) {
+			hdfs.delete(file, false);
+		}
+		hdfs.delete(secondLevelDir, false);
+		hdfs.delete(firstLevelDir, false);
+	}
+
+	@Test
 	public void testSortingOnModTime() throws Exception {
 		final long[] modTimes = new long[NO_OF_FILES];
 		final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];

http://git-wip-us.apache.org/repos/asf/flink/blob/28c18e22/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index e0a042a..589e285 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -232,7 +232,7 @@ public class ContinuousFileMonitoringFunction<OUT>
 											SourceContext<TimestampedFileInputSplit> context) throws IOException {
 		assert (Thread.holdsLock(checkpointLock));
 
-		Map<Path, FileStatus> eligibleFiles = listEligibleFiles(fs);
+		Map<Path, FileStatus> eligibleFiles = listEligibleFiles(fs, new Path(path));
 		Map<Long, List<TimestampedFileInputSplit>> splitsSortedByModTime = getInputSplitsSortedByModTime(eligibleFiles);
 
 		for (Map.Entry<Long, List<TimestampedFileInputSplit>> splits: splitsSortedByModTime.entrySet()) {
@@ -282,11 +282,11 @@ public class ContinuousFileMonitoringFunction<OUT>
 	 * Returns the paths of the files not yet processed.
 	 * @param fileSystem The filesystem where the monitored directory resides.
 	 */
-	private Map<Path, FileStatus> listEligibleFiles(FileSystem fileSystem) throws IOException {
+	private Map<Path, FileStatus> listEligibleFiles(FileSystem fileSystem, Path path) throws IOException {
 
 		final FileStatus[] statuses;
 		try {
-			statuses = fileSystem.listStatus(new Path(path));
+			statuses = fileSystem.listStatus(path);
 		} catch (IOException e) {
 			// we may run into an IOException if files are moved while listing their status
 			// delay the check for eligible files in this case
@@ -300,10 +300,14 @@ public class ContinuousFileMonitoringFunction<OUT>
 			Map<Path, FileStatus> files = new HashMap<>();
 			// handle the new files
 			for (FileStatus status : statuses) {
-				Path filePath = status.getPath();
-				long modificationTime = status.getModificationTime();
-				if (!shouldIgnore(filePath, modificationTime)) {
-					files.put(filePath, status);
+				if (!status.isDir()) {
+					Path filePath = status.getPath();
+					long modificationTime = status.getModificationTime();
+					if (!shouldIgnore(filePath, modificationTime)) {
+						files.put(filePath, status);
+					}
+				} else if (format.getNestedFileEnumeration() && format.acceptFile(status)){
+					files.putAll(listEligibleFiles(fileSystem, status.getPath()));
 				}
 			}
 			return files;