You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:13:12 UTC
[41/50] incubator-apex-core git commit: Fix for review comment.
Fix for review comment.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/064edf08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/064edf08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/064edf08
Branch: refs/heads/master
Commit: 064edf08447d88ad9147bfab460705dfbaf4b4f3
Parents: d19fa66
Author: thomas <th...@datatorrent.com>
Authored: Fri Aug 21 12:43:33 2015 -0700
Committer: thomas <th...@datatorrent.com>
Committed: Fri Aug 21 12:43:33 2015 -0700
----------------------------------------------------------------------
.../com/datatorrent/stram/StreamingContainerManager.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/064edf08/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index eed2948..7002c1d 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1004,7 +1004,7 @@ public class StreamingContainerManager implements PlanContext
}
reportStats.remove(o);
}
-
+
if (!this.shutdownOperators.isEmpty()) {
synchronized (this.shutdownOperators) {
Iterator<Map.Entry<Long, Set<PTOperator>>> it = shutdownOperators.entrySet().iterator();
@@ -1023,7 +1023,7 @@ public class StreamingContainerManager implements PlanContext
}
}
}
-
+
if (!eventQueue.isEmpty()) {
for (PTOperator oper : plan.getAllOperators().values()) {
if (oper.getState() != PTOperator.State.ACTIVE) {
@@ -1297,7 +1297,7 @@ public class StreamingContainerManager implements PlanContext
case SHUTDOWN:
// schedule operator deactivation against the windowId
// will be processed once window is committed and all dependent operators completed processing
- long windowId = oper.stats.currentWindowId.get();
+ long windowId = oper.stats.currentWindowId.get();
if (ohb.windowStats != null && !ohb.windowStats.isEmpty()) {
windowId = ohb.windowStats.get(ohb.windowStats.size()-1).windowId;
}
@@ -1305,8 +1305,9 @@ public class StreamingContainerManager implements PlanContext
synchronized (this.shutdownOperators) {
Set<PTOperator> deactivatedOpers = this.shutdownOperators.get(windowId);
if (deactivatedOpers == null) {
- this.shutdownOperators.put(windowId, deactivatedOpers = Sets.newHashSet(oper));
+ this.shutdownOperators.put(windowId, deactivatedOpers = new HashSet<>());
}
+ deactivatedOpers.add(oper);
}
sca.undeployOpers.add(oper.getId());
// record operator stop event
@@ -2264,7 +2265,7 @@ public class StreamingContainerManager implements PlanContext
oi.currentWindowId = toWsWindowId(os.currentWindowId.get());
if (os.lastHeartbeat != null) {
oi.lastHeartbeat = os.lastHeartbeat.getGeneratedTms();
- }
+ }
if (os.checkpointStats != null) {
oi.checkpointTime = os.checkpointStats.checkpointTime;
oi.checkpointStartTime = os.checkpointStats.checkpointStartTime;