You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/15 08:42:58 UTC
[12/17] drill git commit: Fix PartitionSenderRootExec possible memory leak.
Fix PartitionSenderRootExec possible memory leak.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/62a73bcd
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/62a73bcd
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/62a73bcd
Branch: refs/heads/master
Commit: 62a73bcd82464b0a48a234e09040ed069033b848
Parents: aaf9fb8
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 14 21:55:44 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 22:14:59 2015 -0700
----------------------------------------------------------------------
.../PartitionSenderRootExec.java | 41 +++++++++++++-------
1 file changed, 26 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/62a73bcd/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 1872a51..31fc160 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -230,25 +230,36 @@ public class PartitionSenderRootExec extends BaseRootExec {
final List<Partitioner> subPartitioners = createClassInstances(actualPartitions);
int startIndex = 0;
int endIndex = 0;
- for (int i = 0; i < actualPartitions; i++) {
- startIndex = endIndex;
- endIndex = (i < actualPartitions - 1 ) ? startIndex + divisor : outGoingBatchCount;
- if ( i < longTail ) {
- endIndex++;
+
+ boolean success = false;
+ try {
+ for (int i = 0; i < actualPartitions; i++) {
+ startIndex = endIndex;
+ endIndex = (i < actualPartitions - 1) ? startIndex + divisor : outGoingBatchCount;
+ if (i < longTail) {
+ endIndex++;
+ }
+ final OperatorStats partitionStats = new OperatorStats(stats, true);
+ subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext,
+ startIndex, endIndex);
+ }
+
+ synchronized (this) {
+ partitioner = new PartitionerDecorator(subPartitioners, stats, context);
+ for (int index = 0; index < terminations.size(); index++) {
+ partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
+ }
+ terminations.clear();
}
- final OperatorStats partitionStats = new OperatorStats(stats, true);
- subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext,
- startIndex, endIndex);
- }
- synchronized(this){
- partitioner = new PartitionerDecorator(subPartitioners, stats, context);
- for (int index = 0; index < terminations.size(); index++) {
- partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
+ success = true;
+ } finally {
+ if (!success) {
+ for (Partitioner p : subPartitioners) {
+ p.clear();
+ }
}
- terminations.clear();
}
-
}
private List<Partitioner> createClassInstances(int actualPartitions) throws SchemaChangeException {