You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2015/09/24 08:05:46 UTC
[2/4] incubator-apex-malhar git commit: discoveredFiles getting empty between scan and scan iteration completion
discoveredFiles getting empty between scan and scan iteration completion
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/f99696af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/f99696af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/f99696af
Branch: refs/heads/devel-3
Commit: f99696aff2e46cd50d3060480d4d3018ac65e3e1
Parents: ea202da
Author: Chandni Singh <cs...@apache.org>
Authored: Tue Sep 22 23:01:36 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Wed Sep 23 17:48:15 2015 -0700
----------------------------------------------------------------------
.../lib/io/fs/FileSplitterInput.java | 43 ++++++++++++--------
.../lib/io/fs/FileSplitterInputTest.java | 8 ++--
2 files changed, 30 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f99696af/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index a381be5..2560191 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -176,9 +176,12 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
protected boolean processFileInfo(FileInfo fileInfo)
{
ScannedFileInfo scannedFileInfo = (ScannedFileInfo)fileInfo;
+ if (scannedFileInfo == TimeBasedDirectoryScanner.DELIMITER) {
+ return false;
+ }
currentWindowRecoveryState.add(scannedFileInfo);
updateReferenceTimes(scannedFileInfo);
- return super.processFileInfo(fileInfo) && !scannedFileInfo.lastFileOfScan;
+ return super.processFileInfo(fileInfo);
}
protected void updateReferenceTimes(ScannedFileInfo fileInfo)
@@ -255,6 +258,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
public static class TimeBasedDirectoryScanner implements Runnable, Component<Context.OperatorContext>
{
private static long DEF_SCAN_INTERVAL_MILLIS = 5000;
+ private static ScannedFileInfo DELIMITER = new ScannedFileInfo();
private boolean recursive;
@@ -281,6 +285,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
protected transient Map<String, Long> referenceTimes;
private transient ScannedFileInfo lastScannedInfo;
+ private transient int numDiscoveredPerIteration;
public TimeBasedDirectoryScanner()
{
@@ -339,10 +344,12 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
if ((trigger || (System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis)) &&
(lastScannedInfo == null || referenceTimes.get(lastScannedInfo.getFilePath()) != null)) {
trigger = false;
+ lastScannedInfo = null;
+ numDiscoveredPerIteration = 0;
for (String afile : files) {
scan(new Path(afile), null);
}
- scanComplete();
+ scanIterationComplete();
} else {
Thread.sleep(sleepMillis);
}
@@ -358,13 +365,11 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
/**
* Operations that need to be done once a scan is complete.
*/
- protected void scanComplete()
+ protected void scanIterationComplete()
{
- LOG.debug("scan complete {}", lastScanMillis);
- ScannedFileInfo fileInfo = discoveredFiles.peekLast();
- if (fileInfo != null) {
- fileInfo.lastFileOfScan = true;
- lastScannedInfo = fileInfo;
+ LOG.debug("scan complete {} {}", lastScanMillis, numDiscoveredPerIteration);
+ if (numDiscoveredPerIteration > 0) {
+ discoveredFiles.add(DELIMITER);
}
lastScanMillis = System.currentTimeMillis();
}
@@ -398,7 +403,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
}
if (acceptFile(childPathStr)) {
LOG.debug("found {}", childPathStr);
- discoveredFiles.add(info);
+ processDiscoveredFile(info);
} else {
// don't look at it again
ignoredFiles.add(childPathStr);
@@ -411,6 +416,13 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
}
}
+ protected void processDiscoveredFile(ScannedFileInfo info)
+ {
+ numDiscoveredPerIteration++;
+ lastScannedInfo = info;
+ discoveredFiles.add(info);
+ }
+
protected ScannedFileInfo createScannedFileInfo(Path parentPath, FileStatus parentStatus, Path childPath, @SuppressWarnings("UnusedParameters") FileStatus childStatus, Path rootPath)
{
ScannedFileInfo info;
@@ -462,6 +474,11 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
return discoveredFiles.poll();
}
+ protected int getNumDiscoveredPerIteration()
+ {
+ return numDiscoveredPerIteration;
+ }
+
/**
* Gets the regular expression for file names to split.
*
@@ -570,9 +587,8 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
public static class ScannedFileInfo extends AbstractFileSplitter.FileInfo
{
protected final long modifiedTime;
- private transient boolean lastFileOfScan;
- private ScannedFileInfo()
+ protected ScannedFileInfo()
{
super();
modifiedTime = -1;
@@ -583,11 +599,6 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
super(directoryPath, relativeFilePath);
this.modifiedTime = modifiedTime;
}
-
- protected boolean isLastFileOfScan()
- {
- return lastFileOfScan;
- }
}
private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInput.class);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/f99696af/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index 8dfea7a..1e2a25c 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -471,14 +471,12 @@ public class FileSplitterInputTest
}
@Override
- protected void scanComplete()
+ protected void scanIterationComplete()
{
- super.scanComplete();
- if (discoveredFiles.size() > 0 && discoveredFiles.getLast().isLastFileOfScan()) {
+ if (getNumDiscoveredPerIteration() > 0) {
semaphore.release();
- LOG.debug("discovered {}", discoveredFiles.size());
}
-
+ super.scanIterationComplete();
}
}