You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ad...@apache.org on 2016/01/06 00:37:50 UTC

drill git commit: DRILL-4236: ExternalSort should use the new allocator functionality to better manage its memory usage

Repository: drill
Updated Branches:
  refs/heads/master e4372f224 -> 884c5b095


DRILL-4236: ExternalSort should use the new allocator functionality to better manage its memory usage

this closes #317


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/884c5b09
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/884c5b09
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/884c5b09

Branch: refs/heads/master
Commit: 884c5b09554d7d74e0709b61c37ade319665a850
Parents: e4372f2
Author: adeneche <ad...@gmail.com>
Authored: Wed Dec 30 14:41:20 2015 -0800
Committer: adeneche <ad...@gmail.com>
Committed: Tue Jan 5 15:37:16 2016 -0800

----------------------------------------------------------------------
 .../drill/common/exceptions/UserException.java  | 13 +++-
 .../physical/impl/xsort/ExternalSortBatch.java  | 77 +++++++-------------
 2 files changed, 36 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/884c5b09/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index 4473ee5..35e71d1 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -50,7 +50,16 @@ public class UserException extends DrillRuntimeException {
    */
   public static Builder memoryError(final Throwable cause) {
     return UserException.resourceError(cause)
-      .message(MEMORY_ERROR_MSG);
+      .message(MEMORY_ERROR_MSG).addContext(cause.getMessage());
+  }
+
+  public static Builder memoryError(final String format, final Object... args) {
+    Builder builder =  UserException.resourceError();
+    builder.message(MEMORY_ERROR_MSG);
+    if (!format.isEmpty()) {
+      builder.addContext(String.format(format, args));
+    }
+    return builder;
   }
 
   /**
@@ -59,7 +68,7 @@ public class UserException extends DrillRuntimeException {
    * @return resource error builder
    */
   public static Builder memoryError() {
-    return memoryError(null);
+    return memoryError("");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/884c5b09/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 6e79f01..8ea16b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
-import io.netty.buffer.DrillBuf;
-
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
@@ -95,7 +93,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private final int SPILL_THRESHOLD;
   private final Iterator<String> dirs;
   private final RecordBatch incoming;
-  private BufferAllocator copierAllocator;
+  private final BufferAllocator oAllocator;
+  private final BufferAllocator copierAllocator;
 
   private BatchSchema schema;
   private SingleBatchSorter sorter;
@@ -114,12 +113,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private int spillCount = 0;
   private int batchesSinceLastSpill = 0;
   private boolean first = true;
-  private long totalSizeInMemory = 0;
-  private long highWaterMark = Long.MAX_VALUE;
   private int targetRecordCount;
   private final String fileName;
   private int firstSpillBatchCount = 0;
-  private long peakSizeInMemory = -1;
   private int peakNumBatches = -1;
 
   /**
@@ -157,7 +153,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     SPILL_BATCH_GROUP_SIZE = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE);
     SPILL_THRESHOLD = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD);
     dirs = Iterators.cycle(config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS));
-    copierAllocator = oContext.getAllocator().newChildAllocator(oContext.getAllocator().getName() + ":copier",
+    oAllocator = oContext.getAllocator();
+    copierAllocator = oAllocator.newChildAllocator(oAllocator.getName() + ":copier",
         PriorityQueueCopier.INITIAL_ALLOCATION, PriorityQueueCopier.MAX_ALLOCATION);
     FragmentHandle handle = context.getHandle();
     fileName = String.format("%s/major_fragment_%s/minor_fragment_%s/operator_%s", QueryIdHelper.getQueryId(handle.getQueryId()),
@@ -356,17 +353,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
               throw new OutOfMemoryException(e);
             }
           }
-          totalSizeInMemory += getBufferSize(convertedBatch);
-          if (peakSizeInMemory < totalSizeInMemory) {
-            peakSizeInMemory = totalSizeInMemory;
-            stats.setLongStat(Metric.PEAK_SIZE_IN_MEMORY, peakSizeInMemory);
-          }
 
           int count = sv2.getCount();
           totalCount += count;
           sorter.setup(context, sv2, convertedBatch);
           sorter.sort(sv2);
-          RecordBatchData rbd = new RecordBatchData(convertedBatch, oContext.getAllocator());
+          RecordBatchData rbd = new RecordBatchData(convertedBatch, oAllocator);
           boolean success = false;
           try {
             rbd.setSv2(sv2);
@@ -377,16 +369,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
             }
 
             batchesSinceLastSpill++;
-            if (// We have spilled at least once and the current memory used is more than the 75% of peak memory used.
-                (spillCount > 0 && totalSizeInMemory > .75 * highWaterMark) ||
-                // If we haven't spilled so far, do we have enough memory for MSorter if this turns out to be the last incoming batch?
+            if (// If we haven't spilled so far, do we have enough memory for MSorter if this turns out to be the last incoming batch?
                 (spillCount == 0 && !hasMemoryForInMemorySort(totalCount)) ||
                 // If we haven't spilled so far, make sure we don't exceed the maximum number of batches SV4 can address
                 (spillCount == 0 && totalBatches > Character.MAX_VALUE) ||
                 // current memory used is more than 95% of memory usage limit of this operator
-                (totalSizeInMemory > .95 * popConfig.getMaxAllocation()) ||
-                // current memory used is more than 95% of memory usage limit of this fragment
-                (totalSizeInMemory > .95 * oContext.getAllocator().getLimit()) ||
+                (oAllocator.getAllocatedMemory() > .95 * oAllocator.getLimit()) ||
                 // Number of incoming batches (BatchGroups) exceed the limit and number of incoming batches accumulated
                 // since the last spill exceed the defined limit
                 (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
@@ -417,7 +405,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           break;
         case OUT_OF_MEMORY:
           logger.debug("received OUT_OF_MEMORY, trying to spill");
-          highWaterMark = totalSizeInMemory;
           if (batchesSinceLastSpill > 2) {
             final BatchGroup merged = mergeAndSpill(batchGroups);
             if (merged != null) {
@@ -443,10 +430,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           builder.clear();
           builder.close();
         }
-        builder = new SortRecordBatchBuilder(oContext.getAllocator());
+        builder = new SortRecordBatchBuilder(oAllocator);
 
         for (BatchGroup group : batchGroups) {
-          RecordBatchData rbd = new RecordBatchData(group.getContainer(), oContext.getAllocator());
+          RecordBatchData rbd = new RecordBatchData(group.getContainer(), oAllocator);
           rbd.setSv2(group.getSv2());
           builder.add(rbd);
         }
@@ -454,7 +441,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         builder.build(context, container);
         sv4 = builder.getSv4();
         mSorter = createNewMSorter();
-        mSorter.setup(context, oContext.getAllocator(), getSelectionVector4(), this.container);
+        mSorter.setup(context, oAllocator, getSelectionVector4(), this.container);
 
         // For testing memory-leak purpose, inject exception after mSorter finishes setup
         injector.injectUnchecked(context.getExecutionControls(), INTERRUPTION_AFTER_SETUP);
@@ -478,7 +465,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         batchGroups.addAll(spilledBatchGroups);
         spilledBatchGroups = null; // no need to cleanup spilledBatchGroups, all it's batches are in batchGroups now
 
-        logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oContext.getAllocator().getAllocatedMemory());
+        logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oAllocator.getAllocatedMemory());
         VectorContainer hyperBatch = constructHyperBatch(batchGroups);
         createCopier(hyperBatch, batchGroups, container, false);
 
@@ -513,7 +500,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   }
 
   private boolean hasMemoryForInMemorySort(int currentRecordCount) {
-    long currentlyAvailable =  popConfig.getMaxAllocation() - oContext.getAllocator().getAllocatedMemory();
+    long currentlyAvailable =  popConfig.getMaxAllocation() - oAllocator.getAllocatedMemory();
 
     long neededForInMemorySort = SortRecordBatchBuilder.memoryNeeded(currentRecordCount) +
         MSortTemplate.memoryNeeded(currentRecordCount);
@@ -523,7 +510,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
   public BatchGroup mergeAndSpill(LinkedList<BatchGroup> batchGroups) throws SchemaChangeException {
     logger.debug("Copier allocator current allocation {}", copierAllocator.getAllocatedMemory());
-    logger.debug("mergeAndSpill: starting totalSizeInMemory = {}", totalSizeInMemory);
+    logger.debug("mergeAndSpill: starting total size in memory = {}", oAllocator.getAllocatedMemory());
     VectorContainer outputContainer = new VectorContainer();
     List<BatchGroup> batchGroupList = Lists.newArrayList();
     int batchCount = batchGroups.size();
@@ -534,11 +521,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       BatchGroup batch = batchGroups.pollLast();
       assert batch != null : "Encountered a null batch during merge and spill operation";
       batchGroupList.add(batch);
-      long bufferSize = getBufferSize(batch);
-      logger.debug("mergeAndSpill: buffer size for batch {} = {}", i, bufferSize);
-      totalSizeInMemory -= bufferSize;
     }
-    logger.debug("mergeAndSpill: intermediate estimated total size in memory = {}", totalSizeInMemory);
 
     if (batchGroupList.size() == 0) {
       return null;
@@ -588,36 +571,26 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     } finally {
       hyperBatch.clear();
     }
-    long bufSize = getBufferSize(c1);
-    totalSizeInMemory += bufSize;
-    logger.debug("mergeAndSpill: final total size in memory = {}", totalSizeInMemory);
+    logger.debug("mergeAndSpill: final total size in memory = {}", oAllocator.getAllocatedMemory());
     logger.info("Completed spilling to {}", outputFile);
     return newGroup;
   }
 
-  private long getBufferSize(VectorAccessible batch) {
-    long size = 0;
-    for (VectorWrapper<?> w : batch) {
-      DrillBuf[] bufs = w.getValueVector().getBuffers(false);
-      for (DrillBuf buf : bufs) {
-        size += buf.getPossibleMemoryConsumed();
-      }
-    }
-    return size;
-  }
-
   private SelectionVector2 newSV2() throws OutOfMemoryException, InterruptedException {
-    SelectionVector2 sv2 = new SelectionVector2(oContext.getAllocator());
+    SelectionVector2 sv2 = new SelectionVector2(oAllocator);
     if (!sv2.allocateNewSafe(incoming.getRecordCount())) {
       try {
-        // Not being able to allocate sv2 means this operator's allocator reached it's maximum capacity.
-        // Spilling this.batchGroups won't help here as they are owned by upstream operator,
-        // but spilling spilledBatchGroups may free enough memory
-        final BatchGroup merged = mergeAndSpill(spilledBatchGroups);
+        final BatchGroup merged = mergeAndSpill(batchGroups);
         if (merged != null) {
-          spilledBatchGroups.addFirst(merged);
+          spilledBatchGroups.add(merged);
         } else {
-          throw new OutOfMemoryException("Unable to allocate sv2, and not enough batchGroups to spill");
+          throw UserException.memoryError("Unable to allocate sv2 for %d records, and not enough batchGroups to spill.",
+              incoming.getRecordCount())
+            .addContext("batchGroups.size", batchGroups.size())
+            .addContext("spilledBatchGroups.size", spilledBatchGroups.size())
+            .addContext("allocated memory", oAllocator.getAllocatedMemory())
+            .addContext("allocator limit", oAllocator.getLimit())
+            .build(logger);
         }
       } catch (SchemaChangeException e) {
         throw new RuntimeException(e);
@@ -771,7 +744,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         copier.close();
       }
 
-      BufferAllocator allocator = spilling ? copierAllocator : oContext.getAllocator();
+      BufferAllocator allocator = spilling ? copierAllocator : oAllocator;
       for (VectorWrapper<?> i : batch) {
         ValueVector v = TypeHelper.getNewVector(i.getField(), allocator);
         outputContainer.add(v);