You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/23 04:53:04 UTC
[3/3] incubator-apex-core git commit: APEXCORE-306 Update checkpoints
for strongly connected operators as group.
APEXCORE-306 Update checkpoints for strongly connected operators as group.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b3402be5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b3402be5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b3402be5
Branch: refs/heads/devel-3
Commit: b3402be5a45728515f4a8328fec5a76ddede0350
Parents: 4d5828c
Author: Thomas Weise <th...@datatorrent.com>
Authored: Thu Jan 21 16:39:55 2016 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Jan 22 19:24:31 2016 -0800
----------------------------------------------------------------------
.../stram/StreamingContainerManager.java | 161 +++++++++++++------
.../com/datatorrent/stram/api/Checkpoint.java | 11 ++
.../stram/plan/logical/LogicalPlan.java | 91 +++++++----
.../com/datatorrent/stram/CheckpointTest.java | 3 +-
.../stram/plan/logical/DelayOperatorTest.java | 88 +++++++++-
.../stram/plan/logical/LogicalPlanTest.java | 131 ++++++++++-----
6 files changed, 358 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 6233697..a687a37 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -35,6 +35,7 @@ import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@@ -159,6 +160,7 @@ public class StreamingContainerManager implements PlanContext
private long lastResourceRequest = 0;
private final Map<String, StreamingContainerAgent> containers = new ConcurrentHashMap<String, StreamingContainerAgent>();
private final List<Pair<PTOperator, Long>> purgeCheckpoints = new ArrayList<Pair<PTOperator, Long>>();
+ private Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups;
private final Map<Long, Set<PTOperator>> shutdownOperators = new HashMap<>();
private CriticalPathInfo criticalPathInfo;
private final ConcurrentMap<PTOperator, PTOperator> reportStats = Maps.newConcurrentMap();
@@ -812,6 +814,7 @@ public class StreamingContainerManager implements PlanContext
Collection<OperatorMeta> logicalOperators = getLogicalPlan().getAllOperators();
//for backward compatibility
for (OperatorMeta operatorMeta : logicalOperators) {
+ @SuppressWarnings("deprecation")
Context.CountersAggregator aggregator = operatorMeta.getValue(OperatorContext.COUNTERS_AGGREGATOR);
if (aggregator == null) {
continue;
@@ -825,6 +828,7 @@ public class StreamingContainerManager implements PlanContext
}
}
if (counters.size() > 0) {
+ @SuppressWarnings("deprecation")
Object aggregate = aggregator.aggregate(counters);
latestLogicalCounters.put(operatorMeta.getName(), aggregate);
}
@@ -857,6 +861,8 @@ public class StreamingContainerManager implements PlanContext
if (windowMetrics == null) {
windowMetrics = new LinkedBlockingQueue<Pair<Long, Map<String, Object>>>(METRIC_QUEUE_SIZE)
{
+ private static final long serialVersionUID = 1L;
+
@Override
public boolean add(Pair<Long, Map<String, Object>> longMapPair)
{
@@ -1134,7 +1140,7 @@ public class StreamingContainerManager implements PlanContext
cs.container.setAllocatedVCores(0);
// resolve dependencies
- UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock);
+ UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, false, getCheckpointGroups());
for (PTOperator oper : cs.container.getOperators()) {
updateRecoveryCheckpoints(oper, ctx);
}
@@ -1881,31 +1887,18 @@ public class StreamingContainerManager implements PlanContext
public final Set<PTOperator> blocked = new LinkedHashSet<PTOperator>();
public final long currentTms;
public final boolean recovery;
+ public final Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups;
public UpdateCheckpointsContext(Clock clock)
{
- this.currentTms = clock.getTime();
- this.recovery = false;
+ this(clock, false, Collections.<OperatorMeta, Set<OperatorMeta>>emptyMap());
}
- public UpdateCheckpointsContext(Clock clock, boolean recovery)
+ public UpdateCheckpointsContext(Clock clock, boolean recovery, Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups)
{
this.currentTms = clock.getTime();
this.recovery = recovery;
- }
-
- }
-
- private void addVisited(PTOperator operator, UpdateCheckpointsContext ctx)
- {
- ctx.visited.add(operator);
- for (PTOperator.PTOutput out : operator.getOutputs()) {
- for (PTOperator.PTInput sink : out.sinks) {
- PTOperator sinkOperator = sink.target;
- if (!ctx.visited.contains(sinkOperator)) {
- addVisited(sinkOperator, ctx);
- }
- }
+ this.checkpointGroups = checkpointGroups;
}
}
@@ -1933,20 +1926,55 @@ public class StreamingContainerManager implements PlanContext
}
}
- long maxCheckpoint = operator.getRecentCheckpoint().windowId;
- if (ctx.recovery && maxCheckpoint == Stateless.WINDOW_ID && operator.isOperatorStateLess()) {
- long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS));
- maxCheckpoint = currentWindowId;
+ // the most recent checkpoint eligible for recovery based on downstream state
+ Checkpoint maxCheckpoint = Checkpoint.INITIAL_CHECKPOINT;
+
+ Set<OperatorMeta> checkpointGroup = ctx.checkpointGroups.get(operator.getOperatorMeta());
+ if (checkpointGroup == null) {
+ checkpointGroup = Collections.singleton(operator.getOperatorMeta());
+ }
+ // find intersection of checkpoints that group can collectively move to
+ TreeSet<Checkpoint> commonCheckpoints = new TreeSet<>(new Checkpoint.CheckpointComparator());
+ synchronized (operator.checkpoints) {
+ commonCheckpoints.addAll(operator.checkpoints);
+ }
+ Set<PTOperator> groupOpers = new HashSet<>(checkpointGroup.size());
+ if (checkpointGroup.size() > 1) {
+ for (OperatorMeta om : checkpointGroup) {
+ Collection<PTOperator> operators = plan.getAllOperators(om);
+ for (PTOperator groupOper : operators) {
+ synchronized (groupOper.checkpoints) {
+ commonCheckpoints.retainAll(groupOper.checkpoints);
+ }
+ // visit all downstream operators of the group
+ ctx.visited.add(groupOper);
+ groupOpers.add(groupOper);
+ }
+ }
+ // highest common checkpoint
+ if (!commonCheckpoints.isEmpty()) {
+ maxCheckpoint = commonCheckpoints.last();
+ }
+ } else {
+ // without logical grouping, treat partitions as independent
+ // this is especially important for parallel partitioning
+ ctx.visited.add(operator);
+ groupOpers.add(operator);
+ maxCheckpoint = operator.getRecentCheckpoint();
+ if (ctx.recovery && maxCheckpoint.windowId == Stateless.WINDOW_ID && operator.isOperatorStateLess()) {
+ long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS));
+ maxCheckpoint = new Checkpoint(currentWindowId, 0, 0);
+ }
}
- ctx.visited.add(operator);
// DFS downstream operators
- if (operator.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
- addVisited(operator, ctx);
- } else {
- for (PTOperator.PTOutput out : operator.getOutputs()) {
+ for (PTOperator groupOper : groupOpers) {
+ for (PTOperator.PTOutput out : groupOper.getOutputs()) {
for (PTOperator.PTInput sink : out.sinks) {
PTOperator sinkOperator = sink.target;
+ if (groupOpers.contains(sinkOperator)) {
+ continue; // downstream operator within group
+ }
if (!ctx.visited.contains(sinkOperator)) {
// downstream traversal
updateRecoveryCheckpoints(sinkOperator, ctx);
@@ -1954,7 +1982,7 @@ public class StreamingContainerManager implements PlanContext
// recovery window id cannot move backwards
// when dynamically adding new operators
if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) {
- maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId);
+ maxCheckpoint = Checkpoint.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint());
}
if (ctx.blocked.contains(sinkOperator)) {
@@ -1967,33 +1995,43 @@ public class StreamingContainerManager implements PlanContext
}
}
- // checkpoint frozen during deployment
- if (ctx.recovery || operator.getState() != PTOperator.State.PENDING_DEPLOY) {
- // remove previous checkpoints
- Checkpoint c1 = Checkpoint.INITIAL_CHECKPOINT;
- synchronized (operator.checkpoints) {
- if (!operator.checkpoints.isEmpty() && (operator.checkpoints.getFirst()).windowId <= maxCheckpoint) {
- c1 = operator.checkpoints.getFirst();
- Checkpoint c2;
- while (operator.checkpoints.size() > 1 && ((c2 = operator.checkpoints.get(1)).windowId) <= maxCheckpoint) {
- operator.checkpoints.removeFirst();
- //LOG.debug("Checkpoint to delete: operator={} windowId={}", operator.getName(), c1);
- this.purgeCheckpoints.add(new Pair<PTOperator, Long>(operator, c1.windowId));
- c1 = c2;
+ // find the common checkpoint that is <= downstream recovery checkpoint
+ if (!commonCheckpoints.contains(maxCheckpoint)) {
+ if (!commonCheckpoints.isEmpty()) {
+ maxCheckpoint = Objects.firstNonNull(commonCheckpoints.floor(maxCheckpoint), maxCheckpoint);
+ }
+ }
+
+ for (PTOperator groupOper : groupOpers) {
+ // checkpoint frozen during deployment
+ if (ctx.recovery || groupOper.getState() != PTOperator.State.PENDING_DEPLOY) {
+ // remove previous checkpoints
+ Checkpoint c1 = Checkpoint.INITIAL_CHECKPOINT;
+ LinkedList<Checkpoint> checkpoints = groupOper.checkpoints;
+ synchronized (checkpoints) {
+ if (!checkpoints.isEmpty() && (checkpoints.getFirst()).windowId <= maxCheckpoint.windowId) {
+ c1 = checkpoints.getFirst();
+ Checkpoint c2;
+ while (checkpoints.size() > 1 && ((c2 = checkpoints.get(1)).windowId) <= maxCheckpoint.windowId) {
+ checkpoints.removeFirst();
+ //LOG.debug("Checkpoint to delete: operator={} windowId={}", operator.getName(), c1);
+ this.purgeCheckpoints.add(new Pair<PTOperator, Long>(groupOper, c1.windowId));
+ c1 = c2;
+ }
}
- }
- else {
- if (ctx.recovery && operator.checkpoints.isEmpty() && operator.isOperatorStateLess()) {
- LOG.debug("Adding checkpoint for stateless operator {} {}", operator, Codec.getStringWindowId(maxCheckpoint));
- c1 = operator.addCheckpoint(maxCheckpoint, this.vars.windowStartMillis);
+ else {
+ if (ctx.recovery && checkpoints.isEmpty() && groupOper.isOperatorStateLess()) {
+ LOG.debug("Adding checkpoint for stateless operator {} {}", groupOper, Codec.getStringWindowId(maxCheckpoint.windowId));
+ c1 = groupOper.addCheckpoint(maxCheckpoint.windowId, this.vars.windowStartMillis);
+ }
}
}
+ //LOG.debug("Operator {} checkpoints: commit {} recent {}", new Object[] {operator.getName(), c1, operator.checkpoints});
+ groupOper.setRecoveryCheckpoint(c1);
+ }
+ else {
+ LOG.debug("Skipping checkpoint update {} during {}", groupOper, groupOper.getState());
}
- //LOG.debug("Operator {} checkpoints: commit {} recent {}", new Object[] {operator.getName(), c1, operator.checkpoints});
- operator.setRecoveryCheckpoint(c1);
- }
- else {
- LOG.debug("Skipping checkpoint update {} during {}", operator, operator.getState());
}
}
@@ -2009,13 +2047,32 @@ public class StreamingContainerManager implements PlanContext
return this.vars.windowStartMillis;
}
+ /**
+ * Returns, for each logical operator that belongs to a strongly connected component of more
+ * than one operator, the set of all operators in that component. updateRecoveryCheckpoints
+ * moves the checkpoints of such a component together as one group. Operators that are not part
+ * of such a component have no entry in the returned map.
+ * <p>
+ * The map is computed on the first call by searching from the root operators and then cached.
+ * The field is only assigned once the map is complete, so a failure during the search leaves
+ * nothing cached and the next call retries.
+ * NOTE(review): the cache is never cleared here, so it would go stale if the logical plan can
+ * gain or lose loops after the first call - confirm. Lazy init is unsynchronized; confirm all
+ * callers run on the same thread.
+ */
+ private Map<OperatorMeta, Set<OperatorMeta>> getCheckpointGroups()
+ {
+ if (this.checkpointGroups == null) {
+ LogicalPlan dag = this.plan.getLogicalPlan();
+ // clear nindex/lowlink markers left behind by validation or an earlier search
+ dag.resetNIndex();
+ LogicalPlan.ValidationContext vc = new LogicalPlan.ValidationContext();
+ for (OperatorMeta om : dag.getRootOperators()) {
+ dag.findStronglyConnected(om, vc);
+ }
+ // build the complete map before publishing it to the field
+ Map<OperatorMeta, Set<OperatorMeta>> groups = new HashMap<>();
+ for (Set<OperatorMeta> checkpointGroup : vc.stronglyConnected) {
+ for (OperatorMeta om : checkpointGroup) {
+ groups.put(om, checkpointGroup);
+ }
+ }
+ this.checkpointGroups = groups;
+ }
+ return checkpointGroups;
+ }
+
/**
* Visit all operators to update current checkpoint based on updated downstream state.
* Purge older checkpoints that are no longer needed.
*/
private long updateCheckpoints(boolean recovery)
{
- UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, recovery);
+ UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, recovery, getCheckpointGroups());
for (OperatorMeta logicalOperator : plan.getLogicalPlan().getRootOperators()) {
//LOG.debug("Updating checkpoints for operator {}", logicalOperator.getName());
List<PTOperator> operators = plan.getOperators(logicalOperator);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java b/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java
index 5ec4a7e..d24b17a 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java
@@ -18,6 +18,8 @@
*/
package com.datatorrent.stram.api;
+import java.util.Comparator;
+
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.bufferserver.util.Codec;
@@ -102,6 +104,15 @@ public class Checkpoint implements com.datatorrent.api.Stats.Checkpoint
return windowId;
}
+ /**
+ * Orders checkpoints by ascending {@code windowId} only; the other fields are not compared.
+ * Two checkpoints with the same window id therefore compare as equal, so a {@code TreeSet}
+ * built with this comparator keeps at most one checkpoint per window id.
+ * NOTE(review): this may be inconsistent with {@code Checkpoint.equals} if equals also looks at
+ * the other fields - confirm before mixing comparator-based and equals-based lookups.
+ */
+ public static class CheckpointComparator implements Comparator<Checkpoint>
+ {
+ @Override
+ public int compare(Checkpoint o1, Checkpoint o2)
+ {
+ return Long.compare(o1.windowId, o2.windowId);
+ }
+ }
+
@SuppressWarnings("FieldNameHidesFieldInSuperclass")
public static final Checkpoint INITIAL_CHECKPOINT = new Checkpoint(Stateless.WINDOW_ID, 0, 0);
private static final long serialVersionUID = 201402152116L;
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 883ad71..6d7ebe1 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -159,8 +159,6 @@ public class LogicalPlan implements Serializable, DAG
public final Map<String, ModuleMeta> modules = new LinkedHashMap<>();
private final List<OperatorMeta> rootOperators = new ArrayList<OperatorMeta>();
private final Attribute.AttributeMap attributes = new DefaultAttributeMap();
- private transient int nodeIndex = 0; // used for cycle validation
- private transient Stack<OperatorMeta> stack = new Stack<OperatorMeta>(); // used for cycle validation
private transient Map<String, ArrayListMultimap<OutputPort<?>, InputPort<?>>> streamLinks = new HashMap<>();
@Override
@@ -1540,6 +1538,7 @@ public class LogicalPlan implements Serializable, DAG
return this.operators.get(operatorName);
}
+ @Override
public ModuleMeta getModuleMeta(String moduleName)
{
return this.modules.get(moduleName);
@@ -1557,6 +1556,7 @@ public class LogicalPlan implements Serializable, DAG
throw new IllegalArgumentException("Operator not associated with the DAG: " + operator);
}
+ @Override
public ModuleMeta getMeta(Module module)
{
for (ModuleMeta m : getAllModules()) {
@@ -1626,6 +1626,24 @@ public class LogicalPlan implements Serializable, DAG
return classNames;
}
+ /**
+ * Mutable state for one run of {@code LogicalPlan.findStronglyConnected} (a Tarjan-style
+ * strongly connected component search), together with the results it collects.
+ * NOTE(review): invalidLoopAt is a single field. A non-delay back edge found in a nested
+ * component can overwrite it before an enclosing component is completed, which can hide an
+ * invalid cycle in the enclosing component - consider tracking a set instead.
+ */
+ public static class ValidationContext
+ {
+ // next DFS discovery index to assign to an operator's nindex/lowlink
+ public int nodeIndex = 0;
+ // visited operators not yet assigned to a completed component
+ public Stack<OperatorMeta> stack = new Stack<OperatorMeta>();
+ // operators on the current DFS path (pushed on entry, popped on exit)
+ public Stack<OperatorMeta> path = new Stack<OperatorMeta>();
+ // completed components that contain more than one operator
+ public List<Set<OperatorMeta>> stronglyConnected = new ArrayList<>();
+ // target of the most recent back edge whose loop path contains no DelayOperator
+ public OperatorMeta invalidLoopAt;
+ // self-loops, plus completed components that contain invalidLoopAt
+ public List<Set<OperatorMeta>> invalidCycles = new ArrayList<>();
+ }
+
+ /**
+ * Clears the {@code nindex} and {@code lowlink} markers of every operator in the plan, so that
+ * {@code findStronglyConnected} can be run again from scratch. Operators with a null
+ * {@code nindex} are treated as not yet visited by that search.
+ */
+ public void resetNIndex()
+ {
+ for (OperatorMeta om : getAllOperators()) {
+ om.lowlink = null;
+ om.nindex = null;
+ }
+ }
+
/**
* Validate the plan. Includes checks that required ports are connected,
* required configuration parameters specified, graph free of cycles etc.
@@ -1752,21 +1770,20 @@ public class LogicalPlan implements Serializable, DAG
throw new ValidationException("At least one output port must be connected: " + n.name);
}
}
- stack = new Stack<OperatorMeta>();
- List<List<String>> cycles = new ArrayList<List<String>>();
+ ValidationContext validatonContext = new ValidationContext();
for (OperatorMeta n: operators.values()) {
if (n.nindex == null) {
- findStronglyConnected(n, cycles);
+ findStronglyConnected(n, validatonContext);
}
}
- if (!cycles.isEmpty()) {
- throw new ValidationException("Loops in graph: " + cycles);
+ if (!validatonContext.invalidCycles.isEmpty()) {
+ throw new ValidationException("Loops in graph: " + validatonContext.invalidCycles);
}
List<List<String>> invalidDelays = new ArrayList<>();
for (OperatorMeta n : rootOperators) {
- findInvalidDelays(n, invalidDelays);
+ findInvalidDelays(n, invalidDelays, new Stack<OperatorMeta>());
}
if (!invalidDelays.isEmpty()) {
throw new ValidationException("Invalid delays in graph: " + invalidDelays);
@@ -1908,59 +1925,72 @@ public class LogicalPlan implements Serializable, DAG
* @param om
* @param cycles
*/
- public void findStronglyConnected(OperatorMeta om, List<List<String>> cycles)
+ public void findStronglyConnected(OperatorMeta om, ValidationContext ctx)
{
- om.nindex = nodeIndex;
- om.lowlink = nodeIndex;
- nodeIndex++;
- stack.push(om);
+ om.nindex = ctx.nodeIndex;
+ om.lowlink = ctx.nodeIndex;
+ ctx.nodeIndex++;
+ ctx.stack.push(om);
+ ctx.path.push(om);
// depth first successors traversal
for (StreamMeta downStream: om.outputStreams.values()) {
for (InputPortMeta sink: downStream.sinks) {
- if (om.getOperator() instanceof Operator.DelayOperator) {
- // this is an iteration loop, do not treat it as downstream when detecting cycles
- sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true);
- continue;
- }
OperatorMeta successor = sink.getOperatorWrapper();
if (successor == null) {
continue;
}
// check for self referencing node
if (om == successor) {
- cycles.add(Collections.singletonList(om.name));
+ ctx.invalidCycles.add(Collections.singleton(om));
}
if (successor.nindex == null) {
// not visited yet
- findStronglyConnected(successor, cycles);
+ findStronglyConnected(successor, ctx);
om.lowlink = Math.min(om.lowlink, successor.lowlink);
}
- else if (stack.contains(successor)) {
+ else if (ctx.stack.contains(successor)) {
om.lowlink = Math.min(om.lowlink, successor.nindex);
+ boolean isDelayLoop = false;
+ for (int i=ctx.path.size(); i>0; i--) {
+ OperatorMeta om2 = ctx.path.get(i-1);
+ if (om2.getOperator() instanceof Operator.DelayOperator) {
+ isDelayLoop = true;
+ }
+ if (om2 == successor) {
+ break;
+ }
+ }
+ if (!isDelayLoop) {
+ ctx.invalidLoopAt = successor;
+ }
}
}
}
// pop stack for all root operators
if (om.lowlink.equals(om.nindex)) {
- List<String> connectedIds = new ArrayList<String>();
- while (!stack.isEmpty()) {
- OperatorMeta n2 = stack.pop();
- connectedIds.add(n2.name);
+ Set<OperatorMeta> connectedSet = new LinkedHashSet<>(ctx.stack.size());
+ while (!ctx.stack.isEmpty()) {
+ OperatorMeta n2 = ctx.stack.pop();
+ connectedSet.add(n2);
if (n2 == om) {
break; // collected all connected operators
}
}
// strongly connected (cycle) if more than one node in stack
- if (connectedIds.size() > 1) {
- LOG.debug("detected cycle from node {}: {}", om.name, connectedIds);
- cycles.add(connectedIds);
+ if (connectedSet.size() > 1) {
+ ctx.stronglyConnected.add(connectedSet);
+ if (connectedSet.contains(ctx.invalidLoopAt)) {
+ ctx.invalidCycles.add(connectedSet);
+ }
}
}
+ ctx.path.pop();
+
}
- public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays)
+ public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays, Stack<OperatorMeta> stack)
{
stack.push(om);
@@ -1977,6 +2007,7 @@ public class LogicalPlan implements Serializable, DAG
for (InputPortMeta sink : downStream.sinks) {
OperatorMeta successor = sink.getOperatorWrapper();
if (isDelayOperator) {
+ sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true);
// Check whether all downstream operators are already visited in the path
if (successor != null && !stack.contains(successor)) {
LOG.debug("detected DelayOperator does not immediately output to a visited operator {}.{}->{}.{}",
@@ -1984,7 +2015,7 @@ public class LogicalPlan implements Serializable, DAG
invalidDelays.add(Arrays.asList(om.getName(), successor.getName()));
}
} else {
- findInvalidDelays(successor, invalidDelays);
+ findInvalidDelays(successor, invalidDelays, stack);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index ee3cbc3..5675b53 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -56,6 +56,7 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHea
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
@@ -314,7 +315,7 @@ public class CheckpointTest
o4p1.checkpoints.add(leafCheckpoint);
UpdateCheckpointsContext ctx;
- dnm.updateRecoveryCheckpoints(o1p1, ctx = new UpdateCheckpointsContext(clock, true));
+ dnm.updateRecoveryCheckpoints(o1p1, ctx = new UpdateCheckpointsContext(clock, true, Collections.<OperatorMeta, Set<OperatorMeta>>emptyMap()));
Assert.assertEquals("initial checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());
Assert.assertEquals("initial checkpoint " + o2SLp1, leafCheckpoint, o2SLp1.getRecoveryCheckpoint());
Assert.assertEquals("initial checkpoint " + o3SLp1, new Checkpoint(clock.getTime(), 0, 0), o3SLp1.getRecoveryCheckpoint());
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
index 359da17..06f184f 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
@@ -20,7 +20,11 @@ package com.datatorrent.stram.plan.logical;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
@@ -32,8 +36,14 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import com.google.common.collect.Sets;
+
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
@@ -42,8 +52,17 @@ import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.DefaultDelayOperator;
import com.datatorrent.stram.StramLocalCluster;
+import com.datatorrent.stram.StreamingContainerManager;
+import com.datatorrent.stram.StreamingContainerManager.UpdateCheckpointsContext;
+import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
+import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
+import com.datatorrent.stram.plan.physical.PTOperator;
+import com.datatorrent.stram.plan.physical.PhysicalPlan;
+import com.datatorrent.stram.support.StramTestSupport;
+import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
+import com.datatorrent.stram.support.StramTestSupport.TestMeta;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -75,7 +94,7 @@ public class DelayOperatorTest
GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
- DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+ DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
dag.addStream("BtoC", opB.outport1, opC.inport1);
dag.addStream("CtoD", opC.outport1, opD.inport1);
@@ -83,7 +102,7 @@ public class DelayOperatorTest
dag.addStream("DelayToD", opDelay.output, opD.inport2);
List<List<String>> invalidDelays = new ArrayList<>();
- dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
+ dag.findInvalidDelays(dag.getMeta(opB), invalidDelays, new Stack<OperatorMeta>());
assertEquals("operator invalid delay", 1, invalidDelays.size());
try {
@@ -106,7 +125,7 @@ public class DelayOperatorTest
dag.addStream("DelayToC", opDelay.output, opC.inport2);
invalidDelays = new ArrayList<>();
- dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
+ dag.findInvalidDelays(dag.getMeta(opB), invalidDelays, new Stack<OperatorMeta>());
assertEquals("operator invalid delay", 1, invalidDelays.size());
try {
@@ -373,5 +392,68 @@ public class DelayOperatorTest
Arrays.copyOfRange(new TreeSet<>(FibonacciOperator.results).toArray(), 0, 20));
}
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+ /**
+ * B, C and opDelay form a loop (B -> C -> opDelay -> B). When grouped, they must all move
+ * their recovery checkpoint to the highest checkpoint they have in common (3), even though C
+ * and opDelay individually have later checkpoints. D is downstream of the loop and has no
+ * downstream operators of its own, so it moves to its latest checkpoint (5).
+ */
+ @Test
+ public void testCheckpointUpdate()
+ {
+ LogicalPlan dag = StramTestSupport.createDAG(testMeta);
+
+ TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
+ GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
+ GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
+ GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
+ DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", new DefaultDelayOperator<>());
+
+ dag.addStream("AtoB", opA.outport, opB.inport1);
+ dag.addStream("BtoC", opB.outport1, opC.inport1);
+ dag.addStream("CtoD", opC.outport1, opD.inport1);
+ dag.addStream("CtoDelay", opC.outport2, opDelay.input);
+ dag.addStream("DelayToB", opDelay.output, opB.inport2);
+ dag.validate();
+
+ dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
+ StreamingContainerManager scm = new StreamingContainerManager(dag);
+ PhysicalPlan plan = scm.getPhysicalPlan();
+ // set all operators as active to enable recovery window id update
+ for (PTOperator oper : plan.getAllOperators().values()) {
+ oper.setState(PTOperator.State.ACTIVE);
+ }
+
+ Clock clock = new SystemClock();
+
+ PTOperator opA1 = plan.getOperators(dag.getMeta(opA)).get(0);
+ PTOperator opB1 = plan.getOperators(dag.getMeta(opB)).get(0);
+ PTOperator opC1 = plan.getOperators(dag.getMeta(opC)).get(0);
+ PTOperator opDelay1 = plan.getOperators(dag.getMeta(opDelay)).get(0);
+ PTOperator opD1 = plan.getOperators(dag.getMeta(opD)).get(0);
+
+ Checkpoint cp3 = new Checkpoint(3L, 0, 0);
+ Checkpoint cp4 = new Checkpoint(4L, 0, 0);
+ Checkpoint cp5 = new Checkpoint(5L, 0, 0);
+
+ // 3 is the only checkpoint common to B, C and opDelay
+ opB1.checkpoints.add(cp3);
+ opC1.checkpoints.add(cp3);
+ opC1.checkpoints.add(cp4);
+ opDelay1.checkpoints.add(cp3);
+ opDelay1.checkpoints.add(cp5);
+ opD1.checkpoints.add(cp5);
+ // construct grouping that would be supplied through LogicalPlan
+ Set<OperatorMeta> stronglyConnected = Sets.newHashSet(dag.getMeta(opB), dag.getMeta(opC), dag.getMeta(opDelay));
+ Map<OperatorMeta, Set<OperatorMeta>> groups = new HashMap<>();
+ for (OperatorMeta om : stronglyConnected) {
+ groups.put(om, stronglyConnected);
+ }
+
+ UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, false, groups);
+ scm.updateRecoveryCheckpoints(opB1, ctx);
+
+ Assert.assertEquals("checkpoint " + opA1, Checkpoint.INITIAL_CHECKPOINT, opA1.getRecoveryCheckpoint());
+ // every member of the group must land on the common checkpoint
+ Assert.assertEquals("checkpoint " + opB1, cp3, opB1.getRecoveryCheckpoint());
+ Assert.assertEquals("checkpoint " + opC1, cp3, opC1.getRecoveryCheckpoint());
+ Assert.assertEquals("checkpoint " + opDelay1, cp3, opDelay1.getRecoveryCheckpoint());
+ Assert.assertEquals("checkpoint " + opD1, cp5, opD1.getRecoveryCheckpoint());
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
index a4ac488..9383f12 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
@@ -19,6 +19,7 @@
package com.datatorrent.stram.plan.logical;
import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.DefaultDelayOperator;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -35,6 +36,7 @@ import javax.validation.constraints.Pattern;
import com.esotericsoftware.kryo.DefaultSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
@@ -43,7 +45,6 @@ import static org.junit.Assert.*;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.api.*;
-import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG.Locality;
@@ -61,10 +62,12 @@ import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
import com.datatorrent.stram.support.StramTestSupport.RegexMatcher;
-public class LogicalPlanTest {
+public class LogicalPlanTest
+{
@Test
- public void testCycleDetection() {
+ public void testCycleDetection()
+ {
LogicalPlan dag = new LogicalPlan();
//NodeConf operator1 = b.getOrAddNode("operator1");
@@ -91,20 +94,20 @@ public class LogicalPlanTest {
// expected, stream can have single input/output only
}
- List<List<String>> cycles = new ArrayList<List<String>>();
- dag.findStronglyConnected(dag.getMeta(operator7), cycles);
- assertEquals("operator self reference", 1, cycles.size());
- assertEquals("operator self reference", 1, cycles.get(0).size());
- assertEquals("operator self reference", dag.getMeta(operator7).getName(), cycles.get(0).get(0));
+ LogicalPlan.ValidationContext vc = new LogicalPlan.ValidationContext();
+ dag.findStronglyConnected(dag.getMeta(operator7), vc);
+ assertEquals("operator self reference", 1, vc.invalidCycles.size());
+ assertEquals("operator self reference", 1, vc.invalidCycles.get(0).size());
+ assertEquals("operator self reference", dag.getMeta(operator7), vc.invalidCycles.get(0).iterator().next());
// 3 operator cycle
- cycles.clear();
- dag.findStronglyConnected(dag.getMeta(operator4), cycles);
- assertEquals("3 operator cycle", 1, cycles.size());
- assertEquals("3 operator cycle", 3, cycles.get(0).size());
- assertTrue("operator2", cycles.get(0).contains(dag.getMeta(operator2).getName()));
- assertTrue("operator3", cycles.get(0).contains(dag.getMeta(operator3).getName()));
- assertTrue("operator4", cycles.get(0).contains(dag.getMeta(operator4).getName()));
+ vc = new LogicalPlan.ValidationContext();
+ dag.findStronglyConnected(dag.getMeta(operator4), vc);
+ assertEquals("3 operator cycle", 1, vc.invalidCycles.size());
+ assertEquals("3 operator cycle", 3, vc.invalidCycles.get(0).size());
+ assertTrue("operator2", vc.invalidCycles.get(0).contains(dag.getMeta(operator2)));
+ assertTrue("operator3", vc.invalidCycles.get(0).contains(dag.getMeta(operator3)));
+ assertTrue("operator4", vc.invalidCycles.get(0).contains(dag.getMeta(operator4)));
try {
dag.validate();
@@ -115,13 +118,44 @@ public class LogicalPlanTest {
}
- public static class ValidationOperator extends BaseOperator {
+ @Test
+ public void testCycleDetectionWithDelay()
+ {
+ LogicalPlan dag = new LogicalPlan();
+
+ TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
+ GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
+ GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
+ GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
+ DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", new DefaultDelayOperator<>());
+ DefaultDelayOperator<Object> opDelay2 = dag.addOperator("opDelay2", new DefaultDelayOperator<>());
+
+ dag.addStream("AtoB", opA.outport, opB.inport1);
+ dag.addStream("BtoC", opB.outport1, opC.inport1);
+ dag.addStream("CtoD", opC.outport1, opD.inport1);
+ dag.addStream("CtoDelay", opC.outport2, opDelay.input);
+ dag.addStream("DtoDelay", opD.outport1, opDelay2.input);
+ dag.addStream("DelayToB", opDelay.output, opB.inport2);
+ dag.addStream("Delay2ToC", opDelay2.output, opC.inport2);
+
+ LogicalPlan.ValidationContext vc = new LogicalPlan.ValidationContext();
+ dag.findStronglyConnected(dag.getMeta(opA), vc);
+
+ Assert.assertEquals("No invalid cycle", Collections.emptyList(), vc.invalidCycles);
+ Set<OperatorMeta> exp = Sets.newHashSet(dag.getMeta(opDelay2), dag.getMeta(opDelay), dag.getMeta(opC), dag.getMeta(opB), dag.getMeta(opD));
+ Assert.assertEquals("cycle", exp, vc.stronglyConnected.get(0));
+ }
+
+
+ public static class ValidationOperator extends BaseOperator
+ {
public final transient DefaultOutputPort<Object> goodOutputPort = new DefaultOutputPort<Object>();
public final transient DefaultOutputPort<Object> badOutputPort = new DefaultOutputPort<Object>();
}
- public static class CounterOperator extends BaseOperator {
+ public static class CounterOperator extends BaseOperator
+ {
final public transient InputPort<Object> countInputPort = new DefaultInputPort<Object>() {
@Override
final public void process(Object payload) {
@@ -130,8 +164,8 @@ public class LogicalPlanTest {
}
@Test
- public void testLogicalPlanSerialization() throws Exception {
-
+ public void testLogicalPlanSerialization() throws Exception
+ {
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
@@ -188,7 +222,8 @@ public class LogicalPlanTest {
Assert.assertEquals("", 2, dag.getAllOperators().size());
}
- public static class ValidationTestOperator extends BaseOperator implements InputOperator {
+ public static class ValidationTestOperator extends BaseOperator implements InputOperator
+ {
@NotNull
@Pattern(regexp=".*malhar.*", message="Value has to contain 'malhar'!")
private String stringField1;
@@ -271,8 +306,8 @@ public class LogicalPlanTest {
}
@Test
- public void testOperatorValidation() {
-
+ public void testOperatorValidation()
+ {
ValidationTestOperator bean = new ValidationTestOperator();
bean.stringField1 = "malhar1";
bean.intField1 = 1;
@@ -348,7 +383,8 @@ public class LogicalPlanTest {
}
@OperatorAnnotation(partitionable = false)
- public static class TestOperatorAnnotationOperator extends BaseOperator {
+ public static class TestOperatorAnnotationOperator extends BaseOperator
+ {
@InputPortFieldAnnotation( optional = true)
final public transient DefaultInputPort<Object> input1 = new DefaultInputPort<Object>() {
@@ -358,11 +394,13 @@ public class LogicalPlanTest {
};
}
- class NoInputPortOperator extends BaseOperator {
+ class NoInputPortOperator extends BaseOperator
+ {
}
@Test
- public void testValidationForNonInputRootOperator() {
+ public void testValidationForNonInputRootOperator()
+ {
LogicalPlan dag = new LogicalPlan();
NoInputPortOperator x = dag.addOperator("x", new NoInputPortOperator());
try {
@@ -374,8 +412,8 @@ public class LogicalPlanTest {
}
@OperatorAnnotation(partitionable = false)
- public static class TestOperatorAnnotationOperator2 extends BaseOperator implements Partitioner<TestOperatorAnnotationOperator2> {
-
+ public static class TestOperatorAnnotationOperator2 extends BaseOperator implements Partitioner<TestOperatorAnnotationOperator2>
+ {
@Override
public Collection<Partition<TestOperatorAnnotationOperator2>> definePartitions(Collection<Partition<TestOperatorAnnotationOperator2>> partitions, PartitioningContext context)
{
@@ -389,7 +427,8 @@ public class LogicalPlanTest {
}
@Test
- public void testOperatorAnnotation() {
+ public void testOperatorAnnotation()
+ {
LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
TestOperatorAnnotationOperator operator = dag.addOperator("operator1", TestOperatorAnnotationOperator.class);
@@ -430,8 +469,8 @@ public class LogicalPlanTest {
}
@Test
- public void testPortConnectionValidation() {
-
+ public void testPortConnectionValidation()
+ {
LogicalPlan dag = new LogicalPlan();
TestNonOptionalOutportInputOperator input = dag.addOperator("input1", TestNonOptionalOutportInputOperator.class);
@@ -459,7 +498,8 @@ public class LogicalPlanTest {
}
@Test
- public void testAtMostOnceProcessingModeValidation() {
+ public void testAtMostOnceProcessingModeValidation()
+ {
LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
@@ -489,8 +529,9 @@ public class LogicalPlanTest {
}
- @Test
- public void testExactlyOnceProcessingModeValidation() {
+ @Test
+ public void testExactlyOnceProcessingModeValidation()
+ {
LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
@@ -527,7 +568,8 @@ public class LogicalPlanTest {
}
@Test
- public void testLocalityValidation() {
+ public void testLocalityValidation()
+ {
LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
@@ -549,7 +591,8 @@ public class LogicalPlanTest {
dag.validate();
}
- private class TestAnnotationsOperator extends BaseOperator implements InputOperator {
+ private class TestAnnotationsOperator extends BaseOperator implements InputOperator
+ {
//final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
@OutputPortFieldAnnotation( optional=false)
@@ -562,7 +605,8 @@ public class LogicalPlanTest {
}
}
- private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator{
+ private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator
+ {
// multiple ports w/o annotation, one of them must be connected
final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
@@ -573,7 +617,8 @@ public class LogicalPlanTest {
}
}
- private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator{
+ private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator
+ {
// multiple ports w/o annotation, one of them must be connected
@OutputPortFieldAnnotation( optional=true)
final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
@@ -587,7 +632,8 @@ public class LogicalPlanTest {
}
@Test
- public void testOutputPortAnnotation() {
+ public void testOutputPortAnnotation()
+ {
LogicalPlan dag = new LogicalPlan();
TestAnnotationsOperator ta1 = dag.addOperator("testAnnotationsOperator", new TestAnnotationsOperator());
@@ -623,7 +669,8 @@ public class LogicalPlanTest {
* Operator that can be used with default Java serialization instead of Kryo
*/
@DefaultSerializer(JavaSerializer.class)
- public static class JdkSerializableOperator extends BaseOperator implements Serializable {
+ public static class JdkSerializableOperator extends BaseOperator implements Serializable
+ {
private static final long serialVersionUID = -4024202339520027097L;
public abstract class SerializableInputPort<T> implements InputPort<T>, Sink<T>, java.io.Serializable {
@@ -673,7 +720,8 @@ public class LogicalPlanTest {
}
@Test
- public void testJdkSerializableOperator() throws Exception {
+ public void testJdkSerializableOperator() throws Exception
+ {
LogicalPlan dag = new LogicalPlan();
dag.addOperator("o1", new JdkSerializableOperator());
@@ -785,7 +833,8 @@ public class LogicalPlanTest {
}
}
- public static class TestPortCodecOperator extends BaseOperator {
+ public static class TestPortCodecOperator extends BaseOperator
+ {
public transient final DefaultInputPort<Object> inport1 = new DefaultInputPort<Object>()
{
@Override