You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2018/06/20 14:32:42 UTC
[apex-core] branch master updated: APEXCORE-714 Adding a new
recovery mode where the operator instance before a failure event can be
reused when recovering from an upstream operator failure
This is an automated email from the ASF dual-hosted git repository.
vrozov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git
The following commit(s) were added to refs/heads/master by this push:
new d17f464 APEXCORE-714 Adding a new recovery mode where the operator instance before a failure event can be reused when recovering from an upstream operator failure
d17f464 is described below
commit d17f464fcaf19778e2f8edbe2b03419151558068
Author: Pramod Immaneni <pr...@datatorrent.com>
AuthorDate: Thu Sep 14 17:23:54 2017 -0700
APEXCORE-714 Adding a new recovery mode where the operator instance before a failure event can be reused when recovering from an upstream operator failure
---
api/src/main/java/com/datatorrent/api/Context.java | 9 ++++++
api/src/main/java/com/datatorrent/api/DAG.java | 3 ++
.../main/java/com/datatorrent/api/Operator.java | 24 ++++++++++++++-
.../api/annotation/OperatorAnnotation.java | 11 +++++++
.../java/com/datatorrent/stram/engine/Node.java | 13 ++++++--
.../stram/engine/StreamingContainer.java | 36 +++++++++++++++++++---
.../stram/plan/logical/LogicalPlan.java | 6 ++++
7 files changed, 95 insertions(+), 7 deletions(-)
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 9fe0c46..0da930d 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.Operator.ProcessingMode;
+import com.datatorrent.api.Operator.RecoveryMode;
import com.datatorrent.api.StringCodec.Class2String;
import com.datatorrent.api.StringCodec.Collection2String;
import com.datatorrent.api.StringCodec.Integer2String;
@@ -317,6 +318,14 @@ public interface Context
Attribute<AutoMetric.DimensionsScheme> METRICS_DIMENSIONS_SCHEME = new Attribute<>(Object2String.<AutoMetric.DimensionsScheme>getInstance());
/**
+ * Specify how to recover the operator in cases of a failure event. The default is to load from checkpoint. However,
+ * in some cases reusing the same instance of the operator from before the failure event may be desired. See
+ * {@link RecoveryMode}. The latter is only applicable in cases where the recovery is due to a failure of the
+ * upstream operator and not the operator itself.
+ */
+ Attribute<RecoveryMode> RECOVERY_MODE = new Attribute<RecoveryMode>(RecoveryMode.CHECKPOINT);
+
+ /**
* Return the operator runtime id.
*
* @return The id
diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java
index 93936d7..471950b 100644
--- a/api/src/main/java/com/datatorrent/api/DAG.java
+++ b/api/src/main/java/com/datatorrent/api/DAG.java
@@ -27,6 +27,7 @@ import javax.annotation.Nonnull;
import org.apache.hadoop.classification.InterfaceStability;
import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.annotation.OperatorAnnotation;
/**
* DAG contains the logical declarations of operators and streams.
@@ -190,6 +191,8 @@ public interface DAG extends DAGContext, Serializable
OutputPortMeta getMeta(Operator.OutputPort<?> port);
+ OperatorAnnotation getOperatorAnnotation();
+
/**
* Return collection of stream which are connected to this operator's
* input ports.
diff --git a/api/src/main/java/com/datatorrent/api/Operator.java b/api/src/main/java/com/datatorrent/api/Operator.java
index dd694d0..f0357e5 100644
--- a/api/src/main/java/com/datatorrent/api/Operator.java
+++ b/api/src/main/java/com/datatorrent/api/Operator.java
@@ -18,6 +18,8 @@
*/
package com.datatorrent.api;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG.GenericOperator;
@@ -71,6 +73,25 @@ public interface Operator extends Component<OperatorContext>, GenericOperator
}
+ @Evolving
+ enum RecoveryMode
+ {
+ /**
+ * Recover the operator from checkpoint
+ */
+ CHECKPOINT,
+ /**
+ * Reuse the same instance of the operator from before the failure event.
+ *
+ * This applies to scenarios where the failure is in an upstream operator and not the operator itself.
+ * Reusing the same instance may not be applicable in all cases as it can lead to incorrect results because the
+ * operator state will not be consistent with the processing position in the stream. This should be used only for
+ * operators that are either invariant to reusing the same state with the stream processing position modified
+ * according to the processing mode or tolerant to it.
+ */
+ REUSE_INSTANCE
+ }
+
/**
* This method gets called at the beginning of each window.
*
@@ -227,7 +248,8 @@ public interface Operator extends Component<OperatorContext>, GenericOperator
{
/**
* Do the operations just before the operator starts processing tasks within the windows.
- * e.g. establish a network connection.
+ * e.g. establish a network connection. This method is called irrespective of what {@link RecoveryMode}
+ * is being used.
* @param context - the context in which the operator is executing.
*/
void activate(CONTEXT context);
diff --git a/api/src/main/java/com/datatorrent/api/annotation/OperatorAnnotation.java b/api/src/main/java/com/datatorrent/api/annotation/OperatorAnnotation.java
index 16fd370..e7922b3 100644
--- a/api/src/main/java/com/datatorrent/api/annotation/OperatorAnnotation.java
+++ b/api/src/main/java/com/datatorrent/api/annotation/OperatorAnnotation.java
@@ -24,6 +24,8 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import com.datatorrent.api.Operator;
+
/**
* Annotation to specify characteristics of an operator.
*
@@ -49,4 +51,13 @@ public @interface OperatorAnnotation
* @return whether operator can be checkpointed in middle of an application window.
*/
boolean checkpointableWithinAppWindow() default true;
+
+ /**
+ * Element specifying the recovery mode for the operator.
+ * By default the operator state is recovered from checkpoint. The operator developer can indicate a preference for
+ * the recovery mode with this element, see {@link Operator.RecoveryMode}. The application developer can override this
+ * by setting the {@link Context.OperatorContext#RECOVERY_MODE} attribute.
+ * @return the recovery mode preferred by the operator developer
+ */
+ Operator.RecoveryMode recoveryMode() default Operator.RecoveryMode.CHECKPOINT;
}
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index 88b002f..5f222c5 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -135,6 +135,8 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
public long firstWindowMillis;
public long windowWidthMillis;
+ protected boolean reuseOperator;
+
public Node(OPERATOR operator, OperatorContext context)
{
this.operator = operator;
@@ -180,12 +182,19 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
return operator;
}
+ public void setReuseOperator(boolean reuseOperator)
+ {
+ this.reuseOperator = reuseOperator;
+ }
+
@Override
public void setup(OperatorContext context)
{
shutdown = false;
- logger.debug("Operator Context = {}", context);
- operator.setup(context);
+ if (!reuseOperator) {
+ logger.debug("Operator Context = {}", context);
+ operator.setup(context);
+ }
// this is where the ports should be setup but since the
// portcontext is not available here, we are doing it in
// StramChild. In future version, we should move that code here
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 927ad6d..699d646 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -66,6 +66,7 @@ import com.datatorrent.api.StorageAgent;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.StringCodec;
+import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.bufferserver.server.Server;
import com.datatorrent.bufferserver.storage.DiskStorage;
@@ -163,6 +164,8 @@ public class StreamingContainer extends YarnContainerMain
private RequestFactory requestFactory;
private TokenRenewer tokenRenewer;
+ private Map<Integer, Node<?>> reuseOpNodes = new HashMap<>();
+
static {
try {
eventloop = DefaultEventLoop.createEventLoop("ProcessWideEventLoop");
@@ -518,7 +521,7 @@ public class StreamingContainer extends YarnContainerMain
}
}
- private synchronized void undeploy(List<Integer> nodeList)
+ private synchronized Map<Integer, Node<?>> undeploy(List<Integer> nodeList)
{
/**
* make sure that all the operators which we are asked to undeploy are in this container.
@@ -565,6 +568,8 @@ public class StreamingContainer extends YarnContainerMain
for (Integer operatorId : nodeList) {
nodes.remove(operatorId);
}
+
+ return toUndeploy;
}
public void teardown()
@@ -800,7 +805,9 @@ public class StreamingContainer extends YarnContainerMain
if (rsp.undeployRequest != null) {
logger.info("Undeploy request: {}", rsp.undeployRequest);
processNodeRequests(false);
- undeploy(rsp.undeployRequest);
+ Map<Integer, Node<?>> undeployNodes = undeploy(rsp.undeployRequest);
+ undeployNodes.entrySet().removeIf((entry) -> !isReuseOperator(entry.getValue()));
+ reuseOpNodes.putAll(undeployNodes);
}
if (rsp.shutdown != null) {
@@ -833,6 +840,19 @@ public class StreamingContainer extends YarnContainerMain
processNodeRequests(true);
}
+ private boolean isReuseOperator(Node<?> node)
+ {
+ if (node.context.getAttributes().contains(OperatorContext.RECOVERY_MODE)) {
+ return node.context.getValue(OperatorContext.RECOVERY_MODE) == Operator.RecoveryMode.REUSE_INSTANCE;
+ } else {
+ if (node.operator.getClass().isAnnotationPresent(OperatorAnnotation.class)) {
+ return node.operator.getClass().getAnnotation(OperatorAnnotation.class).recoveryMode() == Operator.RecoveryMode.REUSE_INSTANCE;
+ }
+ }
+ logger.debug("Is reuse operator {} {}", node, false);
+ return false;
+ }
+
private void stopInputNodes()
{
for (Entry<Integer, Node<?>> e : nodes.entrySet()) {
@@ -924,8 +944,16 @@ public class StreamingContainer extends YarnContainerMain
OperatorContext ctx = new OperatorContext(ndi.id, ndi.name, ndi.contextAttributes, parentContext);
ctx.attributes.put(OperatorContext.ACTIVATION_WINDOW_ID, ndi.checkpoint.windowId);
- logger.debug("Restoring operator {} to checkpoint {} stateless={}.", ndi.id, Codec.getStringWindowId(ndi.checkpoint.windowId), ctx.stateless);
- Node<?> node = Node.retrieveNode(backupAgent.load(ndi.id, ctx.stateless ? Stateless.WINDOW_ID : ndi.checkpoint.windowId), ctx, ndi.type);
+ Node<?> node = reuseOpNodes.get(ndi.id);
+ if (node == null) {
+ logger.info("Restoring operator {} to checkpoint {} stateless={}.", ndi.id, Codec.getStringWindowId(ndi.checkpoint.windowId), ctx.stateless);
+ node = Node.retrieveNode(backupAgent.load(ndi.id, ctx.stateless ? Stateless.WINDOW_ID : ndi.checkpoint.windowId), ctx, ndi.type);
+ } else {
+ logger.info("Reusing previous operator instance {}", ndi.id);
+ node = Node.retrieveNode(node.operator, ctx, ndi.type);
+ node.setReuseOperator(true);
+ reuseOpNodes.remove(ndi.id);
+ }
node.currentWindowId = ndi.checkpoint.windowId;
node.applicationWindowCount = ndi.checkpoint.applicationWindowCount;
node.firstWindowMillis = firstWindowMillis;
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 18a9a63..74510a6 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -1166,6 +1166,12 @@ public class LogicalPlan implements Serializable, DAG
}
@Override
+ public OperatorAnnotation getOperatorAnnotation()
+ {
+ return operatorAnnotation;
+ }
+
+ @Override
public InputPortMeta getMeta(Operator.InputPort<?> port)
{
return getPortMapping().inPortMap.get(port);