You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/11/24 01:19:23 UTC
[1/2] incubator-apex-core git commit: APEX-274 #resolve Fixing null
pointer exception in removing unifier PTOperators from Physical Plan. Updated
unit test to check this scenario
Repository: incubator-apex-core
Updated Branches:
refs/heads/devel-3 9656248cf -> c30dd952c
APEX-274 #resolve Fix the NullPointerException that occurs when removing unifier PTOperators from the physical plan. Update the unit test to cover this scenario.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d29ce95b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d29ce95b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d29ce95b
Branch: refs/heads/devel-3
Commit: d29ce95bdffa395f69901af697b8757b5b8a8915
Parents: ca542e3
Author: ishark <is...@datatorrent.com>
Authored: Sun Nov 22 22:16:22 2015 -0800
Committer: ishark <is...@datatorrent.com>
Committed: Sun Nov 22 22:16:22 2015 -0800
----------------------------------------------------------------------
.../stram/StreamingContainerManager.java | 2 +-
.../stram/plan/physical/PhysicalPlan.java | 13 +++++---
.../stram/StreamingContainerManagerTest.java | 34 +++++++++++++++++++-
3 files changed, 43 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d29ce95b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index ca724db..29c6a2c 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -2396,7 +2396,7 @@ public class StreamingContainerManager implements PlanContext
}
}
}
- if (physicalOperators.size() > 0) {
+ if (physicalOperators.size() > 0 && checkpointTimeAggregate.getAvg() != null) {
loi.checkpointTimeMA = checkpointTimeAggregate.getAvg().longValue();
loi.counters = latestLogicalCounters.get(operator.getName());
loi.autoMetrics = latestLogicalMetrics.get(operator.getName());
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d29ce95b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index f560a35..7858ea0 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -1117,10 +1117,15 @@ public class PhysicalPlan implements Serializable
}
}
PMapping currentMapping = this.logicalToPTOperator.get(p.operatorMeta);
- List<PTOperator> copyPartitions = Lists.newArrayList(currentMapping.partitions);
- copyPartitions.remove(p);
- removePartition(p, currentMapping);
- currentMapping.partitions = copyPartitions;
+ if (currentMapping != null) {
+ List<PTOperator> copyPartitions = Lists.newArrayList(currentMapping.partitions);
+ copyPartitions.remove(p);
+ removePartition(p, currentMapping);
+ currentMapping.partitions = copyPartitions;
+ } else {
+ // remove the operator
+ removePTOperator(p);
+ }
// remove orphaned downstream operators
for (PTOperator dop : downstreamOpers) {
if (dop.inputs.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d29ce95b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 7d5c147..f1d6ec4 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -696,8 +696,12 @@ public class StreamingContainerManagerTest
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+ GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
+
dag.addStream("stream1", o1.outport1, o2.inport1);
+ dag.addStream("stream2", o2.outport1, o3.inport1);
+ dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
StreamingContainerManager scm = new StreamingContainerManager(dag);
PhysicalPlan physicalPlan = scm.getPhysicalPlan();
@@ -706,6 +710,7 @@ public class StreamingContainerManagerTest
MockContainer mc = new MockContainer(scm, c);
mockContainers.put(c, mc);
}
+
// deploy all containers
for (Map.Entry<PTContainer, MockContainer> ce : mockContainers.entrySet()) {
ce.getValue().deploy();
@@ -727,6 +732,27 @@ public class StreamingContainerManagerTest
o2p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
mc2.sendHeartbeat();
+ Assert.assertEquals("2 partitions", 2, physicalPlan.getOperators(dag.getMeta(o2)).size());
+
+ PTOperator o2p2 = physicalPlan.getOperators(dag.getMeta(o2)).get(1);
+ MockContainer mc3 = mockContainers.get(o2p2.getContainer());
+ MockOperatorStats o2p2mos = mc3.stats(o2p2.getId());
+ o2p2mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
+ mc3.sendHeartbeat();
+
+ PTOperator o3p1 = physicalPlan.getOperators(dag.getMeta(o3)).get(0);
+ MockContainer mc4 = mockContainers.get(o3p1.getContainer());
+ MockOperatorStats o3p1mos = mc4.stats(o3p1.getId());
+ o3p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
+ mc4.sendHeartbeat();
+
+ // unifier
+ PTOperator unifier = physicalPlan.getMergeOperators(dag.getMeta(o2)).get(0);
+ MockContainer mc5 = mockContainers.get(unifier.getContainer());
+ MockOperatorStats unifierp1mos = mc5.stats(unifier.getId());
+ unifierp1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
+ mc5.sendHeartbeat();
+
o1p1mos.currentWindowId(2).deployState(DeployState.SHUTDOWN);
mc1.sendHeartbeat();
scm.monitorHeartbeat();
@@ -734,7 +760,7 @@ public class StreamingContainerManagerTest
scm.monitorHeartbeat(); // committedWindowId updated in next cycle
Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
scm.processEvents();
- Assert.assertEquals("containers at committedWindowId=1", 2, physicalPlan.getContainers().size());
+ Assert.assertEquals("containers at committedWindowId=1", 5, physicalPlan.getContainers().size());
// checkpoint window 2
o1p1mos.checkpointWindowId(2);
@@ -744,7 +770,13 @@ public class StreamingContainerManagerTest
Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
o2p1mos.currentWindowId(2).checkpointWindowId(2);
+ o2p2mos.currentWindowId(2).checkpointWindowId(2);
+ o3p1mos.currentWindowId(2).checkpointWindowId(2);
+ unifierp1mos.currentWindowId(2).checkpointWindowId(2);
mc2.sendHeartbeat();
+ mc3.sendHeartbeat();
+ mc4.sendHeartbeat();
+ mc5.sendHeartbeat();
scm.monitorHeartbeat();
// Operators are shutdown when both operators reach window Id 2
[2/2] incubator-apex-core git commit: Merge branch 'APEX-274' of
https://github.com/ishark/incubator-apex-core into devel-3
Posted by th...@apache.org.
Merge branch 'APEX-274' of https://github.com/ishark/incubator-apex-core into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/c30dd952
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/c30dd952
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/c30dd952
Branch: refs/heads/devel-3
Commit: c30dd952c5d99102c6626bcc1aec96febdc4aeb5
Parents: 9656248 d29ce95
Author: Thomas Weise <th...@datatorrent.com>
Authored: Mon Nov 23 16:00:34 2015 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Mon Nov 23 16:00:34 2015 -0800
----------------------------------------------------------------------
.../stram/StreamingContainerManager.java | 2 +-
.../stram/plan/physical/PhysicalPlan.java | 13 +++++---
.../stram/StreamingContainerManagerTest.java | 34 +++++++++++++++++++-
3 files changed, 43 insertions(+), 6 deletions(-)
----------------------------------------------------------------------