You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2016/05/11 00:45:10 UTC

incubator-apex-core git commit: APEXCORE-296 #comment fixed the memory leak

Repository: incubator-apex-core
Updated Branches:
  refs/heads/release-3.2 f2fa255e1 -> 15b38a1f8


APEXCORE-296 #comment fixed the memory leak


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/15b38a1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/15b38a1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/15b38a1f

Branch: refs/heads/release-3.2
Commit: 15b38a1f81b1740c78189981d8294e978d8e0a5e
Parents: f2fa255
Author: Gaurav <ga...@datatorrent.com>
Authored: Mon Jan 11 17:21:53 2016 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Tue May 10 17:42:26 2016 -0700

----------------------------------------------------------------------
 .../datatorrent/stram/StreamingContainerManager.java   | 13 +++++++++----
 .../stram/plan/physical/OperatorStatus.java            |  2 +-
 2 files changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/15b38a1f/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index cafd5e4..7eb4c96 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1026,6 +1026,14 @@ public class StreamingContainerManager implements PlanContext
         }
       }
       o.stats.lastWindowedStats = stats;
+      o.stats.operatorResponses = null;
+      if (!o.stats.responses.isEmpty()) {
+        o.stats.operatorResponses = new ArrayList<>();
+        StatsListener.OperatorResponse operatorResponse;
+        while ((operatorResponse = o.stats.responses.poll()) != null) {
+          o.stats.operatorResponses.add(operatorResponse);
+        }
+      }
       if (o.stats.lastWindowedStats != null) {
         // call listeners only with non empty window list
         if (o.statsListeners != null) {
@@ -1523,10 +1531,7 @@ public class StreamingContainerManager implements PlanContext
             LOG.debug(" Got back the response {} for the request {}", obj, obj.getResponseId());
           }
           else {       // This is to identify user requests
-            if (oper.stats.operatorResponses == null) {
-              oper.stats.operatorResponses = new ArrayList<StatsListener.OperatorResponse>();
-            }
-            oper.stats.operatorResponses.add(obj);
+            oper.stats.responses.add(obj);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/15b38a1f/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java
index 4548173..063203e 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java
@@ -34,7 +34,6 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHea
 import com.datatorrent.stram.engine.OperatorContext;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.physical.StatsRevisions.VersionedLong;
-import com.datatorrent.stram.util.MovingAverage;
 import com.datatorrent.stram.util.MovingAverage.MovingAverageLong;
 import com.datatorrent.stram.util.MovingAverage.TimedMovingAverageLong;
 
@@ -83,6 +82,7 @@ public class OperatorStatus implements BatchedOperatorStats, java.io.Serializabl
   public final ConcurrentLinkedQueue<List<OperatorStats>> listenerStats = new ConcurrentLinkedQueue<List<OperatorStats>>();
   public volatile long lastWindowIdChangeTms = 0;
   public final int windowProcessingTimeoutMillis;
+  public final ConcurrentLinkedQueue<StatsListener.OperatorResponse> responses = new ConcurrentLinkedQueue<>();
   public List<StatsListener.OperatorResponse> operatorResponses;
 
   private final LogicalPlan.OperatorMeta operatorMeta;