You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/03/18 06:54:24 UTC
[06/18] incubator-apex-malhar git commit: APEXMALHAR-2004: Add file's
modification time in referenceTimes map instead of parent's
APEXMALHAR-2004: Add file's modification time in referenceTimes map
instead of parent's
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/327a3999
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/327a3999
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/327a3999
Branch: refs/heads/devel-3
Commit: 327a3999c6c443894bc1e085d91a1f030a848bdd
Parents: d3a7063
Author: Tushar R. Gosavi <tu...@apache.org>
Authored: Sat Feb 27 18:34:38 2016 +0530
Committer: Tushar R. Gosavi <tu...@apache.org>
Committed: Fri Mar 11 14:38:34 2016 +0530
----------------------------------------------------------------------
.../lib/io/fs/FileSplitterInput.java | 8 ++--
.../lib/io/fs/FileSplitterInputTest.java | 47 ++++++++++++++++++++
2 files changed, 51 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/327a3999/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index ab70047..234650d 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -434,17 +434,17 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
}
protected ScannedFileInfo createScannedFileInfo(Path parentPath, FileStatus parentStatus, Path childPath,
- @SuppressWarnings("UnusedParameters") FileStatus childStatus, Path rootPath)
+ FileStatus childStatus, Path rootPath)
{
ScannedFileInfo info;
if (rootPath == null) {
info = parentStatus.isDirectory() ?
- new ScannedFileInfo(parentPath.toUri().getPath(), childPath.getName(), parentStatus.getModificationTime()) :
- new ScannedFileInfo(null, childPath.toUri().getPath(), parentStatus.getModificationTime());
+ new ScannedFileInfo(parentPath.toUri().getPath(), childPath.getName(), childStatus.getModificationTime()) :
+ new ScannedFileInfo(null, childPath.toUri().getPath(), childStatus.getModificationTime());
} else {
URI relativeChildURI = rootPath.toUri().relativize(childPath.toUri());
info = new ScannedFileInfo(rootPath.toUri().getPath(), relativeChildURI.getPath(),
- parentStatus.getModificationTime());
+ childStatus.getModificationTime());
}
return info;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/327a3999/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index cd0de2d..c5d2ae7 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -20,6 +20,7 @@ package com.datatorrent.lib.io.fs;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
@@ -498,6 +499,52 @@ public class FileSplitterInputTest
Assert.assertEquals("Recovered Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
}
+ @Test
+ public void testFileModificationTest() throws InterruptedException, IOException, TimeoutException
+ {
+ testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60 * 1000);
+ testFileMetadata();
+ testMeta.fileMetadataSink.clear();
+ testMeta.blockMetadataSink.clear();
+
+ Thread.sleep(1000);
+ //change a file , this will not change mtime of the file.
+ File f12 = new File(testMeta.dataDirectory, "file11" + ".txt");
+ HashSet<String> lines = Sets.newHashSet();
+ for (int line = 0; line < 2; line++) {
+ lines.add("f13" + "l" + line);
+ }
+ /* Need to use FileWriter, FileUtils changes the directory timestamp when
+ file is changed. */
+ FileWriter fout = new FileWriter(f12, true);
+ fout.write(StringUtils.join(lines, '\n').toCharArray());
+ fout.close();
+ testMeta.fileSplitterInput.getScanner().setTrigger(true);
+
+ //window 2
+ testMeta.fileSplitterInput.beginWindow(2);
+ testMeta.scanner.semaphore.acquire();
+ testMeta.fileSplitterInput.emitTuples();
+ testMeta.fileSplitterInput.endWindow();
+
+ Assert.assertEquals("window 2: files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+ Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
+
+ //window 3
+ testMeta.fileMetadataSink.clear();
+ testMeta.blockMetadataSink.clear();
+ testMeta.scanner.setTrigger(true);
+ testMeta.scanner.semaphore.release();
+ testMeta.fileSplitterInput.beginWindow(3);
+ Thread.sleep(1000);
+ testMeta.scanner.semaphore.acquire();
+ testMeta.fileSplitterInput.emitTuples();
+ testMeta.fileSplitterInput.endWindow();
+
+ Assert.assertEquals("window 2: files", 0, testMeta.fileMetadataSink.collectedTuples.size());
+ Assert.assertEquals("window 2: blocks", 0, testMeta.blockMetadataSink.collectedTuples.size());
+
+ }
private static class MockScanner extends FileSplitterInput.TimeBasedDirectoryScanner
{