You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 08:21:47 UTC
[28/50] incubator-apex-core git commit: APEX-78 #comment Checkpoint
notification to notify operators before checkpoint is performed
APEX-78 #comment Checkpoint notification to notify operators before checkpoint is performed
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d5e82468
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d5e82468
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d5e82468
Branch: refs/heads/master
Commit: d5e82468cfd27cdfd09135f8e6956147ac5d9dc3
Parents: 7629be8
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Fri Dec 11 06:03:29 2015 -0800
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Thu Jan 7 22:18:24 2016 -0800
----------------------------------------------------------------------
.../main/java/com/datatorrent/api/Operator.java | 26 ++++++
.../java/com/datatorrent/stram/engine/Node.java | 4 +
.../com/datatorrent/stram/CheckpointTest.java | 85 +++++++++++++++++---
3 files changed, 104 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5e82468/api/src/main/java/com/datatorrent/api/Operator.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Operator.java b/api/src/main/java/com/datatorrent/api/Operator.java
index eb69266..785c60b 100644
--- a/api/src/main/java/com/datatorrent/api/Operator.java
+++ b/api/src/main/java/com/datatorrent/api/Operator.java
@@ -224,6 +224,7 @@ public interface Operator extends Component<OperatorContext>
* Operators must implement this interface if they are interested in being notified as
* soon as the operator state is checkpointed or committed.
*
+ * @deprecated Use {@link CheckpointNotificationListener} instead
* @since 0.3.2
*/
public static interface CheckpointListener
@@ -270,4 +271,29 @@ public interface Operator extends Component<OperatorContext>
}
+ /**
+ * Operators that need to be notified about checkpoint events should implement this interface.
+ *
+ * The notification callbacks in this interface are called outside window boundaries so the operators should not
+ * attempt to send any tuples in these callbacks.
+ *
+ */
+ interface CheckpointNotificationListener extends CheckpointListener
+ {
+ /**
+ * Notify the operator before a checkpoint is performed.
+ *
+ * Operators may need to perform certain tasks before a checkpoint such as calling flush on a stream to write out
+ * pending data. Having this notification helps operators perform such operations optimally by doing them once
+ * before checkpoint as opposed to doing them at the end of every window.
+ *
+ * The method will be called before the checkpoint is performed. It will be called after
+ * {@link Operator#endWindow()} call of the window preceding the checkpoint and before the checkpoint is
+ * actually performed.
+ *
+ * @param windowId The window id of the window preceding the checkpoint
+ */
+ void beforeCheckpoint(long windowId);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5e82468/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index 5c6b86f..068a325 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -486,6 +486,10 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
void checkpoint(long windowId)
{
if (!context.stateless) {
+ if (operator instanceof Operator.CheckpointNotificationListener) {
+ ((Operator.CheckpointNotificationListener)operator).beforeCheckpoint(windowId);
+ }
+
StorageAgent ba = context.getValue(OperatorContext.STORAGE_AGENT);
if (ba != null) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d5e82468/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index 5d11b86..ee3cbc3 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -18,26 +18,37 @@
*/
package com.datatorrent.stram;
-import com.datatorrent.common.util.BaseOperator;
-
-import java.util.*;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import org.junit.*;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
-import com.datatorrent.api.*;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.api.annotation.Stateless;
-
import com.datatorrent.common.util.AsyncFSStorageAgent;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.stram.MockContainer.MockOperatorStats;
import com.datatorrent.stram.StreamingContainerManager.UpdateCheckpointsContext;
import com.datatorrent.stram.api.Checkpoint;
@@ -63,12 +74,14 @@ public class CheckpointTest
@Rule
public TestMeta testMeta = new TestMeta();
- private static class MockInputOperator extends BaseOperator implements InputOperator
+ private static class MockInputOperator extends BaseOperator implements InputOperator, Operator.CheckpointNotificationListener
{
@OutputPortFieldAnnotation( optional = true)
public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<Object>();
private transient int windowCount;
+ private int checkpointState;
+
@Override
public void beginWindow(long windowId)
{
@@ -81,6 +94,22 @@ public class CheckpointTest
public void emitTuples()
{
}
+
+ @Override
+ public void beforeCheckpoint(long windowId)
+ {
+ ++checkpointState;
+ }
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ }
}
private LogicalPlan dag;
@@ -454,5 +483,39 @@ public class CheckpointTest
}
+ @Test
+ public void testBeforeCheckpointNotification() throws IOException, ClassNotFoundException
+ {
+ FSStorageAgent storageAgent = new FSStorageAgent(testMeta.getPath(), null);
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, storageAgent);
+ dag.setAttribute(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 1);
+ dag.setAttribute(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 50);
+
+ MockInputOperator o1 = dag.addOperator("o1", new MockInputOperator());
+
+ GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+ dag.setAttribute(o2, OperatorContext.STATELESS, true);
+
+ dag.addStream("o1.outport", o1.outport, o2.inport1);
+
+ StramLocalCluster sc = new StramLocalCluster(dag);
+ sc.setHeartbeatMonitoringEnabled(false);
+ sc.run();
+
+ StreamingContainerManager dnm = sc.dnmgr;
+ PhysicalPlan plan = dnm.getPhysicalPlan();
+ List<PTOperator> o1ps = plan.getOperators(dag.getMeta(o1));
+ Assert.assertEquals("Number partitions", 1, o1ps.size());
+
+ PTOperator o1p1 = o1ps.get(0);
+ long[] ckWIds = storageAgent.getWindowIds(o1p1.getId());
+ Arrays.sort(ckWIds);
+ int expectedState = 0;
+ for (long windowId : ckWIds) {
+ Object ckState = storageAgent.load(o1p1.getId(), windowId);
+ Assert.assertEquals("Checkpointed state class", MockInputOperator.class, ckState.getClass());
+ Assert.assertEquals("Checkpoint state", expectedState++, ((MockInputOperator)ckState).checkpointState);
+ }
+ }
}