You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2015/09/09 17:10:25 UTC

[1/2] incubator-apex-malhar git commit: MLHR-1841 #comment rotating all files irrespective of the state of stream

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 d910102d7 -> 59f21fb5d


MLHR-1841 #comment rotating all files irrespective of the state of stream


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b8a10bc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b8a10bc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b8a10bc5

Branch: refs/heads/devel-3
Commit: b8a10bc5381bd8a004a2af4f5c0812e379c82d44
Parents: d910102
Author: Chandni Singh <cs...@apache.org>
Authored: Tue Sep 8 14:21:04 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Tue Sep 8 14:52:34 2015 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java   |  4 +--
 .../io/fs/AbstractFileOutputOperatorTest.java   | 38 ++++++++++++++++++++
 2 files changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b8a10bc5/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index aef0739..8339cc1 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -868,9 +868,9 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
       if (++rotationCount == rotationWindows) {
         rotationCount = 0;
         // Rotate the files
-        Iterator<String> iterator = streamsCache.asMap().keySet().iterator();
+        Iterator<Map.Entry<String, MutableInt>> iterator = openPart.entrySet().iterator();
         while (iterator.hasNext()) {
-          String filename = iterator.next();
+          String filename = iterator.next().getKey();
           // Rotate the file if the following conditions are met
           // 1. The file is not already rotated during this period for other reasons such as max length is reached
           //     or rotate was explicitly called externally

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b8a10bc5/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index b900af2..dd6bb1d 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -1809,6 +1809,44 @@ public class AbstractFileOutputOperatorTest
   }
 
   @Test
+  public void testPeriodicRotationWithEviction() throws InterruptedException
+  {
+    EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
+    File dir = new File(testMeta.getDir());
+    writer.setFilePath(testMeta.getDir());
+    writer.setRotationWindows(30);
+    writer.setAlwaysWriteToTmp(true);
+    writer.setExpireStreamAfterAccessMillis(1L);
+    writer.setup(testMeta.testOperatorContext);
+
+    // Check that rotation for even.txt.0 happens
+    for (int i = 0; i < 30; ++i) {
+      writer.beginWindow(i);
+      if (i == 0) {
+        writer.input.put(i);
+      }
+      Thread.sleep(100L);
+      writer.endWindow();
+    }
+    writer.committed(29);
+    Set<String> fileNames = new TreeSet<>();
+    fileNames.add(EVEN_FILE + ".0");
+    Collection<File> files = FileUtils.listFiles(dir, null, false);
+    Assert.assertEquals("Number of part files", 1, files.size());
+    Assert.assertEquals("Part file names", fileNames, getFileNames(files));
+
+    // Check that rotation doesn't happen for files that don't have data during the rotation period
+    for (int i = 30; i < 120; ++i) {
+      writer.beginWindow(i);
+      writer.endWindow();
+    }
+    writer.committed(119);
+    files = FileUtils.listFiles(dir, null, false);
+    Assert.assertEquals("Number of part files", 1, files.size());
+    Assert.assertEquals("Part file names", fileNames, getFileNames(files));
+  }
+
+  @Test
   public void testCompression() throws IOException
   {
     EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();


[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1841' of github.com:chandnisingh/incubator-apex-malhar into devel-3

Posted by pr...@apache.org.
Merge branch 'MLHR-1841' of github.com:chandnisingh/incubator-apex-malhar into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/59f21fb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/59f21fb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/59f21fb5

Branch: refs/heads/devel-3
Commit: 59f21fb5dd15264462437fd2446f3ae64fe892a4
Parents: d910102 b8a10bc
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Wed Sep 9 07:50:36 2015 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Wed Sep 9 07:50:36 2015 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java   |  4 +--
 .../io/fs/AbstractFileOutputOperatorTest.java   | 38 ++++++++++++++++++++
 2 files changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------