Posted to commits@drill.apache.org by sm...@apache.org on 2015/07/10 21:50:28 UTC

drill git commit: DRILL-3133: MergingRecordBatch can leak memory if query is canceled before batches in rawBatches were loaded

Repository: drill
Updated Branches:
  refs/heads/master 72f946964 -> 68aa81f47


DRILL-3133: MergingRecordBatch can leak memory if query is canceled before batches in rawBatches were loaded
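
In brief, the merge receiver buffers one incoming batch per sender in a local rawBatches list before loading them into batch loaders; if the query is cancelled part-way through that loop, the previous code returned STOP without releasing the batches it had already buffered. The patch below adds a clearBatches() helper and calls it on every early exit. The following is a minimal, self-contained sketch of that release-on-early-exit pattern; the types (Batch, gather) are hypothetical stand-ins, not Drill's actual RawFragmentBatch or MergingRecordBatch API.

import java.util.ArrayList;
import java.util.List;

// Sketch of the leak pattern this commit fixes (hypothetical stand-in types):
// batches collected into a local list own reference-counted buffers, so every
// early return must release them or the memory is leaked.
public class ReleaseOnEarlyExitSketch {

  // Stand-in for a batch that owns a reference-counted buffer.
  static class Batch {
    private boolean released = false;
    void release() { released = true; }
    boolean isReleased() { return released; }
  }

  // Release everything buffered so far; mirrors the clearBatches() helper
  // introduced by this commit.
  static void clearBatches(List<Batch> batches) {
    for (Batch batch : batches) {
      if (batch != null) {
        batch.release();
      }
    }
  }

  // Gather one batch per sender; if the query is cancelled mid-loop
  // (shouldContinue == false), release what was already buffered before
  // returning, instead of returning with the buffers still held.
  static boolean gather(List<Batch> incoming, boolean shouldContinue) {
    List<Batch> buffered = new ArrayList<>();
    for (Batch batch : incoming) {
      if (batch == null && !shouldContinue) {
        clearBatches(buffered);   // the fix: nothing buffered so far is leaked
        return false;             // analogous to returning IterOutcome.STOP
      }
      buffered.add(batch);
    }
    // ... normal path: ownership of the buffers moves on from here
    return true;
  }

  public static void main(String[] args) {
    List<Batch> incoming = new ArrayList<>();
    incoming.add(new Batch());
    incoming.add(null);           // simulates a provider returning null after cancellation
    boolean ok = gather(incoming, false);
    System.out.println("completed=" + ok + ", firstReleased=" + incoming.get(0).isReleased());
  }
}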


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/68aa81f4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/68aa81f4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/68aa81f4

Branch: refs/heads/master
Commit: 68aa81f471d2cea779f7f2acd7d84ce43bb1b94b
Parents: 72f9469
Author: adeneche <ad...@gmail.com>
Authored: Mon May 18 10:01:52 2015 -0700
Committer: adeneche <ad...@gmail.com>
Committed: Thu Jul 9 16:19:28 2015 -0700

----------------------------------------------------------------------
 .../impl/mergereceiver/MergingRecordBatch.java  | 70 ++++++++++----------
 1 file changed, 35 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/68aa81f4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 3ca11f1..49e81ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -96,7 +96,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   private RecordBatchLoader[] batchLoaders;
   private final RawFragmentBatchProvider[] fragProviders;
   private final FragmentContext context;
-  private BatchSchema schema;
   private VectorContainer outgoingContainer;
   private MergingReceiverGeneratorBase merger;
   private final MergingReceiverPOP config;
@@ -139,7 +138,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     this.outputCounts = new long[config.getNumSenders()];
   }
 
-  private RawFragmentBatch getNext(final int providerIndex) throws IOException{
+  private RawFragmentBatch getNext(final int providerIndex) throws IOException {
     stats.startWait();
     final RawFragmentBatchProvider provider = fragProviders[providerIndex];
     try {
@@ -162,6 +161,14 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     }
   }
 
+  private void clearBatches(List<RawFragmentBatch> batches) {
+    for (RawFragmentBatch batch : batches) {
+      if (batch != null) {
+        batch.release();
+      }
+    }
+  }
+
   @Override
   public IterOutcome innerNext() {
     if (fragProviders.length == 0) {
@@ -190,22 +197,25 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       final List<RawFragmentBatch> rawBatches = Lists.newArrayList();
       int p = 0;
       for (final RawFragmentBatchProvider provider : fragProviders) {
-        RawFragmentBatch rawBatch = null;
-        try {
-          // check if there is a batch in temp holder before calling getNext(), as it may have been used when building schema
-          if (tempBatchHolder[p] != null) {
-            rawBatch = tempBatchHolder[p];
-            tempBatchHolder[p] = null;
-          } else {
+        RawFragmentBatch rawBatch;
+        // check if there is a batch in temp holder before calling getNext(), as it may have been used when building schema
+        if (tempBatchHolder[p] != null) {
+          rawBatch = tempBatchHolder[p];
+          tempBatchHolder[p] = null;
+        } else {
+          try {
             rawBatch = getNext(p);
-          }
-          if (rawBatch == null && !context.shouldContinue()) {
+          } catch (final IOException e) {
+            context.fail(e);
             return IterOutcome.STOP;
           }
-        } catch (final IOException e) {
-          context.fail(e);
+        }
+        if (rawBatch == null && !context.shouldContinue()) {
+          clearBatches(rawBatches);
           return IterOutcome.STOP;
         }
+
+        assert rawBatch != null : "rawBatch is null although context.shouldContinue() == true";
         if (rawBatch.getHeader().getDef().getRecordCount() != 0) {
           rawBatches.add(rawBatch);
         } else {
@@ -215,13 +225,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
           }
           try {
             while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
-              ;
+              // Do nothing
             }
             if (rawBatch == null && !context.shouldContinue()) {
+              clearBatches(rawBatches);
               return IterOutcome.STOP;
             }
           } catch (final IOException e) {
             context.fail(e);
+            clearBatches(rawBatches);
             return IterOutcome.STOP;
           }
           if (rawBatch != null) {
@@ -243,6 +255,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         batchLoaders[i] = new RecordBatchLoader(oContext.getAllocator());
       }
 
+      // after this point all batches have moved to incomingBatches
+      rawBatches.clear();
+
       int i = 0;
       for (final RawFragmentBatch batch : incomingBatches) {
         // initialize the incoming batchLoaders
@@ -261,6 +276,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         ++i;
       }
 
+      // after this point all batches have been released and their bytebuf are in batchLoaders
+
       // Canonicalize each incoming batch, so that vectors are alphabetically sorted based on SchemaPath.
       for (final RecordBatchLoader loader : batchLoaders) {
         loader.canonicalize();
@@ -268,31 +285,22 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
       // Ensure all the incoming batches have the identical schema.
       if (!isSameSchemaAmongBatches(batchLoaders)) {
-        logger.error("Incoming batches for merging receiver have diffferent schemas!");
-        context.fail(new SchemaChangeException("Incoming batches for merging receiver have diffferent schemas!"));
+        context.fail(new SchemaChangeException("Incoming batches for merging receiver have different schemas!"));
         return IterOutcome.STOP;
       }
 
       // create the outgoing schema and vector container, and allocate the initial batch
       final SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
-      int vectorCount = 0;
       for (final VectorWrapper<?> v : batchLoaders[0]) {
 
         // add field to the output schema
         bldr.addField(v.getField());
 
         // allocate a new value vector
-        final ValueVector outgoingVector = outgoingContainer.addOrGet(v.getField());
-        ++vectorCount;
+        outgoingContainer.addOrGet(v.getField());
       }
       allocateOutgoing();
 
-
-      schema = bldr.build();
-      if (schema != null && !schema.equals(schema)) {
-        // TODO: handle case where one or more batches implicitly indicate schema change
-        logger.debug("Initial state has incoming batches with different schemas");
-      }
       outgoingContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
       // generate code for merge operations (copy and compare)
@@ -305,7 +313,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       }
 
       // allocate the priority queue with the generated comparator
-      this.pqueue = new PriorityQueue<Node>(fragProviders.length, new Comparator<Node>() {
+      this.pqueue = new PriorityQueue<>(fragProviders.length, new Comparator<Node>() {
         public int compare(final Node node1, final Node node2) {
           final int leftIndex = (node1.batchId << 16) + node1.valueIndex;
           final int rightIndex = (node2.batchId << 16) + node2.valueIndex;
@@ -321,8 +329,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
             incomingBatches[b] = batch;
             if (batch != null) {
               batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody());
-              // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
-              // SchemaChangeException, so check/clean catch clause below.
             } else {
               batchLoaders[b].clear();
               batchLoaders[b] = null;
@@ -353,15 +359,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       }
       pqueue.poll();
 
-//      if (isOutgoingFull()) {
-//        // set a flag so that we reallocate on the next iteration
-//        logger.debug("Outgoing vectors record batch size reached; breaking");
-//        prevBatchWasFull = true;
-//      }
-
       if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) {
         // reached the end of an incoming record batch
-        RawFragmentBatch nextBatch = null;
+        RawFragmentBatch nextBatch;
         try {
           nextBatch = getNext(node.batchId);