You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 08:21:47 UTC

[28/50] incubator-apex-core git commit: APEX-78 #comment Checkpoint notification to notify operators before checkpoint is performed

APEX-78 #comment Checkpoint notification to notify operators before checkpoint is performed


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d5e82468
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d5e82468
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d5e82468

Branch: refs/heads/master
Commit: d5e82468cfd27cdfd09135f8e6956147ac5d9dc3
Parents: 7629be8
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Fri Dec 11 06:03:29 2015 -0800
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Thu Jan 7 22:18:24 2016 -0800

----------------------------------------------------------------------
 .../main/java/com/datatorrent/api/Operator.java | 26 ++++++
 .../java/com/datatorrent/stram/engine/Node.java |  4 +
 .../com/datatorrent/stram/CheckpointTest.java   | 85 +++++++++++++++++---
 3 files changed, 104 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5e82468/api/src/main/java/com/datatorrent/api/Operator.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Operator.java b/api/src/main/java/com/datatorrent/api/Operator.java
index eb69266..785c60b 100644
--- a/api/src/main/java/com/datatorrent/api/Operator.java
+++ b/api/src/main/java/com/datatorrent/api/Operator.java
@@ -224,6 +224,7 @@ public interface Operator extends Component<OperatorContext>
    * Operators must implement this interface if they are interested in being notified as
    * soon as the operator state is checkpointed or committed.
    *
+   * @deprecated Use {@link CheckpointNotificationListener} instead
    * @since 0.3.2
    */
   public static interface CheckpointListener
@@ -270,4 +271,29 @@ public interface Operator extends Component<OperatorContext>
 
   }
 
+  /**
+   * Operators that need to be notified about checkpoint events should implement this interface.
+   *
+   * The notification callbacks in this interface are called outside window boundaries so the operators should not
+   * attempt to send any tuples in these callbacks.
+   *
+   */
+  interface CheckpointNotificationListener extends CheckpointListener
+  {
+    /**
+     * Notify the operator before a checkpoint is performed.
+     *
+     * Operators may need to perform certain tasks before a checkpoint such as calling flush on a stream to write out
+     * pending data. Having this notification helps operators perform such operations optimally by doing them once
+     * before checkpoint as opposed to doing them at the end of every window.
+     *
+     * The method will be called before the checkpoint is performed. It will be called after
+     * {@link Operator#endWindow()} call of the window preceding the checkpoint and before the checkpoint is
+     * actually performed.
+     *
+     * @param windowId The window id of the window preceding the checkpoint
+     */
+    void beforeCheckpoint(long windowId);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5e82468/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index 5c6b86f..068a325 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -486,6 +486,10 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
   void checkpoint(long windowId)
   {
     if (!context.stateless) {
+      if (operator instanceof Operator.CheckpointNotificationListener) {
+        ((Operator.CheckpointNotificationListener)operator).beforeCheckpoint(windowId);
+      }
+
       StorageAgent ba = context.getValue(OperatorContext.STORAGE_AGENT);
       if (ba != null) {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5e82468/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index 5d11b86..ee3cbc3 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -18,26 +18,37 @@
  */
 package com.datatorrent.stram;
 
-import com.datatorrent.common.util.BaseOperator;
-
-import java.util.*;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import org.junit.*;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 
-import com.datatorrent.api.*;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.api.annotation.Stateless;
-
 import com.datatorrent.common.util.AsyncFSStorageAgent;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.FSStorageAgent;
 import com.datatorrent.stram.MockContainer.MockOperatorStats;
 import com.datatorrent.stram.StreamingContainerManager.UpdateCheckpointsContext;
 import com.datatorrent.stram.api.Checkpoint;
@@ -63,12 +74,14 @@ public class CheckpointTest
   @Rule
   public TestMeta testMeta = new TestMeta();
 
-  private static class MockInputOperator extends BaseOperator implements InputOperator
+  private static class MockInputOperator extends BaseOperator implements InputOperator, Operator.CheckpointNotificationListener
   {
     @OutputPortFieldAnnotation( optional = true)
     public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<Object>();
     private transient int windowCount;
 
+    private int checkpointState;
+
     @Override
     public void beginWindow(long windowId)
     {
@@ -81,6 +94,22 @@ public class CheckpointTest
     public void emitTuples()
     {
     }
+
+    @Override
+    public void beforeCheckpoint(long windowId)
+    {
+      ++checkpointState;
+    }
+
+    @Override
+    public void checkpointed(long windowId)
+    {
+    }
+
+    @Override
+    public void committed(long windowId)
+    {
+    }
   }
 
   private LogicalPlan dag;
@@ -454,5 +483,39 @@ public class CheckpointTest
 
   }
 
+  @Test
+  public void testBeforeCheckpointNotification() throws IOException, ClassNotFoundException
+  {
+    FSStorageAgent storageAgent = new FSStorageAgent(testMeta.getPath(), null);
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, storageAgent);
+    dag.setAttribute(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 1);
+    dag.setAttribute(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 50);
+
+    MockInputOperator o1 = dag.addOperator("o1", new MockInputOperator());
+
+    GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+    dag.setAttribute(o2, OperatorContext.STATELESS, true);
+
+    dag.addStream("o1.outport", o1.outport, o2.inport1);
+
+    StramLocalCluster sc = new StramLocalCluster(dag);
+    sc.setHeartbeatMonitoringEnabled(false);
+    sc.run();
+
+    StreamingContainerManager dnm = sc.dnmgr;
+    PhysicalPlan plan = dnm.getPhysicalPlan();
+    List<PTOperator> o1ps = plan.getOperators(dag.getMeta(o1));
+    Assert.assertEquals("Number partitions", 1, o1ps.size());
+
+    PTOperator o1p1 = o1ps.get(0);
+    long[] ckWIds = storageAgent.getWindowIds(o1p1.getId());
+    Arrays.sort(ckWIds);
+    int expectedState = 0;
+    for (long windowId : ckWIds) {
+      Object ckState = storageAgent.load(o1p1.getId(), windowId);
+      Assert.assertEquals("Checkpointed state class", MockInputOperator.class, ckState.getClass());
+      Assert.assertEquals("Checkpoint state", expectedState++, ((MockInputOperator)ckState).checkpointState);
+    }
+  }
 
 }