You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2015/09/24 08:05:46 UTC

[2/4] incubator-apex-malhar git commit: discoveredFiles getting empty between scan and scan iteration completion

Fix discoveredFiles becoming empty between a scan and the completion of that scan iteration


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/f99696af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/f99696af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/f99696af

Branch: refs/heads/devel-3
Commit: f99696aff2e46cd50d3060480d4d3018ac65e3e1
Parents: ea202da
Author: Chandni Singh <cs...@apache.org>
Authored: Tue Sep 22 23:01:36 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Wed Sep 23 17:48:15 2015 -0700

----------------------------------------------------------------------
 .../lib/io/fs/FileSplitterInput.java            | 43 ++++++++++++--------
 .../lib/io/fs/FileSplitterInputTest.java        |  8 ++--
 2 files changed, 30 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f99696af/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index a381be5..2560191 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -176,9 +176,12 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
   protected boolean processFileInfo(FileInfo fileInfo)
   {
     ScannedFileInfo scannedFileInfo = (ScannedFileInfo)fileInfo;
+    if (scannedFileInfo ==  TimeBasedDirectoryScanner.DELIMITER) {
+      return false;
+    }
     currentWindowRecoveryState.add(scannedFileInfo);
     updateReferenceTimes(scannedFileInfo);
-    return super.processFileInfo(fileInfo) && !scannedFileInfo.lastFileOfScan;
+    return super.processFileInfo(fileInfo);
   }
 
   protected void updateReferenceTimes(ScannedFileInfo fileInfo)
@@ -255,6 +258,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
   public static class TimeBasedDirectoryScanner implements Runnable, Component<Context.OperatorContext>
   {
     private static long DEF_SCAN_INTERVAL_MILLIS = 5000;
+    private static ScannedFileInfo DELIMITER = new ScannedFileInfo();
 
     private boolean recursive;
 
@@ -281,6 +285,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
     protected transient Map<String, Long> referenceTimes;
 
     private transient ScannedFileInfo lastScannedInfo;
+    private transient int numDiscoveredPerIteration;
 
     public TimeBasedDirectoryScanner()
     {
@@ -339,10 +344,12 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
           if ((trigger || (System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis)) &&
             (lastScannedInfo == null || referenceTimes.get(lastScannedInfo.getFilePath()) != null)) {
             trigger = false;
+            lastScannedInfo = null;
+            numDiscoveredPerIteration = 0;
             for (String afile : files) {
               scan(new Path(afile), null);
             }
-            scanComplete();
+            scanIterationComplete();
           } else {
             Thread.sleep(sleepMillis);
           }
@@ -358,13 +365,11 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
     /**
      * Operations that need to be done once a scan is complete.
      */
-    protected void scanComplete()
+    protected void scanIterationComplete()
     {
-      LOG.debug("scan complete {}", lastScanMillis);
-      ScannedFileInfo fileInfo = discoveredFiles.peekLast();
-      if (fileInfo != null) {
-        fileInfo.lastFileOfScan = true;
-        lastScannedInfo = fileInfo;
+      LOG.debug("scan complete {} {}", lastScanMillis, numDiscoveredPerIteration);
+      if (numDiscoveredPerIteration > 0) {
+        discoveredFiles.add(DELIMITER);
       }
       lastScanMillis = System.currentTimeMillis();
     }
@@ -398,7 +403,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
           }
           if (acceptFile(childPathStr)) {
             LOG.debug("found {}", childPathStr);
-            discoveredFiles.add(info);
+            processDiscoveredFile(info);
           } else {
             // don't look at it again
             ignoredFiles.add(childPathStr);
@@ -411,6 +416,13 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
       }
     }
 
+    protected void processDiscoveredFile(ScannedFileInfo info)
+    {
+      numDiscoveredPerIteration++;
+      lastScannedInfo = info;
+      discoveredFiles.add(info);
+    }
+
     protected ScannedFileInfo createScannedFileInfo(Path parentPath, FileStatus parentStatus, Path childPath, @SuppressWarnings("UnusedParameters") FileStatus childStatus, Path rootPath)
     {
       ScannedFileInfo info;
@@ -462,6 +474,11 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
       return discoveredFiles.poll();
     }
 
+    protected int getNumDiscoveredPerIteration()
+    {
+      return numDiscoveredPerIteration;
+    }
+
     /**
      * Gets the regular expression for file names to split.
      *
@@ -570,9 +587,8 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
   public static class ScannedFileInfo extends AbstractFileSplitter.FileInfo
   {
     protected final long modifiedTime;
-    private transient boolean lastFileOfScan;
 
-    private ScannedFileInfo()
+    protected ScannedFileInfo()
     {
       super();
       modifiedTime = -1;
@@ -583,11 +599,6 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
       super(directoryPath, relativeFilePath);
       this.modifiedTime = modifiedTime;
     }
-
-    protected boolean isLastFileOfScan()
-    {
-      return lastFileOfScan;
-    }
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInput.class);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f99696af/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index 8dfea7a..1e2a25c 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -471,14 +471,12 @@ public class FileSplitterInputTest
     }
 
     @Override
-    protected void scanComplete()
+    protected void scanIterationComplete()
     {
-      super.scanComplete();
-      if (discoveredFiles.size() > 0 && discoveredFiles.getLast().isLastFileOfScan()) {
+      if (getNumDiscoveredPerIteration() > 0) {
         semaphore.release();
-        LOG.debug("discovered {}", discoveredFiles.size());
       }
-
+      super.scanIterationComplete();
     }
   }