You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/11/03 03:27:31 UTC
[30/50] [abbrv] incubator-apex-core git commit: SPOI-5053 APEX-56
#resolve #comment Fixing removal of terminated operators from physical plan
when downstream operators have also completed up to the shutdown window Id. Also
fixed containers to be removed only
SPOI-5053 APEX-56 #resolve #comment
Fixing removal of terminated operators from the physical plan when downstream operators have also completed up to the shutdown window Id.
Also fixed containers to be removed only when operators are removed from the physical plan.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d2bf3e56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d2bf3e56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d2bf3e56
Branch: refs/heads/master
Commit: d2bf3e566e38ce518ad2e28fb260d0710c175251
Parents: 8f3c9ea
Author: ishark <is...@datatorrent.com>
Authored: Mon Oct 12 16:05:32 2015 -0700
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Sat Oct 17 23:37:35 2015 -0700
----------------------------------------------------------------------
.../stram/StreamingContainerManager.java | 35 +++++++++++---------
.../stram/StreamingContainerManagerTest.java | 11 ++----
2 files changed, 22 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d2bf3e56/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index ed366db..3931fad 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1040,10 +1040,7 @@ public class StreamingContainerManager implements PlanContext
Iterator<Map.Entry<Long, Set<PTOperator>>> it = shutdownOperators.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Long, Set<PTOperator>> windowAndOpers = it.next();
- if (windowAndOpers.getKey().longValue() > this.committedWindowId) {
- // wait until window is committed
- continue;
- } else {
+ if (windowAndOpers.getKey().longValue() <= this.committedWindowId || checkDownStreamOperators(windowAndOpers)) {
LOG.info("Removing inactive operators at window {} {}", Codec.getStringWindowId(windowAndOpers.getKey()), windowAndOpers.getValue());
for (PTOperator oper : windowAndOpers.getValue()) {
plan.removeTerminatedPartition(oper);
@@ -1070,8 +1067,7 @@ public class StreamingContainerManager implements PlanContext
try {
command.run();
count++;
- }
- catch (Exception e) {
+ } catch (Exception e) {
// TODO: handle error
LOG.error("Failed to execute {}", command, e);
}
@@ -1081,8 +1077,7 @@ public class StreamingContainerManager implements PlanContext
if (count > 0) {
try {
checkpoint();
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new RuntimeException("Failed to checkpoint state.", e);
}
}
@@ -1090,6 +1085,19 @@ public class StreamingContainerManager implements PlanContext
return count;
}
+ private boolean checkDownStreamOperators(Map.Entry<Long, Set<PTOperator>> windowAndOpers)
+ {
+ // Returns true only when no dependent (downstream) operator is still behind the shutdown window Id, i.e. each has reached that window or a later one; the operators can then be removed from the dag
+ Set<PTOperator> downStreamOperators = getPhysicalPlan().getDependents(windowAndOpers.getValue());
+ for (PTOperator oper : downStreamOperators) {
+ long windowId = oper.stats.currentWindowId.get();
+ if (windowId < windowAndOpers.getKey().longValue()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
/**
* Schedule container restart. Called by Stram after a container was terminated
* and requires recovery (killed externally, or after heartbeat timeout). <br>
@@ -1495,8 +1503,6 @@ public class StreamingContainerManager implements PlanContext
}
Set<Integer> reportedOperators = Sets.newHashSetWithExpectedSize(sca.container.getOperators().size());
- boolean containerIdle = true;
-
for (OperatorHeartbeat shb : heartbeat.getContainerStats().operators) {
long maxEndWindowTimestamp = 0;
@@ -1533,9 +1539,7 @@ public class StreamingContainerManager implements PlanContext
oper.stats.lastHeartbeat = shb;
List<ContainerStats.OperatorStats> statsList = shb.getOperatorStatsContainer();
- if (!oper.stats.isIdle()) {
- containerIdle = false;
- }
+
if (!statsList.isEmpty()) {
long tuplesProcessed = 0;
long tuplesEmitted = 0;
@@ -1743,11 +1747,10 @@ public class StreamingContainerManager implements PlanContext
ContainerHeartbeatResponse rsp = getHeartbeatResponse(sca);
- if (containerIdle && isApplicationIdle()) {
+ if (heartbeat.getContainerStats().operators.isEmpty() && isApplicationIdle()) {
LOG.info("requesting idle shutdown for container {}", heartbeat.getContainerId());
rsp.shutdown = true;
- }
- else {
+ } else {
if (sca.shutdownRequested) {
LOG.info("requesting shutdown for container {}", heartbeat.getContainerId());
rsp.shutdown = true;
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d2bf3e56/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 710440d..2884323 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -762,18 +762,13 @@ public class StreamingContainerManagerTest
mc1.sendHeartbeat();
scm.monitorHeartbeat();
+ Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
+
o2p1mos.currentWindowId(2).checkpointWindowId(2);
mc2.sendHeartbeat();
scm.monitorHeartbeat();
- Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
- scm.monitorHeartbeat(); // committedWindowId updated in next cycle
- Assert.assertEquals("committedWindowId", 2, scm.getCommittedWindowId());
- Assert.assertEquals(1, o1p1.getContainer().getOperators().size());
- Assert.assertEquals(1, o2p1.getContainer().getOperators().size());
- Assert.assertEquals(2, physicalPlan.getContainers().size());
- // call again as events are processed after committed window was updated
- scm.processEvents();
+ // Operators are shutdown when both operators reach window Id 2
Assert.assertEquals(0, o1p1.getContainer().getOperators().size());
Assert.assertEquals(0, o2p1.getContainer().getOperators().size());
Assert.assertEquals(0, physicalPlan.getContainers().size());