You are viewing a plain-text version of this content; the canonical (linked) version is in the Apache mailing-list archive.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/05/27 12:36:46 UTC
flink git commit: [core] cleanup & tests for FileInputFormat
Repository: flink
Updated Branches:
refs/heads/master 7ff675a34 -> d1d7044ce
[core] cleanup & tests for FileInputFormat
followup of f2891ab857e00bc70eb025bb430f46f4f58355a5
This closes #732.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1d7044c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1d7044c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1d7044c
Branch: refs/heads/master
Commit: d1d7044cee85c97ddea232ef2a70b2d89f40ffac
Parents: 7ff675a
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue May 26 19:59:23 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed May 27 12:35:57 2015 +0200
----------------------------------------------------------------------
.../flink/api/common/io/FileInputFormat.java | 65 ++++----------------
.../api/common/io/EnumerateNestedFilesTest.java | 56 +++++++++++++++++
2 files changed, 69 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d1d7044c/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index efa5631..37739f5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -326,35 +326,23 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
// get the file info and check whether the cached statistics are still valid.
final FileStatus file = fs.getFileStatus(filePath);
- long latestModTime = file.getModificationTime();
long totalLength = 0;
- // enumerate all files and check their modification time stamp.
+ // enumerate all files
if (file.isDir()) {
- FileStatus[] fss = fs.listStatus(filePath);
- files.ensureCapacity(fss.length);
-
- for (FileStatus s : fss) {
- if (!s.isDir()) {
- if (acceptFile(s)) {
- files.add(s);
- totalLength += s.getLen();
- latestModTime = Math.max(s.getModificationTime(), latestModTime);
- testForUnsplittable(s);
- }
- }
- else {
- if (enumerateNestedFiles && acceptFile(s)) {
- totalLength += addNestedFiles(s.getPath(), files, 0, false);
- }
- }
- }
+ totalLength += addFilesInDir(file.getPath(), files, totalLength, false);
} else {
files.add(file);
testForUnsplittable(file);
totalLength += file.getLen();
}
+ // check the modification time stamp
+ long latestModTime = 0;
+ for (FileStatus f : files) {
+ latestModTime = Math.max(f.getModificationTime(), latestModTime);
+ }
+
// check whether the cached statistics are still valid, if we have any
if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
return cachedStats;
@@ -402,33 +390,7 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
final FileStatus pathFile = fs.getFileStatus(path);
if (pathFile.isDir()) {
- // input is directory. list all contained files
- final FileStatus[] dir = fs.listStatus(path);
- for (int i = 0; i < dir.length; i++) {
- if (dir[i].isDir()) {
- if (enumerateNestedFiles) {
- if(acceptFile(dir[i])) {
- totalLength += addNestedFiles(dir[i].getPath(), files, 0, true);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Directory "+dir[i].getPath().toString()+" did not pass the file-filter and is excluded.");
- }
- }
- }
- }
- else {
- if (acceptFile(dir[i])) {
- files.add(dir[i]);
- totalLength += dir[i].getLen();
- // as soon as there is one deflate file in a directory, we can not split it
- testForUnsplittable(dir[i]);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("File "+dir[i].getPath().toString()+" did not pass the file-filter and is excluded.");
- }
- }
- }
- }
+ totalLength += addFilesInDir(path, files, totalLength, true);
} else {
testForUnsplittable(pathFile);
@@ -532,18 +494,17 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
}
/**
- * Recursively traverse the input directory structure
- * and enumerate all accepted nested files.
+ * Enumerate all accepted files in the directory, recursing into accepted nested directories if enumerateNestedFiles is true.
* @return the total length of accepted files.
*/
- private long addNestedFiles(Path path, List<FileStatus> files, long length, boolean logExcludedFiles)
+ private long addFilesInDir(Path path, List<FileStatus> files, long length, boolean logExcludedFiles)
throws IOException {
final FileSystem fs = path.getFileSystem();
for(FileStatus dir: fs.listStatus(path)) {
if (dir.isDir()) {
- if (acceptFile(dir)) {
- length += addNestedFiles(dir.getPath(), files, length, logExcludedFiles);
+ if (acceptFile(dir) && enumerateNestedFiles) {
+ length += addFilesInDir(dir.getPath(), files, length, logExcludedFiles);
} else {
if (logExcludedFiles && LOG.isDebugEnabled()) {
LOG.debug("Directory "+dir.getPath().toString()+" did not pass the file-filter and is excluded.");
http://git-wip-us.apache.org/repos/asf/flink/blob/d1d7044c/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
index 1fbf40d..da4b0f1 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
@@ -188,6 +188,53 @@ public class EnumerateNestedFilesTest {
}
/**
+ * Tests if the recursion is invoked correctly in nested directories.
+ */
+ @Test
+ public void testOnlyLevel2NestedDirectories() {
+ try {
+ String rootDir = TestFileUtils.randomFileName();
+ String nestedDir = TestFileUtils.randomFileName();
+ String firstNestedNestedDir = TestFileUtils.randomFileName();
+ String secondNestedNestedDir = TestFileUtils.randomFileName();
+
+ File testDir = new File(tempPath + System.getProperty("file.separator") + rootDir);
+ testDir.mkdirs();
+ testDir.deleteOnExit();
+
+ File nested = new File(testDir.getAbsolutePath() + System.getProperty("file.separator") + nestedDir);
+ nested.mkdirs();
+ nested.deleteOnExit();
+
+ File nestedNestedDir1 = new File(nested.getAbsolutePath() + System.getProperty("file.separator")
+ + firstNestedNestedDir);
+ nestedNestedDir1.mkdirs();
+ nestedNestedDir1.deleteOnExit();
+
+ File nestedNestedDir2 = new File(nested.getAbsolutePath() + System.getProperty("file.separator")
+ + secondNestedNestedDir);
+ nestedNestedDir2.mkdirs();
+ nestedNestedDir2.deleteOnExit();
+
+ // create files in second level
+ TestFileUtils.createTempFileInDirectory(nestedNestedDir1.getAbsolutePath(), "paella");
+ TestFileUtils.createTempFileInDirectory(nestedNestedDir1.getAbsolutePath(), "kalamari");
+ TestFileUtils.createTempFileInDirectory(nestedNestedDir2.getAbsolutePath(), "fideua");
+ TestFileUtils.createTempFileInDirectory(nestedNestedDir2.getAbsolutePath(), "bravas");
+
+ this.format.setFilePath(new Path(testDir.getAbsolutePath()));
+ this.config.setBoolean("recursive.file.enumeration", true);
+ format.configure(this.config);
+
+ FileInputSplit[] splits = format.createInputSplits(1);
+ Assert.assertEquals(4, splits.length);
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ Assert.fail(ex.getMessage());
+ }
+ }
+
+ /**
* Test with two nested directories and recursive.file.enumeration = true
*/
@Test
@@ -309,6 +356,15 @@ public class EnumerateNestedFilesTest {
BaseStatistics stats = format.getStatistics(null);
Assert.assertEquals("The file size from the statistics is wrong.", TOTAL, stats.getTotalInputSize());
+
+ /* Now invalidate the cache and check again */
+ Thread.sleep(1000); // accuracy of file modification times is rather low
+ TestFileUtils.createTempFileInDirectory(insideNestedDir.getAbsolutePath(), 42L);
+
+ BaseStatistics stats2 = format.getStatistics(stats);
+ Assert.assertNotEquals(stats2, stats);
+ Assert.assertEquals("The file size from the statistics is wrong.", TOTAL + 42L, stats2.getTotalInputSize());
+
} catch (Exception ex) {
ex.printStackTrace();
Assert.fail(ex.getMessage());