You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/05/27 12:36:46 UTC

flink git commit: [core] cleanup & tests for FileInputFormat

Repository: flink
Updated Branches:
  refs/heads/master 7ff675a34 -> d1d7044ce


[core] cleanup & tests for FileInputFormat

Follow-up to commit f2891ab857e00bc70eb025bb430f46f4f58355a5.

This closes #732.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1d7044c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1d7044c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1d7044c

Branch: refs/heads/master
Commit: d1d7044cee85c97ddea232ef2a70b2d89f40ffac
Parents: 7ff675a
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue May 26 19:59:23 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed May 27 12:35:57 2015 +0200

----------------------------------------------------------------------
 .../flink/api/common/io/FileInputFormat.java    | 65 ++++----------------
 .../api/common/io/EnumerateNestedFilesTest.java | 56 +++++++++++++++++
 2 files changed, 69 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d1d7044c/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index efa5631..37739f5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -326,35 +326,23 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
 		
 		// get the file info and check whether the cached statistics are still valid.
 		final FileStatus file = fs.getFileStatus(filePath);
-		long latestModTime = file.getModificationTime();
 		long totalLength = 0;
 
-		// enumerate all files and check their modification time stamp.
+		// enumerate all files
 		if (file.isDir()) {
-			FileStatus[] fss = fs.listStatus(filePath);
-			files.ensureCapacity(fss.length);
-			
-			for (FileStatus s : fss) {
-				if (!s.isDir()) {
-					if (acceptFile(s)) {
-						files.add(s);
-						totalLength += s.getLen();
-						latestModTime = Math.max(s.getModificationTime(), latestModTime);
-						testForUnsplittable(s);
-					}
-				}
-				else {
-					if (enumerateNestedFiles && acceptFile(s)) {
-						totalLength += addNestedFiles(s.getPath(), files, 0, false);
-					}
-				}
-			}
+			totalLength += addFilesInDir(file.getPath(), files, totalLength, false);
 		} else {
 			files.add(file);
 			testForUnsplittable(file);
 			totalLength += file.getLen();
 		}
 
+		// check the modification time stamp
+		long latestModTime = 0;
+		for (FileStatus f : files) {
+			latestModTime = Math.max(f.getModificationTime(), latestModTime);
+		}
+
 		// check whether the cached statistics are still valid, if we have any
 		if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
 			return cachedStats;
@@ -402,33 +390,7 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
 		final FileStatus pathFile = fs.getFileStatus(path);
 
 		if (pathFile.isDir()) {
-			// input is directory. list all contained files
-			final FileStatus[] dir = fs.listStatus(path);
-			for (int i = 0; i < dir.length; i++) {
-				if (dir[i].isDir()) {
-					if (enumerateNestedFiles) {
-						if(acceptFile(dir[i])) {
-							totalLength += addNestedFiles(dir[i].getPath(), files, 0, true);
-						} else {
-							if (LOG.isDebugEnabled()) {
-								LOG.debug("Directory "+dir[i].getPath().toString()+" did not pass the file-filter and is excluded.");
-							}
-						}
-					}
-				}
-				else {
-					if (acceptFile(dir[i])) {
-						files.add(dir[i]);
-						totalLength += dir[i].getLen();
-						// as soon as there is one deflate file in a directory, we can not split it
-						testForUnsplittable(dir[i]);
-					} else {
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("File "+dir[i].getPath().toString()+" did not pass the file-filter and is excluded.");
-						}
-					}
-				}
-			}
+			totalLength += addFilesInDir(path, files, totalLength, true);
 		} else {
 			testForUnsplittable(pathFile);
 
@@ -532,18 +494,17 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
 	}
 
 	/**
-	 * Recursively traverse the input directory structure
-	 * and enumerate all accepted nested files.
+	 * Enumerates all files in the directory, recursing into nested directories if enumerateNestedFiles is true.
 	 * @return the total length of accepted files.
 	 */
