Posted to commits@apex.apache.org by bh...@apache.org on 2017/01/03 14:01:36 UTC

apex-malhar git commit: APEXMALHAR-2374 Calling scan recursively if the current path is a directory; test case for multi-level directories with recursive flag

Repository: apex-malhar
Updated Branches:
  refs/heads/master d25dcf079 -> 70154f641


APEXMALHAR-2374 Calling scan recursively if the current path is a directory; test case for multi-level directories with recursive flag


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/70154f64
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/70154f64
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/70154f64

Branch: refs/heads/master
Commit: 70154f641be38dd07512e7c45bb0e67254af6fb5
Parents: d25dcf0
Author: francisf <fr...@gmail.com>
Authored: Wed Dec 21 17:08:24 2016 +0530
Committer: francisf <fr...@gmail.com>
Committed: Tue Jan 3 14:09:13 2017 +0530

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileInputOperator.java    | 28 +++++++++++++++++++-
 .../io/fs/AbstractFileInputOperatorTest.java    | 23 ++++++++++++++--
 2 files changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
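
The heart of the change is the new branch in DirectoryScanner.scan(): when a listed
entry is a directory and the recursive flag (default true) is set, scan() calls itself
on that directory and merges the child results into the current path set. A minimal
standalone sketch of that pattern, using only the Hadoop FileSystem API and omitting
the consumedFiles bookkeeping, ignoredFiles tracking, file-pattern matching and
partition filtering that the real scanner performs, might look like this:

  import java.io.IOException;
  import java.util.LinkedHashSet;

  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  // Simplified illustration only; not the actual DirectoryScanner implementation.
  public class RecursiveScanSketch
  {
    private boolean recursive = true;

    public LinkedHashSet<Path> scan(FileSystem fs, Path filePath) throws IOException
    {
      LinkedHashSet<Path> pathSet = new LinkedHashSet<Path>();
      for (FileStatus status : fs.listStatus(filePath)) {
        Path path = status.getPath();
        if (status.isDirectory()) {
          if (recursive) {
            // descend into the subdirectory and collect the files found there
            pathSet.addAll(scan(fs, path));
          }
        } else {
          // the real scanner applies filePatternRegexp and partition checks here
          pathSet.add(path);
        }
      }
      return pathSet;
    }
  }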


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/70154f64/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index 0f3cc48..be156d1 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -1006,6 +1006,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
     private static final long serialVersionUID = 4535844463258899929L;
     private String filePatternRegexp;
     private transient Pattern regex = null;
+    private boolean recursive = true;
     private int partitionIndex;
     private int partitionCount;
     protected final transient HashSet<String> ignoredFiles = new HashSet<String>();
@@ -1057,7 +1058,12 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
             continue;
           }
 
-          if (acceptFile(filePathStr)) {
+          if (status.isDirectory()) {
+            if (isRecursive()) {
+              LinkedHashSet<Path> childPathSet = scan(fs, path, consumedFiles);
+              pathSet.addAll(childPathSet);
+            }
+          } else if (acceptFile(filePathStr)) {
             LOG.debug("Found {}", filePathStr);
             pathSet.add(path);
           } else {
@@ -1143,6 +1149,26 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
     {
       this.partitionCount = partitionCount;
     }
+
+    /**
+     * Returns whether the scan descends into subdirectories.
+     *
+     * @return true if recursive; false otherwise.
+     */
+    public boolean isRecursive()
+    {
+      return recursive;
+    }
+
+    /**
+     * Sets whether the scan will be recursive.
+     *
+     * @param recursive true to scan subdirectories as well; false otherwise.
+     */
+    public void setRecursive(boolean recursive)
+    {
+      this.recursive = recursive;
+    }
   }
 
   protected static class RecoveryEntry

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/70154f64/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index e9346ec..1a97f5e 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -96,17 +96,30 @@ public class AbstractFileInputOperatorTest
   public TestMeta testMeta = new TestMeta();
 
   @Test
+  public void testSinglePartitonRecursive() throws Exception
+  {
+    checkSubDir(true);
+  }
+
+  @Test
   public void testSinglePartiton() throws Exception
   {
+    checkSubDir(false);
+  }
+
+  private void checkSubDir(boolean recursive) throws Exception
+  {
     FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
     HashSet<String> allLines = Sets.newHashSet();
+    String subdir = "";
     for (int file = 0; file < 2; file++) {
+      subdir += String.format("/depth_%d", file);
       HashSet<String> lines = Sets.newHashSet();
       for (int line = 0; line < 2; line++) {
         lines.add("f" + file + "l" + line);
       }
       allLines.addAll(lines);
-      FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
+      FileUtils.write(new File(testMeta.dir + subdir, "file" + file), StringUtils.join(lines, '\n'));
     }
 
     LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
@@ -118,6 +131,7 @@ public class AbstractFileInputOperatorTest
 
     oper.setDirectory(testMeta.dir);
     oper.getScanner().setFilePatternRegexp(".*file[\\d]");
+    oper.getScanner().setRecursive(recursive);
 
     oper.setup(testMeta.context);
     for (long wid = 0; wid < 3; wid++) {
@@ -127,7 +141,12 @@ public class AbstractFileInputOperatorTest
     }
     oper.teardown();
 
-    Assert.assertEquals("number tuples", 4, queryResults.collectedTuples.size());
+    int expectedNumTuples = 4;
+    if (!recursive) {
+      allLines = new HashSet<String>();
+      expectedNumTuples = 0;
+    }
+    Assert.assertEquals("number tuples", expectedNumTuples, queryResults.collectedTuples.size());
     Assert.assertEquals("lines", allLines, new HashSet<String>(queryResults.collectedTuples));
 
   }
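
For reference, an application that wants the previous single-level behaviour can turn the
new flag off on the operator's DirectoryScanner, just as the test above does. The snippet
below is a hypothetical fragment of a StreamingApplication's populateDAG(DAG dag,
Configuration conf) method; the operator name, directory and file pattern are placeholders:

  // Hypothetical DAG fragment; only the scanner calls are taken from this commit's test.
  LineByLineFileInputOperator reader =
      dag.addOperator("reader", new LineByLineFileInputOperator());
  reader.setDirectory("/data/input");                    // placeholder input directory
  reader.getScanner().setFilePatternRegexp(".*\\.txt");  // placeholder file pattern
  reader.getScanner().setRecursive(false);               // disable the new recursive scan (default is true)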