You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/07/10 21:50:28 UTC
drill git commit: DRILL-3133: MergingRecordBatch can leak memory if
query is canceled before batches in rawBatches were loaded
Repository: drill
Updated Branches:
refs/heads/master 72f946964 -> 68aa81f47
DRILL-3133: MergingRecordBatch can leak memory if query is canceled before batches in rawBatches were loaded
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/68aa81f4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/68aa81f4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/68aa81f4
Branch: refs/heads/master
Commit: 68aa81f471d2cea779f7f2acd7d84ce43bb1b94b
Parents: 72f9469
Author: adeneche <ad...@gmail.com>
Authored: Mon May 18 10:01:52 2015 -0700
Committer: adeneche <ad...@gmail.com>
Committed: Thu Jul 9 16:19:28 2015 -0700
----------------------------------------------------------------------
.../impl/mergereceiver/MergingRecordBatch.java | 70 ++++++++++----------
1 file changed, 35 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/68aa81f4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 3ca11f1..49e81ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -96,7 +96,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
private RecordBatchLoader[] batchLoaders;
private final RawFragmentBatchProvider[] fragProviders;
private final FragmentContext context;
- private BatchSchema schema;
private VectorContainer outgoingContainer;
private MergingReceiverGeneratorBase merger;
private final MergingReceiverPOP config;
@@ -139,7 +138,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
this.outputCounts = new long[config.getNumSenders()];
}
- private RawFragmentBatch getNext(final int providerIndex) throws IOException{
+ private RawFragmentBatch getNext(final int providerIndex) throws IOException {
stats.startWait();
final RawFragmentBatchProvider provider = fragProviders[providerIndex];
try {
@@ -162,6 +161,14 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
}
+ private void clearBatches(List<RawFragmentBatch> batches) {
+ for (RawFragmentBatch batch : batches) {
+ if (batch != null) {
+ batch.release();
+ }
+ }
+ }
+
@Override
public IterOutcome innerNext() {
if (fragProviders.length == 0) {
@@ -190,22 +197,25 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
final List<RawFragmentBatch> rawBatches = Lists.newArrayList();
int p = 0;
for (final RawFragmentBatchProvider provider : fragProviders) {
- RawFragmentBatch rawBatch = null;
- try {
- // check if there is a batch in temp holder before calling getNext(), as it may have been used when building schema
- if (tempBatchHolder[p] != null) {
- rawBatch = tempBatchHolder[p];
- tempBatchHolder[p] = null;
- } else {
+ RawFragmentBatch rawBatch;
+ // check if there is a batch in temp holder before calling getNext(), as it may have been used when building schema
+ if (tempBatchHolder[p] != null) {
+ rawBatch = tempBatchHolder[p];
+ tempBatchHolder[p] = null;
+ } else {
+ try {
rawBatch = getNext(p);
- }
- if (rawBatch == null && !context.shouldContinue()) {
+ } catch (final IOException e) {
+ context.fail(e);
return IterOutcome.STOP;
}
- } catch (final IOException e) {
- context.fail(e);
+ }
+ if (rawBatch == null && !context.shouldContinue()) {
+ clearBatches(rawBatches);
return IterOutcome.STOP;
}
+
+ assert rawBatch != null : "rawBatch is null although context.shouldContinue() == true";
if (rawBatch.getHeader().getDef().getRecordCount() != 0) {
rawBatches.add(rawBatch);
} else {
@@ -215,13 +225,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
try {
while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
- ;
+ // Do nothing
}
if (rawBatch == null && !context.shouldContinue()) {
+ clearBatches(rawBatches);
return IterOutcome.STOP;
}
} catch (final IOException e) {
context.fail(e);
+ clearBatches(rawBatches);
return IterOutcome.STOP;
}
if (rawBatch != null) {
@@ -243,6 +255,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
batchLoaders[i] = new RecordBatchLoader(oContext.getAllocator());
}
+ // after this point all batches have moved to incomingBatches
+ rawBatches.clear();
+
int i = 0;
for (final RawFragmentBatch batch : incomingBatches) {
// initialize the incoming batchLoaders
@@ -261,6 +276,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
++i;
}
+ // after this point all batches have been released and their bytebuf are in batchLoaders
+
// Canonicalize each incoming batch, so that vectors are alphabetically sorted based on SchemaPath.
for (final RecordBatchLoader loader : batchLoaders) {
loader.canonicalize();
@@ -268,31 +285,22 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// Ensure all the incoming batches have the identical schema.
if (!isSameSchemaAmongBatches(batchLoaders)) {
- logger.error("Incoming batches for merging receiver have diffferent schemas!");
- context.fail(new SchemaChangeException("Incoming batches for merging receiver have diffferent schemas!"));
+ context.fail(new SchemaChangeException("Incoming batches for merging receiver have different schemas!"));
return IterOutcome.STOP;
}
// create the outgoing schema and vector container, and allocate the initial batch
final SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
- int vectorCount = 0;
for (final VectorWrapper<?> v : batchLoaders[0]) {
// add field to the output schema
bldr.addField(v.getField());
// allocate a new value vector
- final ValueVector outgoingVector = outgoingContainer.addOrGet(v.getField());
- ++vectorCount;
+ outgoingContainer.addOrGet(v.getField());
}
allocateOutgoing();
-
- schema = bldr.build();
- if (schema != null && !schema.equals(schema)) {
- // TODO: handle case where one or more batches implicitly indicate schema change
- logger.debug("Initial state has incoming batches with different schemas");
- }
outgoingContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
// generate code for merge operations (copy and compare)
@@ -305,7 +313,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
// allocate the priority queue with the generated comparator
- this.pqueue = new PriorityQueue<Node>(fragProviders.length, new Comparator<Node>() {
+ this.pqueue = new PriorityQueue<>(fragProviders.length, new Comparator<Node>() {
public int compare(final Node node1, final Node node2) {
final int leftIndex = (node1.batchId << 16) + node1.valueIndex;
final int rightIndex = (node2.batchId << 16) + node2.valueIndex;
@@ -321,8 +329,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
incomingBatches[b] = batch;
if (batch != null) {
batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody());
- // TODO: Clean: DRILL-2933: That load(...) no longer throws
- // SchemaChangeException, so check/clean catch clause below.
} else {
batchLoaders[b].clear();
batchLoaders[b] = null;
@@ -353,15 +359,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
pqueue.poll();
-// if (isOutgoingFull()) {
-// // set a flag so that we reallocate on the next iteration
-// logger.debug("Outgoing vectors record batch size reached; breaking");
-// prevBatchWasFull = true;
-// }
-
if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) {
// reached the end of an incoming record batch
- RawFragmentBatch nextBatch = null;
+ RawFragmentBatch nextBatch;
try {
nextBatch = getNext(node.batchId);