-	private long addNestedFiles(Path path, List<FileStatus> files, long length, boolean logExcludedFiles)
+	private long addFilesInDir(Path path, List<FileStatus> files, long length, boolean logExcludedFiles)
 			throws IOException {
 		final FileSystem fs = path.getFileSystem();
 
 		for(FileStatus dir: fs.listStatus(path)) {
 			if (dir.isDir()) {
-				if (acceptFile(dir)) {
-					length += addNestedFiles(dir.getPath(), files, length, logExcludedFiles);
+				if (acceptFile(dir) && enumerateNestedFiles) {
+					length += addFilesInDir(dir.getPath(), files, length, logExcludedFiles);
 				} else {
 					if (logExcludedFiles && LOG.isDebugEnabled()) {
 						LOG.debug("Directory "+dir.getPath().toString()+" did not pass the file-filter and is excluded.");

http://git-wip-us.apache.org/repos/asf/flink/blob/d1d7044c/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
index 1fbf40d..da4b0f1 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
@@ -188,6 +188,53 @@ public class EnumerateNestedFilesTest {
 	}
 
 	/**
+	 * Tests if the recursion is invoked correctly in nested directories.
+	 */
+	@Test
+	public void testOnlyLevel2NestedDirectories() {
+		try {
+			String rootDir = TestFileUtils.randomFileName();
+			String nestedDir = TestFileUtils.randomFileName();
+			String firstNestedNestedDir = TestFileUtils.randomFileName();
+			String secondNestedNestedDir = TestFileUtils.randomFileName();
+
+			File testDir = new File(tempPath + System.getProperty("file.separator") + rootDir);
+			testDir.mkdirs();
+			testDir.deleteOnExit();
+
+			File nested = new File(testDir.getAbsolutePath() + System.getProperty("file.separator") + nestedDir);
+			nested.mkdirs();
+			nested.deleteOnExit();
+
+			File nestedNestedDir1 = new File(nested.getAbsolutePath() + System.getProperty("file.separator")
+					+ firstNestedNestedDir);
+			nestedNestedDir1.mkdirs();
+			nestedNestedDir1.deleteOnExit();
+
+			File nestedNestedDir2 = new File(nested.getAbsolutePath() + System.getProperty("file.separator")
+					+ secondNestedNestedDir);
+			nestedNestedDir2.mkdirs();
+			nestedNestedDir2.deleteOnExit();
+
+			// create files in second level
+			TestFileUtils.createTempFileInDirectory(nestedNestedDir1.getAbsolutePath(), "paella");
+			TestFileUtils.createTempFileInDirectory(nestedNestedDir1.getAbsolutePath(), "kalamari");
+			TestFileUtils.createTempFileInDirectory(nestedNestedDir2.getAbsolutePath(), "fideua");
+			TestFileUtils.createTempFileInDirectory(nestedNestedDir2.getAbsolutePath(), "bravas");
+
+			this.format.setFilePath(new Path(testDir.getAbsolutePath()));
+			this.config.setBoolean("recursive.file.enumeration", true);
+			format.configure(this.config);
+
+			FileInputSplit[] splits = format.createInputSplits(1);
+			Assert.assertEquals(4, splits.length);
+		} catch (Exception ex) {
+			ex.printStackTrace();
+			Assert.fail(ex.getMessage());
+		}
+	}
+
+	/**
 	 * Test with two nested directories and recursive.file.enumeration = true
 	 */
 	@Test
@@ -309,6 +356,15 @@ public class EnumerateNestedFilesTest {
 
 			BaseStatistics stats = format.getStatistics(null);
 			Assert.assertEquals("The file size from the statistics is wrong.", TOTAL, stats.getTotalInputSize());
+
+			/* Now invalidate the cache and check again */
+			Thread.sleep(1000); // accuracy of file modification times is rather low
+			TestFileUtils.createTempFileInDirectory(insideNestedDir.getAbsolutePath(), 42L);
+
+			BaseStatistics stats2 = format.getStatistics(stats);
+			Assert.assertNotEquals(stats2, stats);
+			Assert.assertEquals("The file size from the statistics is wrong.", TOTAL + 42L, stats2.getTotalInputSize());
+
 		} catch (Exception ex) {
 			ex.printStackTrace();
 			Assert.fail(ex.getMessage());