You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/15 08:42:58 UTC

[12/17] drill git commit: Fix PartitionSenderRootExec possible memory leak.

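Fix possible memory leak in PartitionSenderRootExec: if creating the partitioner fails partway through, clear all sub-partitioners before the failure propagates.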


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/62a73bcd
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/62a73bcd
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/62a73bcd

Branch: refs/heads/master
Commit: 62a73bcd82464b0a48a234e09040ed069033b848
Parents: aaf9fb8
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 21:55:44 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 22:14:59 2015 -0700

----------------------------------------------------------------------
 .../PartitionSenderRootExec.java                | 41 +++++++++++++-------
 1 file changed, 26 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/62a73bcd/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 1872a51..31fc160 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -230,25 +230,36 @@ public class PartitionSenderRootExec extends BaseRootExec {
     final List<Partitioner> subPartitioners = createClassInstances(actualPartitions);
     int startIndex = 0;
     int endIndex = 0;
-    for (int i = 0; i < actualPartitions; i++) {
-      startIndex = endIndex;
-      endIndex = (i < actualPartitions - 1 ) ? startIndex + divisor : outGoingBatchCount;
-      if ( i < longTail ) {
-        endIndex++;
+
+    boolean success = false;
+    try {
+      for (int i = 0; i < actualPartitions; i++) {
+        startIndex = endIndex;
+        endIndex = (i < actualPartitions - 1) ? startIndex + divisor : outGoingBatchCount;
+        if (i < longTail) {
+          endIndex++;
+        }
+        final OperatorStats partitionStats = new OperatorStats(stats, true);
+        subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext,
+            startIndex, endIndex);
+      }
+
+      synchronized (this) {
+        partitioner = new PartitionerDecorator(subPartitioners, stats, context);
+        for (int index = 0; index < terminations.size(); index++) {
+          partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
+        }
+        terminations.clear();
       }
-      final OperatorStats partitionStats = new OperatorStats(stats, true);
-      subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext,
-        startIndex, endIndex);
-    }
 
-    synchronized(this){
-      partitioner = new PartitionerDecorator(subPartitioners, stats, context);
-      for (int index = 0; index < terminations.size(); index++) {
-        partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
+      success = true;
+    } finally {
+      if (!success) {
+        for (Partitioner p : subPartitioners) {
+          p.clear();
+        }
       }
-      terminations.clear();
     }
-
   }
 
   private List<Partitioner> createClassInstances(int actualPartitions) throws SchemaChangeException {