You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/16 03:38:44 UTC
[5/7] drill git commit: DRILL-3092: Fix memory leak risk near uses of
RecordBatchData classes that need to have their resources freed upon an
allocation or other runtime failure.
DRILL-3092: Fix memory leak risk near uses of RecordBatchData classes that need to have their resources freed upon an allocation or other runtime failure.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e838abf8
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e838abf8
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e838abf8
Branch: refs/heads/master
Commit: e838abf8850726d039de47732eb5a713c8bf51ba
Parents: 30f5892
Author: Jason Altekruse <al...@gmail.com>
Authored: Thu May 14 17:32:09 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Fri May 15 17:38:27 2015 -0700
----------------------------------------------------------------------
.../exec/physical/impl/TopN/TopNBatch.java | 26 +++++---
.../exec/physical/impl/join/HashJoinBatch.java | 22 ++++---
.../physical/impl/join/NestedLoopJoinBatch.java | 12 +++-
.../impl/producer/ProducerConsumerBatch.java | 2 +-
.../physical/impl/sort/RecordBatchData.java | 7 +++
.../physical/impl/xsort/ExternalSortBatch.java | 62 +++++++++++---------
6 files changed, 85 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/e838abf8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 349f1b1..516b028 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -217,15 +217,23 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
countSincePurge += incoming.getRecordCount();
batchCount++;
RecordBatchData batch = new RecordBatchData(incoming);
- batch.canonicalize();
- if (priorityQueue == null) {
- priorityQueue = createNewPriorityQueue(context, config.getOrderings(), new ExpandableHyperContainer(batch.getContainer()), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
- }
- priorityQueue.add(context, batch);
- if (countSincePurge > config.getLimit() && batchCount > batchPurgeThreshold) {
- purge();
- countSincePurge = 0;
- batchCount = 0;
+ boolean success = false;
+ try {
+ batch.canonicalize();
+ if (priorityQueue == null) {
+ priorityQueue = createNewPriorityQueue(context, config.getOrderings(), new ExpandableHyperContainer(batch.getContainer()), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
+ }
+ priorityQueue.add(context, batch);
+ if (countSincePurge > config.getLimit() && batchCount > batchPurgeThreshold) {
+ purge();
+ countSincePurge = 0;
+ batchCount = 0;
+ }
+ success = true;
+ } finally {
+ if (!success) {
+ batch.clear();
+ }
}
break;
default:
http://git-wip-us.apache.org/repos/asf/drill/blob/e838abf8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 6490251..5fd866f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -389,14 +389,22 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
* records that have matching keys on the probe side.
*/
RecordBatchData nextBatch = new RecordBatchData(right);
- if (hyperContainer == null) {
- hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
- } else {
- hyperContainer.addBatch(nextBatch.getContainer());
- }
+ boolean success = false;
+ try {
+ if (hyperContainer == null) {
+ hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
+ } else {
+ hyperContainer.addBatch(nextBatch.getContainer());
+ }
- // completed processing a batch, increment batch index
- buildBatchIndex++;
+ // completed processing a batch, increment batch index
+ buildBatchIndex++;
+ success = true;
+ } finally {
+ if (!success) {
+ nextBatch.clear();
+ }
+ }
break;
}
// Get the next record batch
http://git-wip-us.apache.org/repos/asf/drill/blob/e838abf8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 9bcea60..2d37fa5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -342,8 +342,16 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
private void addBatchToHyperContainer(RecordBatch inputBatch) {
RecordBatchData batchCopy = new RecordBatchData(inputBatch);
- rightCounts.addLast(inputBatch.getRecordCount());
- rightContainer.addBatch(batchCopy.getContainer());
+ boolean success = false;
+ try {
+ rightCounts.addLast(inputBatch.getRecordCount());
+ rightContainer.addBatch(batchCopy.getContainer());
+ success = true;
+ } finally {
+ if (!success) {
+ batchCopy.clear();
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/e838abf8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index bca9622..b9a1641 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -171,7 +171,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
}
}
if (wrapper!=null) {
- wrapper.batch.getContainer().zeroVectors();
+ wrapper.batch.clear();
}
cleanUpLatch.countDown();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e838abf8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
index fbd472e..af774db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
@@ -96,4 +96,11 @@ public class RecordBatchData {
return container;
}
+ public void clear() {
+ if (sv2 != null) {
+ sv2.clear();
+ }
+ container.clear();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e838abf8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index de4a86e..8871a5f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -305,36 +305,44 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
sorter.sort(sv2);
// logger.debug("Took {} us to sort {} records", w.elapsed(TimeUnit.MICROSECONDS), count);
RecordBatchData rbd = new RecordBatchData(incoming);
- if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE) {
- rbd.setSv2(sv2);
- }
- batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2()));
- batchesSinceLastSpill++;
- if (// We have spilled at least once and the current memory used is more than the 75% of peak memory used.
- (spillCount > 0 && totalSizeInMemory > .75 * highWaterMark) ||
- // If we haven't spilled so far, do we have enough memory for MSorter if this turns out to be the last incoming batch?
- (spillCount == 0 && !hasMemoryForInMemorySort(totalCount)) ||
- // current memory used is more than 95% of memory usage limit of this operator
- (totalSizeInMemory > .95 * popConfig.getMaxAllocation()) ||
- // current memory used is more than 95% of memory usage limit of this fragment
- (totalSizeInMemory > .95 * oContext.getAllocator().getFragmentLimit()) ||
- // Number of incoming batches (BatchGroups) exceed the limit and number of incoming batches accumulated
- // since the last spill exceed the defined limit
- (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
-
- if (firstSpillBatchCount == 0) {
- firstSpillBatchCount = batchGroups.size();
+ boolean success = false;
+ try {
+ if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE) {
+ rbd.setSv2(sv2);
}
-
- if (spilledBatchGroups.size() > firstSpillBatchCount / 2) {
- logger.info("Merging spills");
- spilledBatchGroups.addFirst(mergeAndSpill(spilledBatchGroups));
+ batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2()));
+ batchesSinceLastSpill++;
+ if (// We have spilled at least once and the current memory used is more than the 75% of peak memory used.
+ (spillCount > 0 && totalSizeInMemory > .75 * highWaterMark) ||
+ // If we haven't spilled so far, do we have enough memory for MSorter if this turns out to be the last incoming batch?
+ (spillCount == 0 && !hasMemoryForInMemorySort(totalCount)) ||
+ // current memory used is more than 95% of memory usage limit of this operator
+ (totalSizeInMemory > .95 * popConfig.getMaxAllocation()) ||
+ // current memory used is more than 95% of memory usage limit of this fragment
+ (totalSizeInMemory > .95 * oContext.getAllocator().getFragmentLimit()) ||
+ // Number of incoming batches (BatchGroups) exceed the limit and number of incoming batches accumulated
+ // since the last spill exceed the defined limit
+ (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
+
+ if (firstSpillBatchCount == 0) {
+ firstSpillBatchCount = batchGroups.size();
+ }
+
+ if (spilledBatchGroups.size() > firstSpillBatchCount / 2) {
+ logger.info("Merging spills");
+ spilledBatchGroups.addFirst(mergeAndSpill(spilledBatchGroups));
+ }
+ spilledBatchGroups.add(mergeAndSpill(batchGroups));
+ batchesSinceLastSpill = 0;
}
- spilledBatchGroups.add(mergeAndSpill(batchGroups));
- batchesSinceLastSpill = 0;
- }
- long t = w.elapsed(TimeUnit.MICROSECONDS);
+ long t = w.elapsed(TimeUnit.MICROSECONDS);
// logger.debug("Took {} us to sort {} records", t, count);
+ success = true;
+ } finally {
+ if (!success) {
+ rbd.clear();
+ }
+ }
break;
case OUT_OF_MEMORY:
logger.debug("received OUT_OF_MEMORY, trying to spill");