You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2015/11/16 23:07:12 UTC

[1/2] incubator-apex-core git commit: - APEX-263 Fixed case where double checkpointing could occurr.

Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 1873f5562 -> 93bdf2d10


- APEX-263 Fixed case where double checkpointing could occurr.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/a4207c56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/a4207c56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/a4207c56

Branch: refs/heads/devel-3
Commit: a4207c5685822a22020d253c0a00231bd83af2c0
Parents: 1873f55
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Sat Nov 14 17:33:34 2015 -0800
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Mon Nov 16 12:23:01 2015 -0800

----------------------------------------------------------------------
 .../datatorrent/stram/engine/GenericNode.java   |   2 +
 .../stram/engine/GenericNodeTest.java           | 119 ++++++++++++++++++-
 2 files changed, 119 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/a4207c56/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index 26ba98a..93cee49 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -156,10 +156,12 @@ public class GenericNode extends Node<Operator>
       checkpointWindowCount = 0;
       if (doCheckpoint) {
         checkpoint(currentWindowId);
+        lastCheckpointWindowId = currentWindowId;
         doCheckpoint = false;
       }
       else if (PROCESSING_MODE == ProcessingMode.EXACTLY_ONCE) {
         checkpoint(currentWindowId);
+        lastCheckpointWindowId = currentWindowId;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/a4207c56/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index d5ceae6..f2c23b2 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -19,16 +19,22 @@
 package com.datatorrent.stram.engine;
 
 import java.util.ArrayList;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.junit.Assert;
 
+import org.junit.Assert;
 import org.junit.Test;
 
-import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
+import com.google.common.collect.Sets;
+
 import com.datatorrent.api.*;
+import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
+import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.api.Operator.ProcessingMode;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.bufferserver.packet.MessageType;
+import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
 import com.datatorrent.stram.tuple.EndStreamTuple;
 import com.datatorrent.stram.tuple.EndWindowTuple;
 import com.datatorrent.stram.tuple.Tuple;
@@ -90,6 +96,41 @@ public class GenericNodeTest
 
   }
 
+  public static class GenericCheckpointOperator extends GenericOperator implements CheckpointListener
+  {
+    public Set<Long> checkpointedWindows = Sets.newHashSet();
+    public volatile boolean checkpointTwice = false;
+    public volatile int numWindows = 0;
+
+    public GenericCheckpointOperator()
+    {
+    }
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+      super.beginWindow(windowId);
+    }
+
+    @Override
+    public void endWindow()
+    {
+      super.endWindow();
+      numWindows++;
+    }
+
+    @Override
+    public void checkpointed(long windowId)
+    {
+      checkpointTwice = checkpointTwice || !checkpointedWindows.add(windowId);
+    }
+
+    @Override
+    public void committed(long windowId)
+    {
+    }
+  }
+
   @Test
   @SuppressWarnings("SleepWhileInLoop")
   public void testSynchingLogic() throws InterruptedException
@@ -296,4 +337,78 @@ public class GenericNodeTest
     Assert.assertTrue("End window not called", go.endWindowId != go.beginWindowId);
   }
 
+  @Test
+  public void testDoubleCheckpointAtleastOnce() throws Exception
+  {
+    testDoubleCheckpointHandling(ProcessingMode.AT_LEAST_ONCE);
+  }
+
+  @Test
+  public void testDoubleCheckpointAtMostOnce() throws Exception
+  {
+    testDoubleCheckpointHandling(ProcessingMode.AT_MOST_ONCE);
+  }
+
+  @Test
+  public void testDoubleCheckpointExactlyOnce() throws Exception
+  {
+    testDoubleCheckpointHandling(ProcessingMode.EXACTLY_ONCE);
+  }
+
+  @SuppressWarnings("SleepWhileInLoop")
+  private void testDoubleCheckpointHandling(ProcessingMode processingMode) throws Exception
+  {
+    WindowGenerator windowGenerator = new WindowGenerator(new ScheduledThreadPoolExecutor(1, "WindowGenerator"), 1024);
+    windowGenerator.setResetWindow(0L);
+    windowGenerator.setFirstWindow(0L);
+    windowGenerator.setWindowWidth(100);
+    windowGenerator.setCheckpointCount(1, 0);
+
+    GenericCheckpointOperator gco = new GenericCheckpointOperator();
+    DefaultAttributeMap dam = new DefaultAttributeMap();
+    dam.put(OperatorContext.APPLICATION_WINDOW_COUNT, 2);
+    dam.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 2);
+    dam.put(OperatorContext.PROCESSING_MODE, processingMode);
+
+    final GenericNode in = new GenericNode(gco, new com.datatorrent.stram.engine.OperatorContext(0, dam, null));
+    in.setId(1);
+
+    TestSink testSink = new TestSink();
+
+    in.connectInputPort("ip1", windowGenerator.acquireReservoir(String.valueOf(in.id), 1024));
+    in.connectOutputPort("output", testSink);
+
+    windowGenerator.activate(null);
+
+    final AtomicBoolean ab = new AtomicBoolean(false);
+    Thread t = new Thread()
+    {
+      @Override
+      public void run()
+      {
+        ab.set(true);
+        in.activate();
+        in.run();
+        in.deactivate();
+      }
+
+    };
+
+    t.start();
+
+    long startTime = System.currentTimeMillis();
+    long endTime = 0;
+
+    while (gco.numWindows < 3 && ((endTime = System.currentTimeMillis()) - startTime) < 5000) {
+      Thread.sleep(50);
+    }
+
+    in.shutdown();
+    t.join();
+
+    windowGenerator.deactivate();
+
+    Assert.assertFalse(gco.checkpointTwice);
+    Assert.assertTrue("Timed out", (endTime - startTime) < 5000);
+  }
 }


[2/2] incubator-apex-core git commit: Merge branch 'APEX-263' of github.com:ilooner/incubator-apex-core into devel-3

Posted by pr...@apache.org.
Merge branch 'APEX-263' of github.com:ilooner/incubator-apex-core into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/93bdf2d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/93bdf2d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/93bdf2d1

Branch: refs/heads/devel-3
Commit: 93bdf2d10f1949d58b4ca1dc9391c931aa2612a2
Parents: 1873f55 a4207c5
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Mon Nov 16 13:43:31 2015 -0800
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Mon Nov 16 13:43:31 2015 -0800

----------------------------------------------------------------------
 .../datatorrent/stram/engine/GenericNode.java   |   2 +
 .../stram/engine/GenericNodeTest.java           | 119 ++++++++++++++++++-
 2 files changed, 119 insertions(+), 2 deletions(-)
----------------------------------------------------------------------