You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2016/05/11 00:45:10 UTC
incubator-apex-core git commit: APEXCORE-296 #comment fixed the
memory leak
Repository: incubator-apex-core
Updated Branches:
refs/heads/release-3.2 f2fa255e1 -> 15b38a1f8
APEXCORE-296 #comment fixed the memory leak
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/15b38a1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/15b38a1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/15b38a1f
Branch: refs/heads/release-3.2
Commit: 15b38a1f81b1740c78189981d8294e978d8e0a5e
Parents: f2fa255
Author: Gaurav <ga...@datatorrent.com>
Authored: Mon Jan 11 17:21:53 2016 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Tue May 10 17:42:26 2016 -0700
----------------------------------------------------------------------
.../datatorrent/stram/StreamingContainerManager.java | 13 +++++++++----
.../stram/plan/physical/OperatorStatus.java | 2 +-
2 files changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/15b38a1f/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index cafd5e4..7eb4c96 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1026,6 +1026,14 @@ public class StreamingContainerManager implements PlanContext
}
}
o.stats.lastWindowedStats = stats;
+ o.stats.operatorResponses = null;
+ if (!o.stats.responses.isEmpty()) {
+ o.stats.operatorResponses = new ArrayList<>();
+ StatsListener.OperatorResponse operatorResponse;
+ while ((operatorResponse = o.stats.responses.poll()) != null) {
+ o.stats.operatorResponses.add(operatorResponse);
+ }
+ }
if (o.stats.lastWindowedStats != null) {
// call listeners only with non empty window list
if (o.statsListeners != null) {
@@ -1523,10 +1531,7 @@ public class StreamingContainerManager implements PlanContext
LOG.debug(" Got back the response {} for the request {}", obj, obj.getResponseId());
}
else { // This is to identify user requests
- if (oper.stats.operatorResponses == null) {
- oper.stats.operatorResponses = new ArrayList<StatsListener.OperatorResponse>();
- }
- oper.stats.operatorResponses.add(obj);
+ oper.stats.responses.add(obj);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/15b38a1f/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java
index 4548173..063203e 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/OperatorStatus.java
@@ -34,7 +34,6 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHea
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.StatsRevisions.VersionedLong;
-import com.datatorrent.stram.util.MovingAverage;
import com.datatorrent.stram.util.MovingAverage.MovingAverageLong;
import com.datatorrent.stram.util.MovingAverage.TimedMovingAverageLong;
@@ -83,6 +82,7 @@ public class OperatorStatus implements BatchedOperatorStats, java.io.Serializabl
public final ConcurrentLinkedQueue<List<OperatorStats>> listenerStats = new ConcurrentLinkedQueue<List<OperatorStats>>();
public volatile long lastWindowIdChangeTms = 0;
public final int windowProcessingTimeoutMillis;
+ public final ConcurrentLinkedQueue<StatsListener.OperatorResponse> responses = new ConcurrentLinkedQueue<>();
public List<StatsListener.OperatorResponse> operatorResponses;
private final LogicalPlan.OperatorMeta operatorMeta;