You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:13:12 UTC

[41/50] incubator-apex-core git commit: Fix for review comment.

Fix for review comment.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/064edf08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/064edf08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/064edf08

Branch: refs/heads/master
Commit: 064edf08447d88ad9147bfab460705dfbaf4b4f3
Parents: d19fa66
Author: thomas <th...@datatorrent.com>
Authored: Fri Aug 21 12:43:33 2015 -0700
Committer: thomas <th...@datatorrent.com>
Committed: Fri Aug 21 12:43:33 2015 -0700

----------------------------------------------------------------------
 .../com/datatorrent/stram/StreamingContainerManager.java | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/064edf08/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index eed2948..7002c1d 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1004,7 +1004,7 @@ public class StreamingContainerManager implements PlanContext
       }
       reportStats.remove(o);
     }
-    
+
     if (!this.shutdownOperators.isEmpty()) {
       synchronized (this.shutdownOperators) {
         Iterator<Map.Entry<Long, Set<PTOperator>>> it = shutdownOperators.entrySet().iterator();
@@ -1023,7 +1023,7 @@ public class StreamingContainerManager implements PlanContext
         }
       }
     }
-    
+
     if (!eventQueue.isEmpty()) {
       for (PTOperator oper : plan.getAllOperators().values()) {
         if (oper.getState() != PTOperator.State.ACTIVE) {
@@ -1297,7 +1297,7 @@ public class StreamingContainerManager implements PlanContext
             case SHUTDOWN:
               // schedule operator deactivation against the windowId
               // will be processed once window is committed and all dependent operators completed processing
-              long windowId = oper.stats.currentWindowId.get(); 
+              long windowId = oper.stats.currentWindowId.get();
               if (ohb.windowStats != null && !ohb.windowStats.isEmpty()) {
                 windowId = ohb.windowStats.get(ohb.windowStats.size()-1).windowId;
               }
@@ -1305,8 +1305,9 @@ public class StreamingContainerManager implements PlanContext
               synchronized (this.shutdownOperators) {
                 Set<PTOperator> deactivatedOpers = this.shutdownOperators.get(windowId);
                 if (deactivatedOpers == null) {
-                  this.shutdownOperators.put(windowId, deactivatedOpers = Sets.newHashSet(oper));
+                  this.shutdownOperators.put(windowId, deactivatedOpers = new HashSet<>());
                 }
+                deactivatedOpers.add(oper);
               }
               sca.undeployOpers.add(oper.getId());
               // record operator stop event
@@ -2264,7 +2265,7 @@ public class StreamingContainerManager implements PlanContext
     oi.currentWindowId = toWsWindowId(os.currentWindowId.get());
     if (os.lastHeartbeat != null) {
       oi.lastHeartbeat = os.lastHeartbeat.getGeneratedTms();
-    }    
+    }
     if (os.checkpointStats != null) {
       oi.checkpointTime = os.checkpointStats.checkpointTime;
       oi.checkpointStartTime = os.checkpointStats.checkpointStartTime;