You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/10/20 12:18:17 UTC

apex-core git commit: APEXCORE-532: Fix issue where new operators added to dag starts from initial checkpoint

Repository: apex-core
Updated Branches:
  refs/heads/master 81b8c922c -> eaf041931


APEXCORE-532: Fix issue where new operators added to dag starts from initial checkpoint


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/eaf04193
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/eaf04193
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/eaf04193

Branch: refs/heads/master
Commit: eaf04193168ecc43f53958d750ff8f69f6b2c75a
Parents: 81b8c92
Author: Tushar R. Gosavi <tu...@apache.org>
Authored: Mon Oct 3 12:37:59 2016 +0530
Committer: Tushar R. Gosavi <tu...@apache.org>
Committed: Thu Oct 20 12:34:46 2016 +0530

----------------------------------------------------------------------
 .../stram/plan/physical/PhysicalPlan.java       |  7 ++--
 .../stram/LogicalPlanModificationTest.java      | 34 ++++++++++++++++++++
 .../stram/plan/physical/PhysicalPlanTest.java   |  6 ++--
 3 files changed, 42 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/eaf04193/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 11df7ba..4181971 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -781,7 +781,7 @@ public class PhysicalPlan implements Serializable
     // create operator instance per partition
     Map<Integer, Partition<Operator>> operatorIdToPartition = Maps.newHashMapWithExpectedSize(partitions.size());
     for (Partition<Operator> partition : partitions) {
-      PTOperator p = addPTOperator(m, partition, Checkpoint.INITIAL_CHECKPOINT);
+      PTOperator p = addPTOperator(m, partition, null);
       operatorIdToPartition.put(p.getId(), partition);
     }
 
@@ -1268,10 +1268,11 @@ public class PhysicalPlan implements Serializable
       Checkpoint activationCheckpoint = Checkpoint.INITIAL_CHECKPOINT;
       for (PTInput input : oper.inputs) {
         PTOperator sourceOper = input.source.source;
+        Checkpoint checkpoint = sourceOper.recoveryCheckpoint;
         if (sourceOper.checkpoints.isEmpty()) {
-          getActivationCheckpoint(sourceOper);
+          checkpoint = getActivationCheckpoint(sourceOper);
         }
-        activationCheckpoint = Checkpoint.max(activationCheckpoint, sourceOper.recoveryCheckpoint);
+        activationCheckpoint = Checkpoint.max(activationCheckpoint, checkpoint);
       }
       return activationCheckpoint;
     }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/eaf04193/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
index 1704e55..ef44767 100644
--- a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java
@@ -52,6 +52,7 @@ import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
 import com.datatorrent.stram.plan.physical.PTContainer;
 import com.datatorrent.stram.plan.physical.PTOperator;
 import com.datatorrent.stram.plan.physical.PhysicalPlan;
+import com.datatorrent.stram.plan.physical.PhysicalPlanTest;
 import com.datatorrent.stram.plan.physical.PlanModifier;
 import com.datatorrent.stram.support.StramTestSupport;
 import com.datatorrent.stram.support.StramTestSupport.TestMeta;
@@ -408,4 +409,37 @@ public class LogicalPlanModificationTest
     testExecutionManager(new AsyncFSStorageAgent(testMeta.getPath(), null));
   }
 
+  @Test
+  public void testNewOperatorRecoveryWindowIds()
+  {
+    GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
+
+    TestPlanContext ctx = new TestPlanContext();
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
+    PhysicalPlan plan = new PhysicalPlan(dag, ctx);
+    ctx.deploy.clear();
+    ctx.undeploy.clear();
+
+    LogicalPlan.OperatorMeta o1Meta = dag.getMeta(o1);
+    List<PTOperator> o1Partitions = plan.getOperators(o1Meta);
+    PhysicalPlanTest.setActivationCheckpoint(o1Partitions.get(0), 10);
+
+    PlanModifier pm = new PlanModifier(plan);
+    GenericTestOperator o2 = new GenericTestOperator();
+    GenericTestOperator o3 = new GenericTestOperator();
+    pm.addOperator("o2", o2);
+    pm.addOperator("o3", o3);
+    pm.addStream("s1", o1.outport1, o2.inport2);
+    pm.addStream("s2", o2.outport1, o3.inport1);
+
+    pm.applyChanges(ctx);
+
+    LogicalPlan.OperatorMeta o2Meta = plan.getLogicalPlan().getMeta(o2);
+    List<PTOperator> o2Partitions = plan.getOperators(o2Meta);
+    Assert.assertEquals("o2 activation checkpoint " + o2Meta, 10, o2Partitions.get(0).getRecoveryCheckpoint().windowId);
+
+    LogicalPlan.OperatorMeta o3Meta = plan.getLogicalPlan().getMeta(o3);
+    List<PTOperator> o3Partitions = plan.getOperators(o3Meta);
+    Assert.assertEquals("o3 activation checkpoint " + o2Meta, 10, o3Partitions.get(0).getRecoveryCheckpoint().windowId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/eaf04193/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
index e426194..61a85a5 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
@@ -710,11 +710,13 @@ public class PhysicalPlanTest
     Assert.assertEquals("unifier activation checkpoint " + o1Meta, 3, o1NewUnifiers.get(0).recoveryCheckpoint.windowId);
   }
 
-  private void setActivationCheckpoint(PTOperator oper, long windowId)
+  public static void setActivationCheckpoint(PTOperator oper, long windowId)
   {
     try {
       oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT).save(oper.operatorMeta.getOperator(), oper.id, windowId);
-      oper.setRecoveryCheckpoint(new Checkpoint(3, 0, 0));
+      Checkpoint cp = new Checkpoint(windowId, 0, 0);
+      oper.setRecoveryCheckpoint(cp);
+      oper.checkpoints.add(cp);
     } catch (Exception e) {
       Assert.fail(e.toString());
     }