You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/11/24 01:19:23 UTC

[1/2] incubator-apex-core git commit: APEX-274 #resolve Fixing a null pointer exception when removing unifier PTOperators from the physical plan. Updated the unit test to cover this scenario.

Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 9656248cf -> c30dd952c


APEX-274 #resolve Fixing a null pointer exception when removing unifier PTOperators from the physical plan. Updated the unit test to cover this scenario.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d29ce95b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d29ce95b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d29ce95b

Branch: refs/heads/devel-3
Commit: d29ce95bdffa395f69901af697b8757b5b8a8915
Parents: ca542e3
Author: ishark <is...@datatorrent.com>
Authored: Sun Nov 22 22:16:22 2015 -0800
Committer: ishark <is...@datatorrent.com>
Committed: Sun Nov 22 22:16:22 2015 -0800

----------------------------------------------------------------------
 .../stram/StreamingContainerManager.java        |  2 +-
 .../stram/plan/physical/PhysicalPlan.java       | 13 +++++---
 .../stram/StreamingContainerManagerTest.java    | 34 +++++++++++++++++++-
 3 files changed, 43 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d29ce95b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index ca724db..29c6a2c 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -2396,7 +2396,7 @@ public class StreamingContainerManager implements PlanContext
         }
       }
     }
-    if (physicalOperators.size() > 0) {
+    if (physicalOperators.size() > 0 && checkpointTimeAggregate.getAvg() != null) {
       loi.checkpointTimeMA = checkpointTimeAggregate.getAvg().longValue();
       loi.counters = latestLogicalCounters.get(operator.getName());
       loi.autoMetrics = latestLogicalMetrics.get(operator.getName());

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d29ce95b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index f560a35..7858ea0 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -1117,10 +1117,15 @@ public class PhysicalPlan implements Serializable
       }
     }
     PMapping currentMapping = this.logicalToPTOperator.get(p.operatorMeta);
-    List<PTOperator> copyPartitions = Lists.newArrayList(currentMapping.partitions);
-    copyPartitions.remove(p);
-    removePartition(p, currentMapping);
-    currentMapping.partitions = copyPartitions;
+    if (currentMapping != null) {
+      List<PTOperator> copyPartitions = Lists.newArrayList(currentMapping.partitions);
+      copyPartitions.remove(p);
+      removePartition(p, currentMapping);
+      currentMapping.partitions = copyPartitions;
+    } else {
+      // remove the operator
+      removePTOperator(p);
+    }
     // remove orphaned downstream operators
     for (PTOperator dop : downstreamOpers) {
       if (dop.inputs.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d29ce95b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 7d5c147..f1d6ec4 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -696,8 +696,12 @@ public class StreamingContainerManagerTest
 
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+    GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
+
     dag.addStream("stream1", o1.outport1, o2.inport1);
+    dag.addStream("stream2", o2.outport1, o3.inport1);
 
+    dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
     StreamingContainerManager scm = new StreamingContainerManager(dag);
 
     PhysicalPlan physicalPlan = scm.getPhysicalPlan();
@@ -706,6 +710,7 @@ public class StreamingContainerManagerTest
       MockContainer mc = new MockContainer(scm, c);
       mockContainers.put(c, mc);
     }
+
     // deploy all containers
     for (Map.Entry<PTContainer, MockContainer> ce : mockContainers.entrySet()) {
       ce.getValue().deploy();
@@ -727,6 +732,27 @@ public class StreamingContainerManagerTest
     o2p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
     mc2.sendHeartbeat();
 
+    Assert.assertEquals("2 partitions", 2, physicalPlan.getOperators(dag.getMeta(o2)).size());
+
+    PTOperator o2p2 = physicalPlan.getOperators(dag.getMeta(o2)).get(1);
+    MockContainer mc3 = mockContainers.get(o2p2.getContainer());
+    MockOperatorStats o2p2mos = mc3.stats(o2p2.getId());
+    o2p2mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
+    mc3.sendHeartbeat();
+
+    PTOperator o3p1 = physicalPlan.getOperators(dag.getMeta(o3)).get(0);
+    MockContainer mc4 = mockContainers.get(o3p1.getContainer());
+    MockOperatorStats o3p1mos = mc4.stats(o3p1.getId());
+    o3p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
+    mc4.sendHeartbeat();
+
+    // unifier
+    PTOperator unifier = physicalPlan.getMergeOperators(dag.getMeta(o2)).get(0);
+    MockContainer mc5 = mockContainers.get(unifier.getContainer());
+    MockOperatorStats unifierp1mos = mc5.stats(unifier.getId());
+    unifierp1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
+    mc5.sendHeartbeat();
+
     o1p1mos.currentWindowId(2).deployState(DeployState.SHUTDOWN);
     mc1.sendHeartbeat();
     scm.monitorHeartbeat();
@@ -734,7 +760,7 @@ public class StreamingContainerManagerTest
     scm.monitorHeartbeat(); // committedWindowId updated in next cycle
     Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
     scm.processEvents();
-    Assert.assertEquals("containers at committedWindowId=1", 2, physicalPlan.getContainers().size());
+    Assert.assertEquals("containers at committedWindowId=1", 5, physicalPlan.getContainers().size());
 
     // checkpoint window 2
     o1p1mos.checkpointWindowId(2);
@@ -744,7 +770,13 @@ public class StreamingContainerManagerTest
     Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
 
     o2p1mos.currentWindowId(2).checkpointWindowId(2);
+    o2p2mos.currentWindowId(2).checkpointWindowId(2);
+    o3p1mos.currentWindowId(2).checkpointWindowId(2);
+    unifierp1mos.currentWindowId(2).checkpointWindowId(2);
     mc2.sendHeartbeat();
+    mc3.sendHeartbeat();
+    mc4.sendHeartbeat();
+    mc5.sendHeartbeat();
     scm.monitorHeartbeat();
 
     // Operators are shutdown when both operators reach window Id 2


[2/2] incubator-apex-core git commit: Merge branch 'APEX-274' of https://github.com/ishark/incubator-apex-core into devel-3

Posted by th...@apache.org.
Merge branch 'APEX-274' of https://github.com/ishark/incubator-apex-core into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/c30dd952
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/c30dd952
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/c30dd952

Branch: refs/heads/devel-3
Commit: c30dd952c5d99102c6626bcc1aec96febdc4aeb5
Parents: 9656248 d29ce95
Author: Thomas Weise <th...@datatorrent.com>
Authored: Mon Nov 23 16:00:34 2015 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Mon Nov 23 16:00:34 2015 -0800

----------------------------------------------------------------------
 .../stram/StreamingContainerManager.java        |  2 +-
 .../stram/plan/physical/PhysicalPlan.java       | 13 +++++---
 .../stram/StreamingContainerManagerTest.java    | 34 +++++++++++++++++++-
 3 files changed, 43 insertions(+), 6 deletions(-)
----------------------------------------------------------------------