You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by cs...@apache.org on 2016/03/17 21:30:01 UTC

[1/2] incubator-apex-malhar git commit: APEXMALHAR-2004: Add file's modification time in referenceTimes map instead of parent's

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master becee7f82 -> 5373a3cb6


APEXMALHAR-2004: Add file's modification time in referenceTimes map
instead of parent's


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/327a3999
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/327a3999
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/327a3999

Branch: refs/heads/master
Commit: 327a3999c6c443894bc1e085d91a1f030a848bdd
Parents: d3a7063
Author: Tushar R. Gosavi <tu...@apache.org>
Authored: Sat Feb 27 18:34:38 2016 +0530
Committer: Tushar R. Gosavi <tu...@apache.org>
Committed: Fri Mar 11 14:38:34 2016 +0530

----------------------------------------------------------------------
 .../lib/io/fs/FileSplitterInput.java            |  8 ++--
 .../lib/io/fs/FileSplitterInputTest.java        | 47 ++++++++++++++++++++
 2 files changed, 51 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/327a3999/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index ab70047..234650d 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -434,17 +434,17 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
     }
 
     protected ScannedFileInfo createScannedFileInfo(Path parentPath, FileStatus parentStatus, Path childPath,
-        @SuppressWarnings("UnusedParameters") FileStatus childStatus, Path rootPath)
+        FileStatus childStatus, Path rootPath)
     {
       ScannedFileInfo info;
       if (rootPath == null) {
         info = parentStatus.isDirectory() ?
-          new ScannedFileInfo(parentPath.toUri().getPath(), childPath.getName(), parentStatus.getModificationTime()) :
-          new ScannedFileInfo(null, childPath.toUri().getPath(), parentStatus.getModificationTime());
+          new ScannedFileInfo(parentPath.toUri().getPath(), childPath.getName(), childStatus.getModificationTime()) :
+          new ScannedFileInfo(null, childPath.toUri().getPath(), childStatus.getModificationTime());
       } else {
         URI relativeChildURI = rootPath.toUri().relativize(childPath.toUri());
         info = new ScannedFileInfo(rootPath.toUri().getPath(), relativeChildURI.getPath(),
-            parentStatus.getModificationTime());
+          childStatus.getModificationTime());
       }
       return info;
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/327a3999/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index cd0de2d..c5d2ae7 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -20,6 +20,7 @@ package com.datatorrent.lib.io.fs;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
@@ -498,6 +499,52 @@ public class FileSplitterInputTest
     Assert.assertEquals("Recovered Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
   }
 
+  @Test
+  public void testFileModificationTest() throws InterruptedException, IOException, TimeoutException
+  {
+    testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60 * 1000);
+    testFileMetadata();
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    Thread.sleep(1000);
+    //change a file; this updates the file's mtime but not the mtime of its parent directory.
+    File f12 = new File(testMeta.dataDirectory, "file11" + ".txt");
+    HashSet<String> lines = Sets.newHashSet();
+    for (int line = 0; line < 2; line++) {
+      lines.add("f13" + "l" + line);
+    }
+    /* Use FileWriter to append to the file in place; FileUtils changes the
+       directory timestamp when a file is changed. */
+    FileWriter fout = new FileWriter(f12, true);
+    fout.write(StringUtils.join(lines, '\n').toCharArray());
+    fout.close();
+    testMeta.fileSplitterInput.getScanner().setTrigger(true);
+
+    //window 2
+    testMeta.fileSplitterInput.beginWindow(2);
+    testMeta.scanner.semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    Assert.assertEquals("window 2: files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size());
+
+    //window 3
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+    testMeta.scanner.setTrigger(true);
+    testMeta.scanner.semaphore.release();
+    testMeta.fileSplitterInput.beginWindow(3);
+    Thread.sleep(1000);
+    testMeta.scanner.semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    Assert.assertEquals("window 2: files", 0, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("window 2: blocks", 0, testMeta.blockMetadataSink.collectedTuples.size());
+
+  }
 
   private static class MockScanner extends FileSplitterInput.TimeBasedDirectoryScanner
   {


[2/2] incubator-apex-malhar git commit: Merge branch 'APEXMALHAR-2004'

Posted by cs...@apache.org.
Merge branch 'APEXMALHAR-2004'


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/5373a3cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/5373a3cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/5373a3cb

Branch: refs/heads/master
Commit: 5373a3cb65f61704d5f1ce96415a2a5c0193a52e
Parents: becee7f 327a399
Author: Chandni Singh <cs...@apache.org>
Authored: Thu Mar 17 13:29:21 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Thu Mar 17 13:29:21 2016 -0700

----------------------------------------------------------------------
 .../lib/io/fs/FileSplitterInput.java            |  8 ++--
 .../lib/io/fs/FileSplitterInputTest.java        | 47 ++++++++++++++++++++
 2 files changed, 51 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5373a3cb/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5373a3cb/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------