You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2015/09/18 02:27:46 UTC

drill git commit: DRILL-3779: Fix NPE in mergeAndSpill(). Add more debug logging messages. Make copier per-batch memory limit power of 2. Get rid of some warnings. Add a few comments.

Repository: drill
Updated Branches:
  refs/heads/master e52d473eb -> 813903a34


DRILL-3779: Fix NPE in mergeAndSpill(). Add more debug logging messages. Make the copier's per-batch memory limit a power of 2.
Get rid of some warnings. Add a few comments.

Address review comments.

Close apache/drill#160


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/813903a3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/813903a3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/813903a3

Branch: refs/heads/master
Commit: 813903a34ea1c9c3fec28f2472312c8785f780c5
Parents: e52d473
Author: Aman Sinha <as...@maprtech.com>
Authored: Thu Sep 17 01:29:38 2015 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Thu Sep 17 17:26:43 2015 -0700

----------------------------------------------------------------------
 .../physical/impl/xsort/ExternalSortBatch.java  | 63 +++++++++++++++-----
 1 file changed, 47 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/813903a3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 31deada..f1e22b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -100,6 +100,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private SingleBatchSorter sorter;
   private SortRecordBatchBuilder builder;
   private MSorter mSorter;
+  /**
+   * A single PriorityQueueCopier instance is used for 2 purposes:
+   * 1. Merge sorted batches before spilling
+   * 2. Merge sorted batches when all incoming data fits in memory
+   */
   private PriorityQueueCopier copier;
   private LinkedList<BatchGroup> batchGroups = Lists.newLinkedList();
   private LinkedList<BatchGroup> spilledBatchGroups = Lists.newLinkedList();
@@ -114,6 +119,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private final String fileName;
   private int firstSpillBatchCount = 0;
 
+  /**
+   * The copier uses the COPIER_BATCH_MEM_LIMIT to estimate the target
+   * number of records to return in each batch.
+   */
+  private static final int COPIER_BATCH_MEM_LIMIT = 256 * 1024;
+
   public static final String INTERRUPTION_AFTER_SORT = "after-sort";
   public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
 
