You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by bh...@apache.org on 2017/01/03 14:01:36 UTC
apex-malhar git commit: APEXMALHAR-2374 Calling scan recursively if
the current path is a directory;
test case for multi-level directories with recursive flag
Repository: apex-malhar
Updated Branches:
refs/heads/master d25dcf079 -> 70154f641
APEXMALHAR-2374 Calling scan recursively if the current path is a directory; test case for multi-level directories with recursive flag
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/70154f64
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/70154f64
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/70154f64
Branch: refs/heads/master
Commit: 70154f641be38dd07512e7c45bb0e67254af6fb5
Parents: d25dcf0
Author: francisf <fr...@gmail.com>
Authored: Wed Dec 21 17:08:24 2016 +0530
Committer: francisf <fr...@gmail.com>
Committed: Tue Jan 3 14:09:13 2017 +0530
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileInputOperator.java | 28 +++++++++++++++++++-
.../io/fs/AbstractFileInputOperatorTest.java | 23 ++++++++++++++--
2 files changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/70154f64/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index 0f3cc48..be156d1 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -1006,6 +1006,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
private static final long serialVersionUID = 4535844463258899929L;
private String filePatternRegexp;
private transient Pattern regex = null;
+ private boolean recursive = true; // when true (the default), scan() also descends into sub-directories
private int partitionIndex;
private int partitionCount;
protected final transient HashSet<String> ignoredFiles = new HashSet<String>();
@@ -1057,7 +1058,12 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
continue;
}
- if (acceptFile(filePathStr)) {
+ if (status.isDirectory()) {
+ // Directories are never emitted themselves; descend into them only when recursive.
+ if (isRecursive()) {
+ pathSet.addAll(scan(fs, path, consumedFiles));
+ }
+ } else if (acceptFile(filePathStr)) {
LOG.debug("Found {}", filePathStr);
pathSet.add(path);
} else {
@@ -1143,6 +1149,26 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
{
this.partitionCount = partitionCount;
}
+
+ /**
+ * Returns whether the scanner descends into sub-directories.
+ *
+ * @return true if sub-directories are scanned recursively; false otherwise.
+ */
+ public boolean isRecursive()
+ {
+ return recursive;
+ }
+
+ /**
+ * Sets whether the scanner descends into sub-directories.
+ *
+ * @param recursive true to scan sub-directories recursively; false to skip them.
+ */
+ public void setRecursive(boolean recursive)
+ {
+ this.recursive = recursive;
+ }
}
protected static class RecoveryEntry
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/70154f64/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index e9346ec..1a97f5e 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -96,17 +96,30 @@ public class AbstractFileInputOperatorTest
public TestMeta testMeta = new TestMeta();
@Test
+ public void testSinglePartitonRecursive() throws Exception
+ {
+ checkSubDir(true); // both files are nested in sub-directories, so only a recursive scan reaches them
+ }
+
+ @Test
public void testSinglePartiton() throws Exception
{
+ checkSubDir(false); // NOTE(review): files are always nested, so this test no longer covers files placed directly in testMeta.dir
+ }
+
+ private void checkSubDir(boolean recursive) throws Exception
+ {
FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
HashSet<String> allLines = Sets.newHashSet();
+ String subdir = ""; // grows per file: file0 goes under /depth_0, file1 under /depth_0/depth_1
for (int file = 0; file < 2; file++) {
+ subdir += String.format("/depth_%d", file);
HashSet<String> lines = Sets.newHashSet();
for (int line = 0; line < 2; line++) {
lines.add("f" + file + "l" + line);
}
allLines.addAll(lines);
- FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
+ FileUtils.write(new File(testMeta.dir + subdir, "file" + file), StringUtils.join(lines, '\n'));
}
LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
@@ -118,6 +131,7 @@ public class AbstractFileInputOperatorTest
oper.setDirectory(testMeta.dir);
oper.getScanner().setFilePatternRegexp(".*file[\\d]");
+ oper.getScanner().setRecursive(recursive); // controls whether the depth_* sub-directories are scanned
oper.setup(testMeta.context);
for (long wid = 0; wid < 3; wid++) {
@@ -127,7 +141,12 @@ public class AbstractFileInputOperatorTest
}
oper.teardown();
- Assert.assertEquals("number tuples", 4, queryResults.collectedTuples.size());
+ int expectedNumTuples = 4; // 2 files x 2 lines each
+ if (!recursive) { // every file sits inside a sub-directory, so a non-recursive scan emits nothing
+ allLines = new HashSet<String>();
+ expectedNumTuples = 0;
+ }
+ Assert.assertEquals("number tuples", expectedNumTuples, queryResults.collectedTuples.size());
Assert.assertEquals("lines", allLines, new HashSet<String>(queryResults.collectedTuples));
}