You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/05/15 07:07:56 UTC

apex-malhar git commit: APEXMALHAR-2447 Added notification in AbstractFileInputOperator when scanning the directory for the first time. Implementations can use this information in different ways, such as notifying downstream operators that a new directory has been scanned.

Repository: apex-malhar
Updated Branches:
  refs/heads/master c128091c7 -> 3a3629841


APEXMALHAR-2447 Added notification in AbstractFileInputOperator when scanning the directory for the first time. Implementations can use this information in different ways, such as notifying downstream operators that a new directory has been scanned.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/3a362984
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/3a362984
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/3a362984

Branch: refs/heads/master
Commit: 3a362984143dde0a89fb9538a704ed1f444f0204
Parents: c128091
Author: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Authored: Thu Mar 16 08:27:39 2017 -0700
Committer: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Committed: Sun May 14 21:41:06 2017 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileInputOperator.java    | 40 +++++++++--
 .../io/fs/AbstractFileInputOperatorTest.java    | 75 ++++++++++++++++++--
 2 files changed, 105 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3a362984/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index bc5ebf1..5c34546 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Serializable;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -710,6 +711,23 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
   }
 
   /**
+   * Notifies that the directory is being scanned.<br>
+   * Override this method to add custom handling; it is called once per directory, the first time that directory is scanned.
+   */
+  protected void visitDirectory(Path filePath)
+  {
+  }
+
+  private void checkVisitedDirectory(Path path)
+  {
+    String pathString = path.toString();
+    if (!processedFiles.contains(pathString)) {
+      visitDirectory(path);
+      processedFiles.add(pathString);
+    }
+  }
+
+  /**
    * Scans the directory for new files.
    */
   protected void scanDirectory()
