You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/10/20 12:18:17 UTC
apex-core git commit: APEXCORE-532: Fix issue where new operators
added to dag starts from initial checkpoint
Repository: apex-core
Updated Branches:
refs/heads/master 81b8c922c -> eaf041931
APEXCORE-532: Fix issue where new operators added to dag starts from initial checkpoint
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/eaf04193
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/eaf04193
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/eaf04193
Branch: refs/heads/master
Commit: eaf04193168ecc43f53958d750ff8f69f6b2c75a
Parents: 81b8c92
Author: Tushar R. Gosavi <tu...@apache.org>
Authored: Mon Oct 3 12:37:59 2016 +0530
Committer: Tushar R. Gosavi <tu...@apache.org>
Committed: Thu Oct 20 12:34:46 2016 +0530
----------------------------------------------------------------------
.../stram/plan/physical/PhysicalPlan.java | 7 ++--
.../stram/LogicalPlanModificationTest.java | 34 ++++++++++++++++++++
.../stram/plan/physical/PhysicalPlanTest.java | 6 ++--
3 files changed, 42 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/eaf04193/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 11df7ba..4181971 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -781,7 +781,7 @@ public class PhysicalPlan implements Serializable
// create operator instance per partition
Map<Integer, Partition<Operator>> operatorIdToPartition = Maps.newHashMapWithExpectedSize(partitions.size());
for (Partition<Operator> partition : partitions) {
- PTOperator p = addPTOperator(m, partition, Checkpoint.INITIAL_CHECKPOINT);
+ PTOperator p = addPTOperator(m, partition, null);
operatorIdToPartition.put(p.getId(), partition);
}
@@ -1268,10 +1268,11 @@ public class PhysicalPlan implements Serializable
Checkpoint activationCheckpoint = Checkpoint.INITIAL_CHECKPOINT;
for (PTInput input : oper.inputs) {
PTOperator sourceOper = input.source.source;
+ Checkpoint checkpoint = sourceOper.recoveryCheckpoint;
if (sourceOper.checkpoints.isEmpty()) {
- getActivationCheckpoint(sourceOper);
+ checkpoint = getActivationCheckpoint(sourceOper);
}
- activationCheckpoint = Checkpoint.max(activationCheckpoint, sourceOper.recoveryCheckpoint);
+ activationCheckpoint = Checkpoint.max(activationCheckpoint, checkpoint);
}
return activationCheckpoint;
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/eaf04193/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
index 1704e55..ef44767 100644
--- a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
@@ -52,6 +52,7 @@ import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
+import com.datatorrent.stram.plan.physical.PhysicalPlanTest;
import com.datatorrent.stram.plan.physical.PlanModifier;
import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.support.StramTestSupport.TestMeta;
@@ -408,4 +409,37 @@ public class LogicalPlanModificationTest
testExecutionManager(new AsyncFSStorageAgent(testMeta.getPath(), null));
}
+ @Test
+ public void testNewOperatorRecoveryWindowIds()
+ {
+ GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
+ // build the physical plan with only o1, then clear ctx.deploy / ctx.undeploy
+ TestPlanContext ctx = new TestPlanContext();
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
+ PhysicalPlan plan = new PhysicalPlan(dag, ctx);
+ ctx.deploy.clear();
+ ctx.undeploy.clear();
+ // checkpoint o1's first partition at window 10 before any downstream operator exists
+ LogicalPlan.OperatorMeta o1Meta = dag.getMeta(o1);
+ List<PTOperator> o1Partitions = plan.getOperators(o1Meta);
+ PhysicalPlanTest.setActivationCheckpoint(o1Partitions.get(0), 10);
+ // add o2 and o3 downstream of o1 (o1 -> o2 -> o3) through a plan modification
+ PlanModifier pm = new PlanModifier(plan);
+ GenericTestOperator o2 = new GenericTestOperator();
+ GenericTestOperator o3 = new GenericTestOperator();
+ pm.addOperator("o2", o2);
+ pm.addOperator("o3", o3);
+ pm.addStream("s1", o1.outport1, o2.inport2);
+ pm.addStream("s2", o2.outport1, o3.inport1);
+
+ pm.applyChanges(ctx);
+ // new operators must pick up window 10 from upstream instead of the initial checkpoint
+ LogicalPlan.OperatorMeta o2Meta = plan.getLogicalPlan().getMeta(o2);
+ List<PTOperator> o2Partitions = plan.getOperators(o2Meta);
+ Assert.assertEquals("o2 activation checkpoint " + o2Meta, 10, o2Partitions.get(0).getRecoveryCheckpoint().windowId);
+
+ LogicalPlan.OperatorMeta o3Meta = plan.getLogicalPlan().getMeta(o3);
+ List<PTOperator> o3Partitions = plan.getOperators(o3Meta);
+ Assert.assertEquals("o3 activation checkpoint " + o3Meta, 10, o3Partitions.get(0).getRecoveryCheckpoint().windowId);
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/eaf04193/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
index e426194..61a85a5 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
@@ -710,11 +710,13 @@ public class PhysicalPlanTest
Assert.assertEquals("unifier activation checkpoint " + o1Meta, 3, o1NewUnifiers.get(0).recoveryCheckpoint.windowId);
}
- private void setActivationCheckpoint(PTOperator oper, long windowId)
+ public static void setActivationCheckpoint(PTOperator oper, long windowId)
{
try {
oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT).save(oper.operatorMeta.getOperator(), oper.id, windowId);
- oper.setRecoveryCheckpoint(new Checkpoint(3, 0, 0));
+ Checkpoint cp = new Checkpoint(windowId, 0, 0);
+ oper.setRecoveryCheckpoint(cp);
+ oper.checkpoints.add(cp);
} catch (Exception e) {
Assert.fail(e.toString());
}