You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2015/09/09 17:10:25 UTC
[1/2] incubator-apex-malhar git commit: MLHR-1841 #comment rotating all files irrespective of the state of stream
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 d910102d7 -> 59f21fb5d
MLHR-1841 #comment rotating all files irrespective of the state of stream
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b8a10bc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b8a10bc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b8a10bc5
Branch: refs/heads/devel-3
Commit: b8a10bc5381bd8a004a2af4f5c0812e379c82d44
Parents: d910102
Author: Chandni Singh <cs...@apache.org>
Authored: Tue Sep 8 14:21:04 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Tue Sep 8 14:52:34 2015 -0700
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileOutputOperator.java | 4 +--
.../io/fs/AbstractFileOutputOperatorTest.java | 38 ++++++++++++++++++++
2 files changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b8a10bc5/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index aef0739..8339cc1 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -868,9 +868,9 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
if (++rotationCount == rotationWindows) {
rotationCount = 0;
// Rotate the files
- Iterator<String> iterator = streamsCache.asMap().keySet().iterator();
+ Iterator<Map.Entry<String, MutableInt>> iterator = openPart.entrySet().iterator();
while (iterator.hasNext()) {
- String filename = iterator.next();
+ String filename = iterator.next().getKey();
// Rotate the file if the following conditions are met
// 1. The file is not already rotated during this period for other reasons such as max length is reached
// or rotate was explicitly called externally
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b8a10bc5/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index b900af2..dd6bb1d 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -1809,6 +1809,44 @@ public class AbstractFileOutputOperatorTest
}
@Test
+ public void testPeriodicRotationWithEviction() throws InterruptedException
+ {
+ EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
+ File dir = new File(testMeta.getDir());
+ writer.setFilePath(testMeta.getDir());
+ writer.setRotationWindows(30);
+ writer.setAlwaysWriteToTmp(true);
+ writer.setExpireStreamAfterAccessMillis(1L);
+ writer.setup(testMeta.testOperatorContext);
+
+ // Check that rotation for even.txt.0 happens
+ for (int i = 0; i < 30; ++i) {
+ writer.beginWindow(i);
+ if (i == 0) {
+ writer.input.put(i);
+ }
+ Thread.sleep(100L);
+ writer.endWindow();
+ }
+ writer.committed(29);
+ Set<String> fileNames = new TreeSet<>();
+ fileNames.add(EVEN_FILE + ".0");
+ Collection<File> files = FileUtils.listFiles(dir, null, false);
+ Assert.assertEquals("Number of part files", 1, files.size());
+ Assert.assertEquals("Part file names", fileNames, getFileNames(files));
+
+ // Check that rotation doesn't happen for files that don't have data during the rotation period
+ for (int i = 30; i < 120; ++i) {
+ writer.beginWindow(i);
+ writer.endWindow();
+ }
+ writer.committed(119);
+ files = FileUtils.listFiles(dir, null, false);
+ Assert.assertEquals("Number of part files", 1, files.size());
+ Assert.assertEquals("Part file names", fileNames, getFileNames(files));
+ }
+
+ @Test
public void testCompression() throws IOException
{
EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1841' of github.com:chandnisingh/incubator-apex-malhar into devel-3
Posted by pr...@apache.org.
Merge branch 'MLHR-1841' of github.com:chandnisingh/incubator-apex-malhar into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/59f21fb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/59f21fb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/59f21fb5
Branch: refs/heads/devel-3
Commit: 59f21fb5dd15264462437fd2446f3ae64fe892a4
Parents: d910102 b8a10bc
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Wed Sep 9 07:50:36 2015 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Wed Sep 9 07:50:36 2015 -0700
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileOutputOperator.java | 4 +--
.../io/fs/AbstractFileOutputOperatorTest.java | 38 ++++++++++++++++++++
2 files changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------