@@ -718,10 +736,19 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
       Set<Path> newPaths = scanner.scan(fs, filePath, processedFiles);
 
       for (Path newPath : newPaths) {
-        String newPathString = newPath.toString();
-        pendingFiles.add(newPathString);
-        processedFiles.add(newPathString);
-        localProcessedFileCount.increment();
+        try {
+          FileStatus fileStatus = fs.getFileStatus(newPath);
+          if (fileStatus.isDirectory())  {
+            checkVisitedDirectory(newPath);
+          } else {
+            String newPathString = newPath.toString();
+            pendingFiles.add(newPathString);
+            processedFiles.add(newPathString);
+            localProcessedFileCount.increment();
+          }
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
       }
 
       lastScanMillis = System.currentTimeMillis();
@@ -1059,6 +1086,11 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
       LinkedHashSet<Path> pathSet = Sets.newLinkedHashSet();
       try {
         LOG.debug("Scanning {} with pattern {}", filePath, this.filePatternRegexp);
+        if (!consumedFiles.contains(filePath.toString())) {
+          if (fs.isDirectory(filePath)) {
+            pathSet.add(filePath);
+          }
+        }
         FileStatus[] files = fs.listStatus(filePath);
         for (FileStatus status : files) {
           Path path = status.getPath();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3a362984/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index b9cdd67..8acd16a 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -20,7 +20,6 @@ package com.datatorrent.lib.io.fs;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -59,7 +58,6 @@ import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultPartition;
 import com.datatorrent.api.Partitioner.Partition;
 import com.datatorrent.api.StatsListener;
-
 import com.datatorrent.lib.io.fs.AbstractFileInputOperator.DirectoryScanner;
 import com.datatorrent.lib.partitioner.StatelessPartitionerTest.PartitioningContextImpl;
 import com.datatorrent.lib.testbench.CollectorTestSink;
@@ -152,6 +150,71 @@ public class AbstractFileInputOperatorTest
 
   }
 
+  public static class LineOperator extends LineByLineFileInputOperator
+  {
+    Set<String> dirPaths = Sets.newHashSet();
+
+    @Override
+    protected void visitDirectory(Path filePath)
+    {
+      dirPaths.add(Path.getPathWithoutSchemeAndAuthority(filePath).toString());
+    }
+  }
+
+  @Test
+  public void testEmptyDirectory() throws Exception
+  {
+    FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+    Set<String> dPaths = Sets.newHashSet();
+    dPaths.add(new File(testMeta.dir).getCanonicalPath());
+
+    String subdir01 = "/a";
+    dPaths.add(new File(testMeta.dir + subdir01).getCanonicalPath());
+    FileUtils.forceMkdir((new File(testMeta.dir + subdir01)));
+
+    String subdir02 = "/b";
+    dPaths.add(new File(testMeta.dir + subdir02).getCanonicalPath());
+    FileUtils.forceMkdir(new File(testMeta.dir + subdir02));
+
+    String subdir03 = subdir02 + "/c";
+    dPaths.add(new File(testMeta.dir + subdir03).getCanonicalPath());
+    FileUtils.forceMkdir(new File(testMeta.dir + subdir03));
+
+    String subdir04 = "/d";
+    List<String> allLines = Lists.newArrayList();
+    HashSet<String> lines = Sets.newHashSet();
+    for (int line = 0; line < 5; line++) {
+      lines.add("f0" + "l" + line);
+    }
+    allLines.addAll(lines);
+    File testFile = new File(testMeta.dir + subdir04, "file0");
+    dPaths.add(new File(testMeta.dir + subdir04).getCanonicalPath());
+    FileUtils.write(testFile, StringUtils.join(lines, '\n'));
+
+    LineOperator oper = new LineOperator();
+    oper.setDirectory(new File(testMeta.dir).getAbsolutePath());
+    oper.setScanIntervalMillis(0);
+
+    CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
+    oper.output.setSink(sink);
+
+    int wid = 0;
+
+    // Run several windows so the operator reads all records and populates processedFiles.
+    oper.setup(testMeta.context);
+    for (int i = 0; i < 3; i++) {
+      oper.beginWindow(wid);
+      oper.emitTuples();
+      oper.endWindow();
+      wid++;
+    }
+
+    Assert.assertEquals("Size", 5, oper.dirPaths.size());
+    Assert.assertTrue("Checking Sets", dPaths.equals(oper.dirPaths));
+  }
+
   @Test
   public void testScannerPartitioning() throws Exception
   {
@@ -169,10 +232,10 @@ public class AbstractFileInputOperatorTest
     Set<Path> allFiles = Sets.newHashSet();
     for (DirectoryScanner partition : partitions) {
       Set<Path> files = partition.scan(fs, path, Sets.<String>newHashSet());
-      Assert.assertEquals("", 2, files.size());
+      Assert.assertEquals("", 3, files.size());
       allFiles.addAll(files);
     }
-    Assert.assertEquals("Found all files " + allFiles, 4, allFiles.size());
+    Assert.assertEquals("Found all files " + allFiles, 5, allFiles.size());
 
   }
 
@@ -201,7 +264,7 @@ public class AbstractFileInputOperatorTest
       Assert.assertNotSame(oper.getScanner(), p.getPartitionedInstance().getScanner());
       Set<String> consumed = Sets.newHashSet();
       LinkedHashSet<Path> files = p.getPartitionedInstance().getScanner().scan(FileSystem.getLocal(new Configuration(false)), path, consumed);
-      Assert.assertEquals("partition " + files, 2, files.size());
+      Assert.assertEquals("partition " + files, 3, files.size());
     }
   }
 
@@ -1109,7 +1172,7 @@ public class AbstractFileInputOperatorTest
       Assert.assertNotSame(oper.getScanner(), p.getPartitionedInstance().getScanner());
       Set<String> consumed = Sets.newHashSet();
       LinkedHashSet<Path> files = p.getPartitionedInstance().getScanner().scan(FileSystem.getLocal(new Configuration(false)), path, consumed);
-      Assert.assertEquals("partition " + files, 5, files.size());
+      Assert.assertEquals("partition " + files, 6, files.size());
     }
   }