You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2015/11/16 23:07:12 UTC
[1/2] incubator-apex-core git commit: - APEX-263 Fixed case where
double checkpointing could occurr.
Repository: incubator-apex-core
Updated Branches:
refs/heads/devel-3 1873f5562 -> 93bdf2d10
- APEX-263 Fixed case where double checkpointing could occurr.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/a4207c56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/a4207c56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/a4207c56
Branch: refs/heads/devel-3
Commit: a4207c5685822a22020d253c0a00231bd83af2c0
Parents: 1873f55
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Sat Nov 14 17:33:34 2015 -0800
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Mon Nov 16 12:23:01 2015 -0800
----------------------------------------------------------------------
.../datatorrent/stram/engine/GenericNode.java | 2 +
.../stram/engine/GenericNodeTest.java | 119 ++++++++++++++++++-
2 files changed, 119 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/a4207c56/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index 26ba98a..93cee49 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -156,10 +156,12 @@ public class GenericNode extends Node<Operator>
checkpointWindowCount = 0;
if (doCheckpoint) {
checkpoint(currentWindowId);
+ lastCheckpointWindowId = currentWindowId;
doCheckpoint = false;
}
else if (PROCESSING_MODE == ProcessingMode.EXACTLY_ONCE) {
checkpoint(currentWindowId);
+ lastCheckpointWindowId = currentWindowId;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/a4207c56/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index d5ceae6..f2c23b2 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -19,16 +19,22 @@
package com.datatorrent.stram.engine;
import java.util.ArrayList;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.junit.Assert;
+import org.junit.Assert;
import org.junit.Test;
-import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
+import com.google.common.collect.Sets;
+
import com.datatorrent.api.*;
+import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
+import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.api.Operator.ProcessingMode;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.bufferserver.packet.MessageType;
+import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
import com.datatorrent.stram.tuple.EndStreamTuple;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
@@ -90,6 +96,41 @@ public class GenericNodeTest
}
+ public static class GenericCheckpointOperator extends GenericOperator implements CheckpointListener
+ {
+ public Set<Long> checkpointedWindows = Sets.newHashSet();
+ public volatile boolean checkpointTwice = false;
+ public volatile int numWindows = 0;
+
+ public GenericCheckpointOperator()
+ {
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ }
+
+ @Override
+ public void endWindow()
+ {
+ super.endWindow();
+ numWindows++;
+ }
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+ checkpointTwice = checkpointTwice || !checkpointedWindows.add(windowId);
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ }
+ }
+
@Test
@SuppressWarnings("SleepWhileInLoop")
public void testSynchingLogic() throws InterruptedException
@@ -296,4 +337,78 @@ public class GenericNodeTest
Assert.assertTrue("End window not called", go.endWindowId != go.beginWindowId);
}
+ @Test
+ public void testDoubleCheckpointAtleastOnce() throws Exception
+ {
+ testDoubleCheckpointHandling(ProcessingMode.AT_LEAST_ONCE);
+ }
+
+ @Test
+ public void testDoubleCheckpointAtMostOnce() throws Exception
+ {
+ testDoubleCheckpointHandling(ProcessingMode.AT_MOST_ONCE);
+ }
+
+ @Test
+ public void testDoubleCheckpointExactlyOnce() throws Exception
+ {
+ testDoubleCheckpointHandling(ProcessingMode.EXACTLY_ONCE);
+ }
+
+ @SuppressWarnings("SleepWhileInLoop")
+ private void testDoubleCheckpointHandling(ProcessingMode processingMode) throws Exception
+ {
+ WindowGenerator windowGenerator = new WindowGenerator(new ScheduledThreadPoolExecutor(1, "WindowGenerator"), 1024);
+ windowGenerator.setResetWindow(0L);
+ windowGenerator.setFirstWindow(0L);
+ windowGenerator.setWindowWidth(100);
+ windowGenerator.setCheckpointCount(1, 0);
+
+ GenericCheckpointOperator gco = new GenericCheckpointOperator();
+ DefaultAttributeMap dam = new DefaultAttributeMap();
+ dam.put(OperatorContext.APPLICATION_WINDOW_COUNT, 2);
+ dam.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 2);
+ dam.put(OperatorContext.PROCESSING_MODE, processingMode);
+
+ final GenericNode in = new GenericNode(gco, new com.datatorrent.stram.engine.OperatorContext(0, dam, null));
+ in.setId(1);
+
+ TestSink testSink = new TestSink();
+
+ in.connectInputPort("ip1", windowGenerator.acquireReservoir(String.valueOf(in.id), 1024));
+ in.connectOutputPort("output", testSink);
+
+ windowGenerator.activate(null);
+
+ final AtomicBoolean ab = new AtomicBoolean(false);
+ Thread t = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ ab.set(true);
+ in.activate();
+ in.run();
+ in.deactivate();
+ }
+
+ };
+
+ t.start();
+
+ long startTime = System.currentTimeMillis();
+ long endTime = 0;
+
+ while (gco.numWindows < 3 && ((endTime = System.currentTimeMillis()) - startTime) < 5000) {
+ Thread.sleep(50);
+ }
+
+ in.shutdown();
+ t.join();
+
+ windowGenerator.deactivate();
+
+ Assert.assertFalse(gco.checkpointTwice);
+ Assert.assertTrue("Timed out", (endTime - startTime) < 5000);
+ }
}
[2/2] incubator-apex-core git commit: Merge branch 'APEX-263' of
github.com:ilooner/incubator-apex-core into devel-3
Posted by pr...@apache.org.
Merge branch 'APEX-263' of github.com:ilooner/incubator-apex-core into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/93bdf2d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/93bdf2d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/93bdf2d1
Branch: refs/heads/devel-3
Commit: 93bdf2d10f1949d58b4ca1dc9391c931aa2612a2
Parents: 1873f55 a4207c5
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Mon Nov 16 13:43:31 2015 -0800
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Mon Nov 16 13:43:31 2015 -0800
----------------------------------------------------------------------
.../datatorrent/stram/engine/GenericNode.java | 2 +
.../stram/engine/GenericNodeTest.java | 119 ++++++++++++++++++-
2 files changed, 119 insertions(+), 2 deletions(-)
----------------------------------------------------------------------