@@ -205,7 +216,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     switch (outcome) {
       case OK:
       case OK_NEW_SCHEMA:
-        for (VectorWrapper w : incoming) {
+        for (VectorWrapper<?> w : incoming) {
           ValueVector v = container.addOrGet(w.getField());
           if (v instanceof AbstractContainerVector) {
             w.getValueVector().makeTransferPair(v); // Can we remove this hack?
@@ -225,6 +236,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       case NONE:
         state = BatchState.DONE;
         break;
+      default:
+        break;
     }
   }
 
@@ -291,7 +304,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
             first = false;
           }
           if (incoming.getRecordCount() == 0) {
-            for (VectorWrapper w : incoming) {
+            for (VectorWrapper<?> w : incoming) {
               w.clear();
             }
             break;
@@ -345,7 +358,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
               if (spilledBatchGroups.size() > firstSpillBatchCount / 2) {
                 logger.info("Merging spills");
-                spilledBatchGroups.addFirst(mergeAndSpill(spilledBatchGroups));
+                final BatchGroup merged = mergeAndSpill(spilledBatchGroups);
+                if (merged != null) {
+                  spilledBatchGroups.addFirst(merged);
+                }
               }
               final BatchGroup merged = mergeAndSpill(batchGroups);
               if (merged != null) { // make sure we don't add null to spilledBatchGroups
@@ -366,8 +382,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           logger.debug("received OUT_OF_MEMORY, trying to spill");
           highWaterMark = totalSizeInMemory;
           if (batchesSinceLastSpill > 2) {
-            spilledBatchGroups.add(mergeAndSpill(batchGroups));
-            batchesSinceLastSpill = 0;
+            final BatchGroup merged = mergeAndSpill(batchGroups);
+            if (merged != null) {
+              spilledBatchGroups.add(merged);
+              batchesSinceLastSpill = 0;
+            }
           } else {
             logger.debug("not enough batches to spill, sending OUT_OF_MEMORY downstream");
             return IterOutcome.OUT_OF_MEMORY;
@@ -418,7 +437,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         long t = watch.elapsed(TimeUnit.MICROSECONDS);
 //        logger.debug("Took {} us to sort {} records", t, sv4.getTotalCount());
         container.buildSchema(SelectionVectorMode.FOUR_BYTE);
-      } else {
+      } else { // some batches were spilled
         final BatchGroup merged = mergeAndSpill(batchGroups);
         if (merged != null) {
           spilledBatchGroups.add(merged);
@@ -430,14 +449,14 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         createCopier(hyperBatch, batchGroups, container, false);
 
         int estimatedRecordSize = 0;
-        for (VectorWrapper w : batchGroups.get(0)) {
+        for (VectorWrapper<?> w : batchGroups.get(0)) {
           try {
             estimatedRecordSize += TypeHelper.getSize(w.getField().getType());
           } catch (UnsupportedOperationException e) {
             estimatedRecordSize += 50;
           }
         }
-        targetRecordCount = Math.min(MAX_BATCH_SIZE, Math.max(1, 250 * 1000 / estimatedRecordSize));
+        targetRecordCount = Math.min(MAX_BATCH_SIZE, Math.max(1, COPIER_BATCH_MEM_LIMIT / estimatedRecordSize));
         int count = copier.next(targetRecordCount);
         container.buildSchema(SelectionVectorMode.NONE);
         container.setRecordCount(count);
@@ -470,6 +489,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
   public BatchGroup mergeAndSpill(LinkedList<BatchGroup> batchGroups) throws SchemaChangeException {
     logger.debug("Copier allocator current allocation {}", copierAllocator.getAllocatedMemory());
+    logger.debug("mergeAndSpill: starting totalSizeInMemory = {}", totalSizeInMemory);
     VectorContainer outputContainer = new VectorContainer();
     List<BatchGroup> batchGroupList = Lists.newArrayList();
     int batchCount = batchGroups.size();
@@ -478,28 +498,36 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         break;
       }
       BatchGroup batch = batchGroups.pollLast();
+      assert batch != null : "Encountered a null batch during merge and spill operation";
       batchGroupList.add(batch);
       long bufferSize = getBufferSize(batch);
+      logger.debug("mergeAndSpill: buffer size for batch {} = {}", i, bufferSize);
       totalSizeInMemory -= bufferSize;
     }
+    logger.debug("mergeAndSpill: intermediate estimated total size in memory = {}", totalSizeInMemory);
+
     if (batchGroupList.size() == 0) {
       return null;
     }
     int estimatedRecordSize = 0;
-    for (VectorWrapper w : batchGroupList.get(0)) {
+    for (VectorWrapper<?> w : batchGroupList.get(0)) {
       try {
         estimatedRecordSize += TypeHelper.getSize(w.getField().getType());
       } catch (UnsupportedOperationException e) {
         estimatedRecordSize += 50;
       }
     }
-    int targetRecordCount = Math.max(1, 250 * 1000 / estimatedRecordSize);
+    int targetRecordCount = Math.max(1, COPIER_BATCH_MEM_LIMIT / estimatedRecordSize);
     VectorContainer hyperBatch = constructHyperBatch(batchGroupList);
     createCopier(hyperBatch, batchGroupList, outputContainer, true);
 
     int count = copier.next(targetRecordCount);
     assert count > 0;
 
+    logger.debug("mergeAndSpill: estimated record size = {}, target record count = {}", estimatedRecordSize, targetRecordCount);
+
+    // 1 output container is kept in memory, so we want to hold on to it and transferClone
+    // allows keeping ownership
     VectorContainer c1 = VectorContainer.getTransferClone(outputContainer);
     c1.buildSchema(BatchSchema.SelectionVectorMode.NONE);
     c1.setRecordCount(count);
@@ -512,6 +540,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       while ((count = copier.next(targetRecordCount)) > 0) {
         outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
         outputContainer.setRecordCount(count);
+        // note that addBatch also clears the outputContainer
         newGroup.addBatch(outputContainer);
       }
       newGroup.closeOutputStream();
@@ -522,14 +551,16 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    takeOwnership(c1);
-    totalSizeInMemory += getBufferSize(c1);
+    takeOwnership(c1); // transfer ownership from copier allocator to external sort allocator
+    long bufSize = getBufferSize(c1);
+    totalSizeInMemory += bufSize;
+    logger.debug("mergeAndSpill: final total size in memory = {}", totalSizeInMemory);
     logger.info("Completed spilling to {}", outputFile);
     return newGroup;
   }
 
   private void takeOwnership(VectorAccessible batch) {
-    for (VectorWrapper w : batch) {
+    for (VectorWrapper<?> w : batch) {
       DrillBuf[] bufs = w.getValueVector().getBuffers(false);
       for (DrillBuf buf : bufs) {
         if (buf.isRootBuffer()) {
@@ -541,7 +572,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
   private long getBufferSize(VectorAccessible batch) {
     long size = 0;
-    for (VectorWrapper w : batch) {
+    for (VectorWrapper<?> w : batch) {
       DrillBuf[] bufs = w.getValueVector().getBuffers(false);
       for (DrillBuf buf : bufs) {
         if (buf.isRootBuffer()) {
@@ -558,7 +589,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       try {
         spilledBatchGroups.addFirst(mergeAndSpill(batchGroups));
       } catch (SchemaChangeException e) {
-        throw new RuntimeException();
+        throw new RuntimeException(e);
       }
       batchesSinceLastSpill = 0;
       int waitTime = 1;
@@ -659,7 +690,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     return context.getImplementationClass(cg);
   }
 
-  private void generateComparisons(ClassGenerator g, VectorAccessible batch) throws SchemaChangeException {
+  private void generateComparisons(ClassGenerator<?> g, VectorAccessible batch) throws SchemaChangeException {
     g.setMappingSet(MAIN_MAPPING);
 
     for (Ordering od : popConfig.getOrderings()) {