Posted to commits@drill.apache.org by jn...@apache.org on 2017/08/15 13:44:14 UTC

[02/13] drill git commit: DRILL-5601: Rollup of external sort fixes and improvements

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
index 213720f..cd03b70 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
@@ -19,7 +19,160 @@ package org.apache.drill.exec.physical.impl.xsort.managed;
 
 import com.google.common.annotations.VisibleForTesting;
 
+/**
+ * Computes the memory needs for input batches, spill batches and merge
+ * batches. The key challenges that this code tries to overcome are:
+ * <ul>
+ * <li>Drill is not designed for small memory allocations,
+ * but the planner may provide such allocations because the memory per
+ * query is divided among slices (minor fragments) and among buffering
+ * operators, leaving very little per operator.</li>
+ * <li>Drill does not provide the detailed memory information needed to
+ * carefully manage memory in tight constraints.</li>
+ * <li>But, Drill has a death penalty for going over the memory limit.</li>
+ * </ul>
+ * As a result, this class is a bit of a hack: it attempts to consider a
+ * number of ill-defined factors in order to divide up memory use in a
+ * way that prevents OOM errors.
+ * <p>
+ * First, it is necessary to differentiate two concepts:
+ * <ul>
+ * <li>The <i>data size</i> of a batch: the amount of memory needed to hold
+ * the data itself. The data size is constant for any given batch.</li>
+ * <li>The <i>buffer size</i> of the buffers that hold the data. The buffer
+ * size varies wildly depending on how the batch was produced.</li>
+ * </ul>
+ * The three kinds of buffer layout seen to date are:
+ * <ul>
+ * <li>One buffer per vector component (data, offsets, null flags, etc.)
+ * &ndash; created by readers, project and other operators.</li>
+ * <li>One buffer for the entire batch, with each vector component using
+ * a slice of the overall buffer &ndash; the case for batches deserialized from
+ * exchanges.</li>
+ * <li>One buffer for each top-level vector, with component vectors
+ * using slices of the overall vector buffer &ndash; the result of reading
+ * spilled batches from disk.</li>
+ * </ul>
+ * In each case, buffer sizes are power-of-two rounded from the data size.
+ * But since the data is grouped differently in each case, the resulting buffer
+ * sizes vary considerably.
+ * <p>
+ * As a result, we can never be sure of the amount of memory needed for a
+ * batch. So, we have to estimate based on a number of factors:
+ * <ul>
+ * <li>The {@link RecordBatchSizer} estimate of the data size and
+ * buffer size of each incoming batch.</li>
+ * <li>An estimate of the internal fragmentation due to power-of-two rounding.</li>
+ * <li>Configured preferences for spill and output batches.</li>
+ * </ul>
+ * The code handles "normal" and "low" memory conditions.
+ * <ul>
+ * <li>In normal memory, we simply work out the number of preferred-size
+ * batches that fit in memory (based on the predicted buffer size.)</li>
+ * <li>In low memory, we divide up the available memory to produce the
+ * spill and merge batch sizes. The sizes will be less than the configured
+ * preference.</li>
+ * </ul>
+ * <p>
+ * The sort has two key configured parameters: the spill file size and the
+ * size of the output (downstream) batch. The spill file size is chosen to
+ * be large enough to ensure efficient I/O, but not so large as to overwhelm
+ * any one spill directory. The output batch size is chosen to be large enough
+ * to amortize the per-batch overhead over the maximum number of records, but
+ * not so large as to overwhelm downstream operators. Setting these parameters
+ * is a judgment call.
+ * <p>
+ * Under limited memory, the above sizes may be too big for the space available.
+ * For example, the default spill file size is 256 MB. But, if the sort is
+ * only given 50 MB, then spill files will be smaller. The default output batch
+ * size is 16 MB, but if the sort is given only 20 MB, then the output batch must
+ * be smaller. The low memory logic starts with the memory available and works
+ * backwards to figure out spill batch size, output batch size and spill file
+ * size. The sizes will be smaller than optimal, but as large as will fit in
+ * the memory provided.
+ */
+
 public class SortMemoryManager {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
+
+  /**
+   * Estimate for typical internal fragmentation in a buffer due to power-of-two
+   * rounding on vectors.
+   * <p>
+   * <pre>[____|__$__]</pre>
+   * In the above, the brackets represent the whole vector. The
+   * first half is always full. The $ represents the end of data.
+   * When the first half filled, the second
+   * half was allocated. On average, the second half will be half full.
+   * This means that, on average, 1/4 of the allocated space is
+   * unused (the definition of internal fragmentation.)
+   */
+
+  public static final double INTERNAL_FRAGMENTATION_ESTIMATE = 1.0/4.0;
+
+  /**
+   * Given a buffer, this is the assumed amount of space
+   * available for data. (Adding more will double the buffer
+   * size half the time.)
+   */
+
+  public static final double PAYLOAD_FROM_BUFFER = 1 - INTERNAL_FRAGMENTATION_ESTIMATE;
+
+  /**
+   * Given a data size, this is the multiplier to create the buffer
+   * size estimate. (Note: since we work with aggregate batches, we
+   * cannot simply round up to the next power of two: rounding is done
+   * on a vector-by-vector basis. Here we need to estimate the aggregate
+   * effect of rounding.)
+   */
+
+  public static final double BUFFER_FROM_PAYLOAD = 3.0 / 2.0;
+
+  /**
+   * On really bad days, we will add one more byte (or value) to a vector
+   * than fits in a power-of-two sized buffer, forcing a doubling. In this
+   * case, half the resulting buffer is empty.
+   */
+
+  public static final double WORST_CASE_BUFFER_RATIO = 2.0;
+
+  /**
+   * Desperate attempt to keep spill batches from being too small in low memory.
+   * <p>
+   * The number is also used for logging: the system will log a warning if
+   * batches fall below this number which may represent too little memory
+   * allocated for the job at hand. (Queries operate on big data: many records.
+   * Batches with too few records are a probable performance hit. But, what is
+   * too few? It is a judgment call.)
+   */
+
+  public static final int MIN_ROWS_PER_SORT_BATCH = 100;
+  public static final double LOW_MEMORY_MERGE_BATCH_RATIO = 0.25;
+
+  public static class BatchSizeEstimate {
+    int dataSize;
+    int expectedBufferSize;
+    int maxBufferSize;
+
+    public void setFromData(int dataSize) {
+      this.dataSize = dataSize;
+      expectedBufferSize = multiply(dataSize, BUFFER_FROM_PAYLOAD);
+      maxBufferSize = multiply(dataSize, WORST_CASE_BUFFER_RATIO);
+    }
+
+    public void setFromBuffer(int bufferSize) {
+      expectedBufferSize = bufferSize;
+      dataSize = multiply(bufferSize, PAYLOAD_FROM_BUFFER);
+      maxBufferSize = multiply(dataSize, WORST_CASE_BUFFER_RATIO);
+    }
+
+    public void setFromWorstCaseBuffer(int bufferSize) {
+      maxBufferSize = bufferSize;
+      dataSize = multiply(maxBufferSize, 1 / WORST_CASE_BUFFER_RATIO);
+      expectedBufferSize = multiply(dataSize, BUFFER_FROM_PAYLOAD);
+    }
+  }
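
To make the constants and conversions above concrete, here is a minimal,
standalone sketch of the same arithmetic (illustrative only, not part of
this patch; the class name and sizes are hypothetical):

    // Mirrors BUFFER_FROM_PAYLOAD, WORST_CASE_BUFFER_RATIO and
    // PAYLOAD_FROM_BUFFER as defined above.
    public class BatchSizeArithmeticDemo {
      public static void main(String[] args) {
        int dataSize = 8 * 1024 * 1024;                        // 8 MB of actual row data
        int expected = (int) Math.floor(dataSize * 3.0 / 2.0); // typical rounding: ~12 MB buffer
        int worst = (int) Math.floor(dataSize * 2.0);          // every vector just doubled: 16 MB
        // And in reverse: a 16 MB buffer is assumed to carry
        // 16 MB * (1 - 1/4) = 12 MB of data.
        int payload = (int) Math.floor(16 * 1024 * 1024 * (1 - 1.0 / 4.0));
        System.out.println(expected + " / " + worst + " / " + payload);
      }
    }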
 
   /**
    * Maximum memory this operator may use. Usually comes from the
@@ -42,13 +195,13 @@ public class SortMemoryManager {
    * value.
    */
 
-  private int expectedMergeBatchSize;
+  private final BatchSizeEstimate mergeBatchSize = new BatchSizeEstimate();
 
   /**
    * Estimate of the input batch size based on the largest batch seen
    * thus far.
    */
-  private int estimatedInputBatchSize;
+  private final BatchSizeEstimate inputBatchSize = new BatchSizeEstimate();
 
   /**
    * Maximum memory level before spilling occurs. That is, we can buffer input
@@ -86,7 +239,7 @@ public class SortMemoryManager {
    * details of the data rows for any particular query.
    */
 
-  private int expectedSpillBatchSize;
+  private final BatchSizeEstimate spillBatchSize = new BatchSizeEstimate();
 
   /**
    * The number of records to add to each output batch sent to the
@@ -97,24 +250,41 @@ public class SortMemoryManager {
 
   private SortConfig config;
 
-  private int estimatedInputSize;
-
   private boolean potentialOverflow;
 
-  public SortMemoryManager(SortConfig config, long memoryLimit) {
+  private boolean isLowMemory;
+
+  private boolean performanceWarning;
+
+  public SortMemoryManager(SortConfig config, long opMemoryLimit) {
     this.config = config;
 
     // The maximum memory this operator can use as set by the
     // operator definition (propagated to the allocator.)
 
-    if (config.maxMemory() > 0) {
-      this.memoryLimit = Math.min(memoryLimit, config.maxMemory());
-    } else {
-      this.memoryLimit = memoryLimit;
-    }
+    final long configMemoryLimit = config.maxMemory();
+    memoryLimit = (configMemoryLimit == 0) ? opMemoryLimit
+                : Math.min(opMemoryLimit, configMemoryLimit);
 
     preferredSpillBatchSize = config.spillBatchSize();
     preferredMergeBatchSize = config.mergeBatchSize();
+
+    // Initialize the buffer memory limit for the first batch.
+    // Assume 1/2 of (allocated - spill batch size).
+
+    bufferMemoryLimit = (memoryLimit - config.spillBatchSize()) / 2;
+    if (bufferMemoryLimit < 0) {
+      // Bad news: not enough for even the spill batch.
+      // Assume half of memory, will adjust later.
+      bufferMemoryLimit = memoryLimit / 2;
+    }
+
+    if (memoryLimit == opMemoryLimit) {
+      logger.debug("Memory config: Allocator limit = {}", memoryLimit);
+    } else {
+      logger.debug("Memory config: Allocator limit = {}, Configured limit: {}",
+                   opMemoryLimit, memoryLimit);
+    }
   }
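
For example (hypothetical numbers): with a 40 MB operator limit and the
default 1 MB spill batch size, the initial buffer limit works out to
(40 MB - 1 MB) / 2 = 19.5 MB; the other half of memory is held back until
real batches arrive and updateEstimates() refines the numbers.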
 
   /**
@@ -134,36 +304,39 @@ public class SortMemoryManager {
    * phase, and how many spill batches we can merge during the merge
    * phase.
    *
-   * @param batchSize the overall size of the current batch received from
+   * @param batchDataSize the overall size of the current batch received from
    * upstream
    * @param batchRowWidth the average width in bytes (including overhead) of
    * rows in the current input batch
    * @param batchRowCount the number of actual (not filtered) records in
    * that upstream batch
+   * @return true if the estimates changed, false if the previous estimates
+   * remain valid
    */
 
-  public void updateEstimates(int batchSize, int batchRowWidth, int batchRowCount) {
+  public boolean updateEstimates(int batchDataSize, int batchRowWidth, int batchRowCount) {
 
     // The record count should never be zero, but better safe than sorry...
 
     if (batchRowCount == 0) {
-      return; }
+      return false; }
 
 
     // Update input batch estimates.
     // Go no further if nothing changed.
 
-    if (! updateInputEstimates(batchSize, batchRowWidth, batchRowCount)) {
-      return;
+    if (! updateInputEstimates(batchDataSize, batchRowWidth, batchRowCount)) {
+      return false;
     }
 
     updateSpillSettings();
     updateMergeSettings();
     adjustForLowMemory();
     logSettings(batchRowCount);
+    return true;
   }
 
-  private boolean updateInputEstimates(int batchSize, int batchRowWidth, int batchRowCount) {
+  private boolean updateInputEstimates(int batchDataSize, int batchRowWidth, int batchRowCount) {
 
     // The row width may end up as zero if all fields are nulls or some
     // other unusual situation. In this case, assume a width of 10 just
@@ -192,17 +365,13 @@ public class SortMemoryManager {
     // batch. Because we are using the actual observed batch size,
     // the size already includes overhead due to power-of-two rounding.
 
-    long origInputBatchSize = estimatedInputBatchSize;
-    estimatedInputBatchSize = Math.max(estimatedInputBatchSize, batchSize);
-
-    // Estimate the total size of each incoming batch plus sv2. Note that, due
-    // to power-of-two rounding, the allocated sv2 size might be twice the data size.
-
-    estimatedInputSize = estimatedInputBatchSize + 4 * batchRowCount;
+    long origInputBatchSize = inputBatchSize.dataSize;
+    inputBatchSize.setFromData(Math.max(inputBatchSize.dataSize, batchDataSize));
 
     // Return whether anything changed.
 
-    return estimatedRowWidth != origRowEstimate || estimatedInputBatchSize != origInputBatchSize;
+    return estimatedRowWidth > origRowEstimate ||
+           inputBatchSize.dataSize > origInputBatchSize;
   }
 
   /**
@@ -215,18 +384,23 @@ public class SortMemoryManager {
 
     spillBatchRowCount = rowsPerBatch(preferredSpillBatchSize);
 
+    // But, don't allow spill batches to be too small; we pay too
+    // much overhead cost for small row counts.
+
+    spillBatchRowCount = Math.max(spillBatchRowCount, MIN_ROWS_PER_SORT_BATCH);
+
     // Compute the actual spill batch size which may be larger or smaller
-    // than the preferred size depending on the row width. Double the estimated
-    // memory needs to allow for power-of-two rounding.
+    // than the preferred size depending on the row width.
 
-    expectedSpillBatchSize = batchForRows(spillBatchRowCount);
+    spillBatchSize.setFromData(spillBatchRowCount * estimatedRowWidth);
 
     // Determine the minimum memory needed for spilling. Spilling is done just
     // before accepting a spill batch, so we must spill if we don't have room for a
     // (worst case) input batch. To spill, we need room for the spill batch created
-    // by merging the batches already in memory.
+    // by merging the batches already in memory. This is a memory calculation,
+    // so use the buffer size for the spill batch.
 
-    bufferMemoryLimit = memoryLimit - expectedSpillBatchSize;
+    bufferMemoryLimit = memoryLimit - 2 * spillBatchSize.maxBufferSize;
   }
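
As a hypothetical illustration of the above: with a 1 MB preferred spill
batch and a 100-byte estimated row width, rowsPerBatch() yields 10,485
rows, so the spill batch data size is about 1 MB, its worst-case buffer
2 MB, and the in-memory buffering limit becomes memoryLimit - 4 MB.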
 
   /**
@@ -238,13 +412,21 @@ public class SortMemoryManager {
   private void updateMergeSettings() {
 
     mergeBatchRowCount = rowsPerBatch(preferredMergeBatchSize);
-    expectedMergeBatchSize = batchForRows(mergeBatchRowCount);
+
+    // But, don't allow merge batches to be too small; we pay too
+    // much overhead cost for small row counts.
+
+    mergeBatchRowCount = Math.max(mergeBatchRowCount, MIN_ROWS_PER_SORT_BATCH);
+
+    // Compute the actual merge batch size.
+
+    mergeBatchSize.setFromData(mergeBatchRowCount * estimatedRowWidth);
 
     // The merge memory pool assumes we can spill all input batches. The memory
     // available to hold spill batches for merging is total memory minus the
     // expected output batch size.
 
-    mergeMemoryLimit = memoryLimit - expectedMergeBatchSize;
+    mergeMemoryLimit = memoryLimit - mergeBatchSize.maxBufferSize;
   }
 
   /**
@@ -271,22 +453,27 @@ public class SortMemoryManager {
 
   private void adjustForLowMemory() {
 
-    long loadHeadroom = bufferMemoryLimit - 2 * estimatedInputSize;
-    long mergeHeadroom = mergeMemoryLimit - 2 * expectedSpillBatchSize;
-    if (loadHeadroom >= 0  &&  mergeHeadroom >= 0) {
-      return;
-    }
+    potentialOverflow = false;
+    performanceWarning = false;
 
-    lowMemorySpillBatchSize();
-    lowMemoryMergeBatchSize();
+    // Input batches are assumed to have typical fragmentation. Experience
+    // shows that spilled batches have close to the maximum fragmentation.
+
+    long loadHeadroom = bufferMemoryLimit - 2 * inputBatchSize.expectedBufferSize;
+    long mergeHeadroom = mergeMemoryLimit - 2 * spillBatchSize.maxBufferSize;
+    isLowMemory = (loadHeadroom < 0  ||  mergeHeadroom < 0);
+    if (! isLowMemory) {
+      return; }
+
+    lowMemoryInternalBatchSizes();
 
     // Sanity check: if we've been given too little memory to make progress,
     // issue a warning but proceed anyway. Should only occur if something is
     // configured terribly wrong.
 
-    long minNeeds = 2 * estimatedInputSize + expectedSpillBatchSize;
+    long minNeeds = 2 * inputBatchSize.expectedBufferSize + spillBatchSize.maxBufferSize;
     if (minNeeds > memoryLimit) {
-      ExternalSortBatch.logger.warn("Potential memory overflow during load phase! " +
+      logger.warn("Potential memory overflow during load phase! " +
           "Minimum needed = {} bytes, actual available = {} bytes",
           minNeeds, memoryLimit);
       bufferMemoryLimit = 0;
@@ -295,14 +482,36 @@ public class SortMemoryManager {
 
     // Sanity check
 
-    minNeeds = 2 * expectedSpillBatchSize + expectedMergeBatchSize;
+    minNeeds = 2 * spillBatchSize.expectedBufferSize + mergeBatchSize.expectedBufferSize;
     if (minNeeds > memoryLimit) {
-      ExternalSortBatch.logger.warn("Potential memory overflow during merge phase! " +
+      logger.warn("Potential memory overflow during merge phase! " +
           "Minimum needed = {} bytes, actual available = {} bytes",
           minNeeds, memoryLimit);
       mergeMemoryLimit = 0;
       potentialOverflow = true;
     }
+
+    // Performance warning
+
+    if (potentialOverflow) {
+      return;
+    }
+    if (spillBatchSize.dataSize < config.spillBatchSize()  &&
+        spillBatchRowCount < Character.MAX_VALUE) {
+      logger.warn("Potential performance degredation due to low memory. " +
+                  "Preferred spill batch size: {}, actual: {}, rows per batch: {}",
+                  config.spillBatchSize(), spillBatchSize.dataSize,
+                  spillBatchRowCount);
+      performanceWarning = true;
+    }
+    if (mergeBatchSize.dataSize < config.mergeBatchSize()  &&
+        mergeBatchRowCount < Character.MAX_VALUE) {
+      logger.warn("Potential performance degredation due to low memory. " +
+                  "Preferred merge batch size: {}, actual: {}, rows per batch: {}",
+                  config.mergeBatchSize(), mergeBatchSize.dataSize,
+                  mergeBatchRowCount);
+      performanceWarning = true;
+    }
   }
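
To illustrate the headroom test with hypothetical numbers: given 4 MB
expected input buffers and 2 MB worst-case spill buffers, the load
headroom is bufferMemoryLimit - 8 MB and the merge headroom is
mergeMemoryLimit - 4 MB; if either is negative, the sort switches to the
low-memory sizing below.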
 
   /**
@@ -312,52 +521,66 @@ public class SortMemoryManager {
    * one spill batch to make progress.
    */
 
-  private void lowMemorySpillBatchSize() {
+  private void lowMemoryInternalBatchSizes() {
 
     // The "expected" size is with power-of-two rounding in some vectors.
     // We later work backwards to the row count assuming average internal
     // fragmentation.
 
-    // Must hold two input batches. Use (most of) the rest for the spill batch.
+    // Must hold two input batches. Use half of the rest for the spill batch.
+    // In a really bad case, the number here may be negative. We'll fix
+    // it below.
 
-    expectedSpillBatchSize = (int) (memoryLimit - 2 * estimatedInputSize);
+    int spillBufferSize = (int) (memoryLimit - 2 * inputBatchSize.maxBufferSize) / 2;
 
     // But, in the merge phase, we need two spill batches and one output batch.
     // (Assume that the spill and merge are equal sizes.)
-    // Use 3/4 of memory for each batch (to allow power-of-two rounding:
 
-    expectedSpillBatchSize = (int) Math.min(expectedSpillBatchSize, memoryLimit/3);
+    spillBufferSize = (int) Math.min(spillBufferSize, memoryLimit/4);
 
-    // Never going to happen, but let's ensure we don't somehow create large batches.
+    // Compute the size from the buffer. Assume worst-case
+    // fragmentation (as is typical when reading from the spill file.)
 
-    expectedSpillBatchSize = Math.max(expectedSpillBatchSize, SortConfig.MIN_SPILL_BATCH_SIZE);
+    spillBatchSize.setFromWorstCaseBuffer(spillBufferSize);
 
     // Must hold at least one row to spill. That is, we can make progress if we
     // create spill files that consist of single-record batches.
 
-    expectedSpillBatchSize = Math.max(expectedSpillBatchSize, estimatedRowWidth);
+    int spillDataSize = Math.min(spillBatchSize.dataSize, config.spillBatchSize());
+    spillDataSize = Math.max(spillDataSize, estimatedRowWidth);
+    if (spillDataSize != spillBatchSize.dataSize) {
+      spillBatchSize.setFromData(spillDataSize);
+    }
 
     // Work out the spill batch count needed by the spill code. Allow room for
     // power-of-two rounding.
 
-    spillBatchRowCount = rowsPerBatch(expectedSpillBatchSize);
+    spillBatchRowCount = rowsPerBatch(spillBatchSize.dataSize);
 
     // Finally, figure out when we must spill.
 
-    bufferMemoryLimit = memoryLimit - expectedSpillBatchSize;
-  }
+    bufferMemoryLimit = memoryLimit - 2 * spillBatchSize.maxBufferSize;
+    bufferMemoryLimit = Math.max(bufferMemoryLimit, 0);
 
-  /**
-   * For merge batch, we must hold at least two spill batches and
-   * one output batch.
-   */
+    // Assume two spill batches must be merged (plus safety margin.)
+    // The rest can be given to the merge batch.
+
+    long mergeBufferSize = memoryLimit - 2 * spillBatchSize.maxBufferSize;
+
+    // The calculations above assume that the merge batch is the same
+    // size as a spill batch: during the merge phase we must hold at
+    // least two spill batches and one output batch, which is why we
+    // divided memory as if among three spill batches.
 
-  private void lowMemoryMergeBatchSize() {
-    expectedMergeBatchSize = (int) (memoryLimit - 2 * expectedSpillBatchSize);
-    expectedMergeBatchSize = Math.max(expectedMergeBatchSize, SortConfig.MIN_MERGE_BATCH_SIZE);
-    expectedMergeBatchSize = Math.max(expectedMergeBatchSize, estimatedRowWidth);
-    mergeBatchRowCount = rowsPerBatch(expectedMergeBatchSize);
-    mergeMemoryLimit = memoryLimit - expectedMergeBatchSize;
+    mergeBatchSize.setFromBuffer((int) mergeBufferSize);
+    int mergeDataSize = Math.min(mergeBatchSize.dataSize, config.mergeBatchSize());
+    mergeDataSize = Math.max(mergeDataSize, estimatedRowWidth);
+    if (mergeDataSize != mergeBatchSize.dataSize) {
+      mergeBatchSize.setFromData(mergeDataSize);
+    }
+
+    mergeBatchRowCount = rowsPerBatch(mergeBatchSize.dataSize);
+    mergeMemoryLimit = Math.max(2 * spillBatchSize.expectedBufferSize, memoryLimit - mergeBatchSize.maxBufferSize);
   }
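
Working the low-memory math with hypothetical numbers: given a 20 MB
limit and 4 MB worst-case input buffers, the spill buffer gets
min((20 - 8) / 2, 20 / 4) = 5 MB; setFromWorstCaseBuffer(5 MB) implies a
2.5 MB data size, which the 1 MB configured preference then caps, leaving
a 2 MB worst-case spill buffer and bufferMemoryLimit = 20 - 4 = 16 MB.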
 
   /**
@@ -367,14 +590,34 @@ public class SortMemoryManager {
 
   private void logSettings(int actualRecordCount) {
 
-    ExternalSortBatch.logger.debug("Input Batch Estimates: record size = {} bytes; input batch = {} bytes, {} records",
-                 estimatedRowWidth, estimatedInputBatchSize, actualRecordCount);
-    ExternalSortBatch.logger.debug("Merge batch size = {} bytes, {} records; spill file size: {} bytes",
-                 expectedSpillBatchSize, spillBatchRowCount, config.spillFileSize());
-    ExternalSortBatch.logger.debug("Output batch size = {} bytes, {} records",
-                 expectedMergeBatchSize, mergeBatchRowCount);
-    ExternalSortBatch.logger.debug("Available memory: {}, buffer memory = {}, merge memory = {}",
+    logger.debug("Input Batch Estimates: record size = {} bytes; net = {} bytes, gross = {}, records = {}",
+                 estimatedRowWidth, inputBatchSize.dataSize,
+                 inputBatchSize.expectedBufferSize, actualRecordCount);
+    logger.debug("Spill batch size: net = {} bytes, gross = {} bytes, records = {}; spill file = {} bytes",
+                 spillBatchSize.dataSize, spillBatchSize.expectedBufferSize,
+                 spillBatchRowCount, config.spillFileSize());
+    logger.debug("Output batch size: net = {} bytes, gross = {} bytes, records = {}",
+                 mergeBatchSize.dataSize, mergeBatchSize.expectedBufferSize,
+                 mergeBatchRowCount);
+    logger.debug("Available memory: {}, buffer memory = {}, merge memory = {}",
                  memoryLimit, bufferMemoryLimit, mergeMemoryLimit);
+
+    // Performance warnings due to low row counts per batch.
+    // Low row counts cause excessive per-batch overhead and hurt
+    // performance.
+
+    if (spillBatchRowCount < MIN_ROWS_PER_SORT_BATCH) {
+      logger.warn("Potential performance degredation due to low memory or large input row. " +
+                  "Preferred spill batch row count: {}, actual: {}",
+                  MIN_ROWS_PER_SORT_BATCH, spillBatchRowCount);
+      performanceWarning = true;
+    }
+    if (mergeBatchRowCount < MIN_ROWS_PER_SORT_BATCH) {
+      logger.warn("Potential performance degredation due to low memory or large input row. " +
+                  "Preferred merge batch row count: {}, actual: {}",
+                  MIN_ROWS_PER_SORT_BATCH, mergeBatchRowCount);
+      performanceWarning = true;
+    }
   }
 
   public enum MergeAction { SPILL, MERGE, NONE }
@@ -389,86 +632,120 @@ public class SortMemoryManager {
     }
   }
 
+  /**
+   * Choose a consolidation option during the merge phase depending on memory
+   * available. Preference is given to moving directly onto merging (with no
+   * additional spilling) when possible. But, if memory pressures don't allow
+   * this, we must spill batches and/or merge on-disk spilled runs, to reduce
+   * the final set of runs to something that can be merged in the available
+   * memory.
+   * <p>
+   * Logic is here (returning an enum) rather than in the merge code to allow
+   * unit testing without actually needing batches in memory.
+   *
+   * @param allocMemory
+   *          amount of memory currently allocated (this class knows the total
+   *          memory available)
+   * @param inMemCount
+   *          number of incoming batches in memory (the number is important, not
+   *          the in-memory size; we get the memory size from
+   *          <tt>allocMemory</tt>)
+   * @param spilledRunsCount
+   *          the number of runs sitting on disk to be merged
+   * @return whether to <tt>SPILL</tt> in-memory batches, whether to
+   *         <tt>MERGE</tt> on-disk batches to create a new, larger run, or whether
+   *         to do nothing (<tt>NONE</tt>) and instead advance to the final merge
+   */
+
   public MergeTask consolidateBatches(long allocMemory, int inMemCount, int spilledRunsCount) {
 
-    // Determine additional memory needed to hold one batch from each
-    // spilled run.
+    assert allocMemory == 0 || inMemCount > 0;
+    assert inMemCount + spilledRunsCount > 0;
 
-    // If the on-disk batches and in-memory batches need more memory than
-    // is available, spill some in-memory batches.
+    // If only one spilled run, then merging is not productive regardless
+    // of memory limits.
 
-    if (inMemCount > 0) {
-      long mergeSize = spilledRunsCount * expectedSpillBatchSize;
-      if (allocMemory + mergeSize > mergeMemoryLimit) {
-        return new MergeTask(MergeAction.SPILL, 0);
-      }
+    if (inMemCount == 0 && spilledRunsCount <= 1) {
+      return new MergeTask(MergeAction.NONE, 0);
     }
 
-    // Maximum batches that fit into available memory.
+    // If memory use is above the merge memory limit, then we must
+    // spill to create room for a merge batch.
 
-    int mergeLimit = (int) ((mergeMemoryLimit - allocMemory) / expectedSpillBatchSize);
+    if (allocMemory > mergeMemoryLimit) {
+      return new MergeTask(MergeAction.SPILL, 0);
+    }
 
-    // Can't merge more than the merge limit.
+    // Determine additional memory needed to hold one batch from each
+    // spilled run.
+
+    // Maximum spill batches that fit into available memory.
 
-    mergeLimit = Math.min(mergeLimit, config.mergeLimit());
+    int memMergeLimit = (int) ((mergeMemoryLimit - allocMemory) /
+                                spillBatchSize.expectedBufferSize);
+    memMergeLimit = Math.max(0, memMergeLimit);
 
-    // How many batches to merge?
+    // If batches are in memory, and we need more memory to merge
+    // them all than is actually available, then spill some in-memory
+    // batches.
+
+    if (inMemCount > 0  &&  memMergeLimit < spilledRunsCount) {
+      return new MergeTask(MergeAction.SPILL, 0);
+    }
 
-    int mergeCount = spilledRunsCount - mergeLimit;
-    if (mergeCount <= 0) {
+    // If all batches fit in memory, then no need for a second-generation
+    // merge/spill.
+
+    memMergeLimit = Math.min(memMergeLimit, config.mergeLimit());
+    int mergeRunCount = spilledRunsCount - memMergeLimit;
+    if (mergeRunCount <= 0) {
       return new MergeTask(MergeAction.NONE, 0);
     }
 
-    // We will merge. This will create yet another spilled
-    // run. Account for that.
+    // We need a second generation load-merge-spill cycle
+    // to reduce the number of spilled runs to a smaller set
+    // that will fit in memory.
+
+    // Merging creates another batch. Include one more run
+    // in the merge to create space for the new run.
+
+    mergeRunCount += 1;
 
-    mergeCount += 1;
+    // Merge only as many batches as fit in memory.
+    // Use all memory for this process; no need to reserve space for a
+    // merge output batch. Assume worst case since we are forced to
+    // accept spilled batches blind: we can't limit reading based on memory
+    // limits. Subtract one to allow for the output spill batch.
+
+    memMergeLimit = (int)(memoryLimit / spillBatchSize.maxBufferSize) - 1;
+    mergeRunCount = Math.min(mergeRunCount, memMergeLimit);
 
     // Must merge at least 2 batches to make progress.
-    // This is the the (at least one) excess plus the allowance
-    // above for the new one.
+    // We know we have at least two because of the check done above.
 
-    // Can't merge more than the limit.
+    mergeRunCount = Math.max(mergeRunCount, 2);
 
-    mergeCount = Math.min(mergeCount, config.mergeLimit());
+    // Can't merge more than the merge limit.
 
-    // Do the merge, then loop to try again in case not
-    // all the target batches spilled in one go.
+    mergeRunCount = Math.min(mergeRunCount, config.mergeLimit());
 
-    return new MergeTask(MergeAction.MERGE, mergeCount);
+    return new MergeTask(MergeAction.MERGE, mergeRunCount);
   }
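
A minimal sketch of how a caller might act on the returned task; the
MergeTask field names and the helper methods here are assumptions (they
are not shown in this hunk), not the actual ExternalSortBatch code:

    MergeTask task = memManager.consolidateBatches(
        allocator.getAllocatedMemory(), bufferedBatches.size(), spilledRuns.size());
    switch (task.action) {
    case SPILL:
      spillFromMemory();            // hypothetical: free memory before merging
      break;
    case MERGE:
      mergeSpilledRuns(task.count); // hypothetical: merge task.count runs into one
      break;
    case NONE:
      break;                        // proceed straight to the final merge
    }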
 
   /**
-   * Compute the number of rows per batch assuming that the batch is
-   * subject to average internal fragmentation due to power-of-two
-   * rounding on vectors.
-   * <p>
-   * <pre>[____|__$__]</pre>
-   * In the above, the brackets represent the whole vector. The
-   * first half is always full. When the first half filled, the second
-   * half was allocated. On average, the second half will be half full.
+   * Compute the number of rows that fit into a given batch data size.
    *
   * @param batchSize expected batch data size
    * @return number of rows that fit into the batch
    */
 
   private int rowsPerBatch(int batchSize) {
-    int rowCount = batchSize * 3 / 4 / estimatedRowWidth;
+    int rowCount = batchSize / estimatedRowWidth;
     return Math.max(1, Math.min(rowCount, Character.MAX_VALUE));
   }
 
-  /**
-   * Compute the expected number of rows that fit into a given size
-   * batch, accounting for internal fragmentation due to power-of-two
-   * rounding on vector allocations.
-   *
-   * @param rowCount the desired number of rows in the batch
-   * @return the size of resulting batch, including power-of-two
-   * rounding.
-   */
-
-  private int batchForRows(int rowCount) {
-    return estimatedRowWidth * rowCount * 4 / 3;
+  public static int multiply(int byteSize, double multiplier) {
+    return (int) Math.floor(byteSize * multiplier);
   }
 
   // Must spill if we are below the spill point (the amount of memory
@@ -497,17 +774,21 @@ public class SortMemoryManager {
   @VisibleForTesting
   public int getRowWidth() { return estimatedRowWidth; }
   @VisibleForTesting
-  public int getInputBatchSize() { return estimatedInputBatchSize; }
+  public BatchSizeEstimate getInputBatchSize() { return inputBatchSize; }
   @VisibleForTesting
   public int getPreferredSpillBatchSize() { return preferredSpillBatchSize; }
   @VisibleForTesting
   public int getPreferredMergeBatchSize() { return preferredMergeBatchSize; }
   @VisibleForTesting
-  public int getSpillBatchSize() { return expectedSpillBatchSize; }
+  public BatchSizeEstimate getSpillBatchSize() { return spillBatchSize; }
   @VisibleForTesting
-  public int getMergeBatchSize() { return expectedMergeBatchSize; }
+  public BatchSizeEstimate getMergeBatchSize() { return mergeBatchSize; }
   @VisibleForTesting
   public long getBufferMemoryLimit() { return bufferMemoryLimit; }
   @VisibleForTesting
   public boolean mayOverflow() { return potentialOverflow; }
+  @VisibleForTesting
+  public boolean isLowMemory() { return isLowMemory; }
+  @VisibleForTesting
+  public boolean hasPerformanceWarning() { return performanceWarning; }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
index 4231cf4..0f27884 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
@@ -83,10 +83,9 @@ public class SorterWrapper extends BaseSortWrapper {
     ClassGenerator<SingleBatchSorter> g = cg.getRoot();
     cg.plainJavaCapable(true);
     // Uncomment this line to debug the generated code.
-  cg.saveCodeForDebugging(true);
+//    cg.saveCodeForDebugging(true);
 
     generateComparisons(g, batch, logger);
     return getInstance(cg, logger);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
index a6042c6..b75ce77 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.SpilledRun;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorInitializer;
 import org.apache.drill.exec.record.VectorContainer;
 
 import com.google.common.collect.Lists;
@@ -86,13 +87,14 @@ public class SpilledRuns {
     return batchesToSpill;
   }
 
-  public void mergeAndSpill(List<BatchGroup> batchesToSpill, int spillBatchRowCount) {
-    spilledRuns.add(safeMergeAndSpill(batchesToSpill, spillBatchRowCount));
+  public void mergeAndSpill(List<BatchGroup> batchesToSpill, int spillBatchRowCount, VectorInitializer allocHelper) {
+    spilledRuns.add(safeMergeAndSpill(batchesToSpill, spillBatchRowCount, allocHelper));
     logger.trace("Completed spill: memory = {}",
         context.getAllocator().getAllocatedMemory());
   }
 
-  public void mergeRuns(int targetCount, long mergeMemoryPool, int spillBatchRowCount) {
+  public void mergeRuns(int targetCount, long mergeMemoryPool,
+                  int spillBatchRowCount, VectorInitializer allocHelper) {
 
     long allocated = context.getAllocator().getAllocatedMemory();
     mergeMemoryPool -= context.getAllocator().getAllocatedMemory();
@@ -128,12 +130,12 @@ public class SpilledRuns {
     // Do the actual spill.
 
     List<BatchGroup> batchesToSpill = prepareSpillBatches(spilledRuns, mergeCount);
-    mergeAndSpill(batchesToSpill, spillBatchRowCount);
+    mergeAndSpill(batchesToSpill, spillBatchRowCount, allocHelper);
   }
 
-  private BatchGroup.SpilledRun safeMergeAndSpill(List<? extends BatchGroup> batchesToSpill, int spillBatchRowCount) {
+  private BatchGroup.SpilledRun safeMergeAndSpill(List<? extends BatchGroup> batchesToSpill, int spillBatchRowCount, VectorInitializer allocHelper) {
     try {
-      return doMergeAndSpill(batchesToSpill, spillBatchRowCount);
+      return doMergeAndSpill(batchesToSpill, spillBatchRowCount, allocHelper);
     }
     // If error is a User Exception, just use as is.
 
@@ -145,7 +147,8 @@ public class SpilledRuns {
     }
   }
 
-  private BatchGroup.SpilledRun doMergeAndSpill(List<? extends BatchGroup> batchesToSpill, int spillBatchRowCount) throws Throwable {
+  private BatchGroup.SpilledRun doMergeAndSpill(List<? extends BatchGroup> batchesToSpill,
+                        int spillBatchRowCount, VectorInitializer allocHelper) throws Throwable {
 
     // Merge the selected set of matches and write them to the
     // spill file. After each write, we release the memory associated
@@ -155,7 +158,8 @@ public class SpilledRuns {
     BatchGroup.SpilledRun newGroup = null;
     VectorContainer dest = new VectorContainer();
     try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill);
-         PriorityQueueCopierWrapper.BatchMerger merger = copierHolder.startMerge(schema, batchesToSpill, dest, spillBatchRowCount)) {
+         PriorityQueueCopierWrapper.BatchMerger merger = copierHolder.startMerge(schema, batchesToSpill,
+                                         dest, spillBatchRowCount, allocHelper)) {
       newGroup = new BatchGroup.SpilledRun(spillSet, outputFile, context.getAllocator());
       logger.trace("Spilling {} batches, into spill batches of {} rows, to {}",
           batchesToSpill.size(), spillBatchRowCount, outputFile);
@@ -175,9 +179,9 @@ public class SpilledRuns {
       }
       context.injectChecked(ExternalSortBatch.INTERRUPTION_WHILE_SPILLING, IOException.class);
       newGroup.closeOutputStream();
-      logger.trace("Spilled {} output batches, each of {} by bytes, {} records to {}",
-                   merger.getBatchCount(), merger.getRecordCount(),
-                   merger.getEstBatchSize(), outputFile);
+      logger.trace("Spilled {} output batches, each of {} bytes, {} records, to {}",
+                   merger.getBatchCount(), merger.getEstBatchSize(),
+                   spillBatchRowCount, outputFile);
       newGroup.setBatchSize(merger.getEstBatchSize());
       return newGroup;
     } catch (Throwable e) {
@@ -192,7 +196,8 @@ public class SpilledRuns {
     }
   }
 
-  public SortResults finalMerge(List<? extends BatchGroup> bufferedBatches, VectorContainer container, int mergeRowCount) {
+  public SortResults finalMerge(List<? extends BatchGroup> bufferedBatches,
+                    VectorContainer container, int mergeRowCount, VectorInitializer allocHelper) {
     List<BatchGroup> allBatches = new LinkedList<>();
     allBatches.addAll(bufferedBatches);
     bufferedBatches.clear();
@@ -200,7 +205,7 @@ public class SpilledRuns {
     spilledRuns.clear();
     logger.debug("Starting merge phase. Runs = {}, Alloc. memory = {}",
         allBatches.size(), context.getAllocator().getAllocatedMemory());
-    return copierHolder.startMerge(schema, allBatches, container, mergeRowCount);
+    return copierHolder.startMerge(schema, allBatches, container, mergeRowCount, allocHelper);
   }
 
   public void close() {

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index ca275c7..eb90614 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -117,17 +117,19 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
         return IterOutcome.STOP;
       }
       next = b.next();
-    }finally{
+    } finally {
       stats.startProcessing();
     }
 
-    switch(next){
+    switch(next) {
     case OK_NEW_SCHEMA:
       stats.batchReceived(inputIndex, b.getRecordCount(), true);
       break;
     case OK:
       stats.batchReceived(inputIndex, b.getRecordCount(), false);
       break;
+    default:
+      break;
     }
 
     return next;

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
index 44c6b1a..4e47051 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -81,6 +81,7 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
     }
   }
 
+  @SuppressWarnings("resource")
   @Override
   public VectorWrapper<?> getChildWrapper(int[] ids) {
     if (ids.length == 1) {
@@ -105,6 +106,7 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
     return new HyperVectorWrapper<ValueVector>(vectors[0].getField(), vectors);
   }
 
+  @SuppressWarnings("resource")
   @Override
   public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath) {
     ValueVector v = vectors[0];
@@ -112,7 +114,6 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
   }
 
   @Override
-  @SuppressWarnings("unchecked")
   public VectorWrapper<T> cloneAndTransfer(BufferAllocator allocator) {
     return new HyperVectorWrapper<T>(f, vectors, false);
 //    T[] newVectors = (T[]) Array.newInstance(vectors.getClass().getComponentType(), vectors.length);
@@ -128,12 +129,14 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
     return new HyperVectorWrapper<T>(f, v, releasable);
   }
 
+  @SuppressWarnings("unchecked")
   public void addVector(ValueVector v) {
     Preconditions.checkArgument(v.getClass() == this.getVectorClass(), String.format("Cannot add vector type %s to hypervector type %s for field %s",
       v.getClass(), this.getVectorClass(), v.getField()));
     vectors = (T[]) ArrayUtils.add(vectors, v);// TODO optimize this so not copying every time
   }
 
+  @SuppressWarnings("unchecked")
   public void addVectors(ValueVector[] vv) {
     vectors = (T[]) ArrayUtils.add(vectors, vv);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 0daa6b3..b4ae2d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,8 +19,6 @@ package org.apache.drill.exec.record;
 
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
 
 /**
  * A record batch contains a set of field values for a particular range of

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorInitializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorInitializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorInitializer.java
new file mode 100644
index 0000000..5dd348e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorInitializer.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+
+/**
+ * Prototype mechanism to allocate vectors based on expected
+ * data sizes. This version uses a name-based map of fields
+ * to sizes. It would be better to represent the batch structurally
+ * and simply iterate over the schema rather than doing a per-field
+ * lookup, but the mechanisms needed for that efficient solution
+ * don't exist yet.
+ */
+
+public class VectorInitializer {
+
+  private static class AllocationHint {
+    public final int entryWidth;
+    public final int elementCount;
+
+    private AllocationHint(int width, int elements) {
+      entryWidth = width;
+      elementCount = elements;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder buf = new StringBuilder()
+          .append("{");
+      String sep = "";
+      if (entryWidth > 0) {
+        buf.append("width=")
+           .append(entryWidth);
+        sep = ", ";
+      }
+      if (elementCount > 0) {
+        buf.append(sep)
+           .append("elements=")
+           .append(elementCount);
+      }
+      buf.append("}");
+      return buf.toString();
+    }
+  }
+
+  private Map<String, AllocationHint> hints = new HashMap<>();
+
+  public void variableWidth(String name, int width) {
+    hints.put(name, new AllocationHint(width, 1));
+  }
+
+  public void fixedWidthArray(String name, int elements) {
+    hints.put(name, new AllocationHint(0, elements));
+  }
+
+  public void variableWidthArray(String name, int width, int elements) {
+    hints.put(name, new AllocationHint(width, elements));
+  }
+
+  public void allocateBatch(VectorAccessible va, int recordCount) {
+    for (VectorWrapper<?> w: va) {
+      allocateVector(w.getValueVector(), "", recordCount);
+    }
+  }
+
+  private void allocateVector(ValueVector vector, String prefix, int recordCount) {
+    String key = prefix + vector.getField().getName();
+    AllocationHint hint = hints.get(key);
+    if (vector instanceof AbstractMapVector) {
+      allocateMap((AbstractMapVector) vector, prefix, recordCount, hint);
+    } else {
+      allocateVector(vector, recordCount, hint);
+    }
+//    Set<BufferLedger> ledgers = new HashSet<>();
+//    vector.getLedgers(ledgers);
+//    int size = 0;
+//    for (BufferLedger ledger : ledgers) {
+//      size += ledger.getAccountedSize();
+//    }
+//    System.out.println(key + ": " + vector.getField().toString() +
+//        " " +
+//        ((hint == null) ? "no hint" : hint.toString()) +
+//        ", " + size);
+  }
+
+  private void allocateVector(ValueVector vector, int recordCount, AllocationHint hint) {
+    if (hint == null) {
+      // Use hard-coded values. Same as ScanBatch
+
+      AllocationHelper.allocate(vector, recordCount, 50, 10);
+    } else {
+      AllocationHelper.allocate(vector, recordCount, hint.entryWidth, hint.elementCount);
+    }
+  }
+
+  private void allocateMap(AbstractMapVector map, String prefix, int recordCount, AllocationHint hint) {
+    if (map instanceof RepeatedMapVector) {
+      ((RepeatedMapVector) map).allocateOffsetsNew(recordCount);
+      if (hint == null) {
+        recordCount *= 10;
+      } else {
+        recordCount *= hint.elementCount;
+      }
+    }
+    prefix += map.getField().getName() + ".";
+    for (ValueVector vector : map) {
+      allocateVector(vector, prefix, recordCount);
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("[" + getClass().getSimpleName())
+       .append(" ");
+    boolean first = true;
+    for (Entry<String, AllocationHint> entry : hints.entrySet()) {
+      if (! first) {
+        buf.append(", ");
+      }
+      first = false;
+      buf.append("[")
+         .append(entry.getKey())
+         .append(" ")
+         .append(entry.getValue().toString())
+         .append("]");
+    }
+    buf.append("]");
+    return buf.toString();
+  }
+}
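
A minimal usage sketch for the class above; the column names, sizes and
row count are hypothetical:

    VectorInitializer initializer = new VectorInitializer();
    initializer.variableWidth("name", 20);        // VARCHAR averaging 20 bytes per value
    initializer.fixedWidthArray("scores", 5);     // repeated fixed-width field, ~5 elements per row
    initializer.variableWidthArray("tags", 8, 3); // repeated VARCHAR, ~3 elements of ~8 bytes
    // Unhinted fields fall back to the same defaults as ScanBatch
    // (width 50, 10 array elements).
    initializer.allocateBatch(container, 4096);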

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index bcec920..b3b46c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -80,6 +80,7 @@ public class WritableBatch implements AutoCloseable {
         len += b.capacity();
       }
 
+      @SuppressWarnings("resource")
       DrillBuf newBuf = allocator.buffer(len);
       try {
         /* Copy data from each buffer into the compound buffer */
@@ -101,7 +102,9 @@ public class WritableBatch implements AutoCloseable {
 
         for (VectorWrapper<?> vv : container) {
           SerializedField fmd = fields.get(vectorIndex);
+          @SuppressWarnings("resource")
           ValueVector v = vv.getValueVector();
+          @SuppressWarnings("resource")
           DrillBuf bb = newBuf.slice(bufferOffset, fmd.getBufferLength());
 //        v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
           v.load(fmd, bb);

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 0d341df..7ed9220 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -293,6 +293,7 @@ public class Drillbit implements AutoCloseable {
     return start(config, null);
   }
 
+  @SuppressWarnings("resource")
   public static Drillbit start(final DrillConfig config, final RemoteServiceSet remoteServiceSet)
       throws DrillbitStartupException {
     logger.debug("Starting new Drillbit.");

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 437862e..41ecc95 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -251,8 +251,6 @@ drill.exec: {
     external: {
       // Drill uses the managed External Sort Batch by default.
       // Set this to true to use the legacy, unmanaged version.
-      // Disabled in the intial commit, to be enabled after
-      // tests are committed.
       disable_managed: true,
       // Limit on the number of batches buffered in memory.
       // Primarily for testing.
@@ -282,9 +280,9 @@ drill.exec: {
         directories:  ${drill.exec.spill.directories},
         // Size of the batches written to, and read from, the spill files.
         // Determines the ratio of memory to input data size for a single-
-        // generation sort. Smaller values give larger ratios, but at a
-        // (high) cost of much greater disk seek times.
-        spill_batch_size = 8M,
+        // generation sort. Smaller values are better, but too small
+        // incurs per-batch overhead.
+        spill_batch_size = 1M,
         // Preferred file size for "first-generation" spill files.
         // Set large enough to get long, continuous writes, but not so
         // large as to overwhelm a temp directory.
@@ -292,7 +290,8 @@ drill.exec: {
         file_size: 256M,
         // Size of the batch sent downstream from the sort operator during
         // the merge phase. Don't change this unless you know what you are doing,
-        // larger sizes can result in memory fragmentation.
+        // larger sizes can result in memory fragmentation, smaller sizes
+        // in excessive operator iterator overhead.
         merge_batch_size = 16M
       }
     }
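
As rough, illustrative arithmetic for the ratio mentioned above: a
single-generation sort can merge about (sort memory / spill batch size)
runs, each of roughly sort-memory size. With 40 MB of sort memory, 1 MB
spill batches cover on the order of 40 x 40 MB = 1.6 GB of input before a
second-generation merge is needed; the old 8 MB batches would cut that to
about 200 MB.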

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index 7700a1e..ee350ce 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -31,7 +31,6 @@ import org.junit.Test;
 import java.util.List;
 
 public class TestUnionAll extends BaseTestQuery{
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionAll.class);
 
   private static final String sliceTargetSmall = "alter session set `planner.slice_target` = 1";
   private static final String sliceTargetDefault = "alter session reset `planner.slice_target`";

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
index 05670c5..cfb8645 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
@@ -17,9 +17,6 @@
  */
 package org.apache.drill.exec.cache;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -119,8 +116,6 @@ public class TestBatchSerialization extends DrillTest {
    */
   private void verifySerialize(SingleRowSet rowSet, SingleRowSet expected) throws IOException {
 
-    long origSize = rowSet.size();
-
     File dir = OperatorFixture.getTempDir("serial");
     File outFile = new File(dir, "serialze.dat");
     try (OutputStream out = new BufferedOutputStream(new FileOutputStream(outFile))) {
@@ -135,7 +130,6 @@ public class TestBatchSerialization extends DrillTest {
           .read());
     }
 
-    assertTrue(origSize >= result.size());
     new RowSetComparison(expected)
       .verifyAndClearAll(result);
     outFile.delete();

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
index 76f0935..73f9b6d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
@@ -440,6 +440,11 @@ public class TestWindowFrame extends BaseTestQuery {
       .go();
   }
 
+  // Note: This test is unstable. It works when forcing the merge/sort batch
+  // size to 20, but not for other sizes. The problem is either that the
+  // results are not fully ordered (and so are subject to sort instability),
+  // or that there is a bug somewhere in the window functions.
+
   @Test
   public void test4657() throws Exception {
     testBuilder()

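If the instability comes from ties in the sort key, one way to make such a test deterministic is to compare results as an unordered set rather than row by row. A sketch using the same testBuilder() framework (the query, columns, and baseline values are hypothetical):

    testBuilder()
        .sqlQuery("select a, sum(b) over (partition by a) as s from t")
        .unOrdered()               // multiset comparison; ignores row order
        .baselineColumns("a", "s")
        .baselineValues(1L, 10L)
        .baselineValues(2L, 20L)
        .go();
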
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
index f643d5f..7e63600 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
@@ -35,14 +35,17 @@ import org.apache.drill.test.ClientFixture;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.DrillTest;
 import org.apache.drill.test.FixtureBuilder;
+import org.apache.drill.test.SecondaryTest;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
 
+@Category(SecondaryTest.class)
 public class TestSimpleExternalSort extends DrillTest {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSimpleExternalSort.class);
 
-  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(80000);
+  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(80_000);
 
   @Test
   public void mergeSortWithSv2Managed() throws Exception {
@@ -100,7 +103,7 @@ public class TestSimpleExternalSort extends DrillTest {
          ClientFixture client = cluster.clientFixture()) {
       chooseImpl(client, testLegacy);
       List<QueryDataBatch> results = client.queryBuilder().physicalResource("xsort/one_key_sort_descending.json").results();
-      assertEquals(1000000, client.countResults(results));
+      assertEquals(1_000_000, client.countResults(results));
       validateResults(client.allocator(), results);
     }
   }

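The @Category annotation added above hangs off a plain marker type. A sketch of what such a marker typically looks like (the actual org.apache.drill.test.SecondaryTest may differ):

    package org.apache.drill.test;

    /**
     * Marker for long-running tests excluded from the default test run.
     */
    public interface SecondaryTest { }

Surefire's standard JUnit-categories support can then keep these tests out of the default build via an excludedGroups setting naming this type.
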
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
index 5a1bf6d..bbb48af 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
@@ -64,8 +64,8 @@ public class TestSortSpillWithException extends ClusterTest {
     // inject exception in sort while spilling
     final String controls = Controls.newBuilder()
       .addExceptionOnBit(
-          org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.class,
-          org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
+          ExternalSortBatch.class,
+          ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
           IOException.class,
           cluster.drillbit().getContext().getEndpoint())
       .build();
@@ -87,8 +87,8 @@ public class TestSortSpillWithException extends ClusterTest {
     // inject exception in sort while spilling
     final String controls = Controls.newBuilder()
       .addExceptionOnBit(
-          org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.class,
-          org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
+          ExternalSortBatch.class,
+          ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
           IOException.class,
           cluster.drillbit().getContext().getEndpoint())
       .build();

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
index 1a4d4b2..8ba34ef 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
@@ -104,7 +104,7 @@ public class SortTestUtilities {
       VectorContainer dest = new VectorContainer();
       @SuppressWarnings("resource")
       BatchMerger merger = copier.startMerge(schema.toBatchSchema(SelectionVectorMode.NONE),
-                                             batches, dest, rowCount);
+                                             batches, dest, rowCount, null);
 
       verifyResults(merger, dest);
       dest.clear();

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
index 0050747..6464b5a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
@@ -69,8 +69,13 @@ public class TestCopier extends DrillTest {
     PriorityQueueCopierWrapper copier = SortTestUtilities.makeCopier(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED);
     VectorContainer dest = new VectorContainer();
     try {
+      // TODO: Create a vector allocator to pass as the last parameter so
+      // that the test uses the same vector allocator as the production
+      // code. The only nuisance is that we don't have the required
+      // metadata readily at hand here...
+
       @SuppressWarnings({ "resource", "unused" })
-      BatchMerger merger = copier.startMerge(schema, batches, dest, 10);
+      BatchMerger merger = copier.startMerge(schema, batches, dest, 10, null);
       fail();
     } catch (AssertionError e) {
       // Expected

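The memory-manager tests below replace plain int batch sizes with an estimate exposing both dataSize and expectedBufferSize, since a batch's allocated buffers are generally larger than the data they carry (the internal fragmentation the test comments mention). A minimal sketch of the arithmetic, assuming multiply() and PAYLOAD_FROM_BUFFER behave as the tests use them (a static helper scaling an int by a double; the sketch sits in the same package for member visibility):

    package org.apache.drill.exec.physical.impl.xsort.managed;

    public class BatchSizeSketch {
      public static void main(String[] args) {
        int bufferSize = 4 * 1024 * 1024;  // allocated (rounded) buffer size
        int dataSize = SortMemoryManager.multiply(
            bufferSize, SortMemoryManager.PAYLOAD_FROM_BUFFER);
        // dataSize < bufferSize: the difference is internal fragmentation,
        // which is why the revised assertions below budget memory with
        // expectedBufferSize rather than dataSize.
        System.out.println(dataSize + " data bytes in a "
            + bufferSize + "-byte buffer");
      }
    }
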
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
index 6bff088..69e9e1b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
@@ -47,12 +47,14 @@ public class TestExternalSortInternals extends DrillTest {
     assertEquals(Integer.MAX_VALUE, sortConfig.mergeLimit());
     // Default size: 256 MiB
     assertEquals(256 * ONE_MEG, sortConfig.spillFileSize());
-    // Default size: 8 MiB
-    assertEquals(8 * ONE_MEG, sortConfig.spillBatchSize());
+    // Default size: 1 MiB
+    assertEquals(ONE_MEG, sortConfig.spillBatchSize());
     // Default size: 16 MiB
     assertEquals(16 * ONE_MEG, sortConfig.mergeBatchSize());
     // Default: unlimited
     assertEquals(Integer.MAX_VALUE, sortConfig.getBufferedBatchLimit());
+    // Default: 64K - 1 (Character.MAX_VALUE = 65,535)
+    assertEquals(Character.MAX_VALUE, sortConfig.getMSortBatchSize());
   }
 
   /**
@@ -69,6 +71,7 @@ public class TestExternalSortInternals extends DrillTest {
         .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, 500_000)
         .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, 600_000)
         .put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 50)
+        .put(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE, 10)
         .build();
     SortConfig sortConfig = new SortConfig(drillConfig);
     assertEquals(2000 * 1024, sortConfig.maxMemory());
@@ -77,6 +80,7 @@ public class TestExternalSortInternals extends DrillTest {
     assertEquals(500_000, sortConfig.spillBatchSize());
     assertEquals(600_000, sortConfig.mergeBatchSize());
     assertEquals(50, sortConfig.getBufferedBatchLimit());
+    assertEquals(10, sortConfig.getMSortBatchSize());
   }
 
   /**
@@ -90,6 +94,7 @@ public class TestExternalSortInternals extends DrillTest {
         .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, SortConfig.MIN_SPILL_BATCH_SIZE - 1)
         .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, SortConfig.MIN_MERGE_BATCH_SIZE - 1)
         .put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 1)
+        .put(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE, 0)
         .build();
     SortConfig sortConfig = new SortConfig(drillConfig);
     assertEquals(SortConfig.MIN_MERGE_LIMIT, sortConfig.mergeLimit());
@@ -97,13 +102,14 @@ public class TestExternalSortInternals extends DrillTest {
     assertEquals(SortConfig.MIN_SPILL_BATCH_SIZE, sortConfig.spillBatchSize());
     assertEquals(SortConfig.MIN_MERGE_BATCH_SIZE, sortConfig.mergeBatchSize());
     assertEquals(2, sortConfig.getBufferedBatchLimit());
+    assertEquals(1, sortConfig.getMSortBatchSize());
   }
 
   @Test
   public void testMemoryManagerBasics() {
     DrillConfig drillConfig = DrillConfig.create();
     SortConfig sortConfig = new SortConfig(drillConfig);
-    long memoryLimit = 50 * ONE_MEG;
+    long memoryLimit = 70 * ONE_MEG;
     SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
 
     // Basic setup
@@ -120,35 +126,35 @@ public class TestExternalSortInternals extends DrillTest {
     int rowCount = 10000;
     int batchSize = rowWidth * rowCount * 2;
 
-    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    assertTrue(memManager.updateEstimates(batchSize, rowWidth, rowCount));
     verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);
 
     // Zero rows - no update
 
-    memManager.updateEstimates(batchSize, rowWidth, 0);
+    assertFalse(memManager.updateEstimates(batchSize, rowWidth, 0));
     assertEquals(rowWidth, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
 
     // Larger batch size, update batch size
 
     rowCount = 20000;
     batchSize = rowWidth * rowCount * 2;
-    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    assertTrue(memManager.updateEstimates(batchSize, rowWidth, rowCount));
     verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);
 
     // Smaller batch size: no change
 
     rowCount = 5000;
     int lowBatchSize = rowWidth * rowCount * 2;
-    memManager.updateEstimates(lowBatchSize, rowWidth, rowCount);
+    assertFalse(memManager.updateEstimates(lowBatchSize, rowWidth, rowCount));
     assertEquals(rowWidth, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
 
     // Different batch density, update batch size
 
     rowCount = 10000;
     batchSize = rowWidth * rowCount * 5;
-    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    assertTrue(memManager.updateEstimates(batchSize, rowWidth, rowCount));
     verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);
 
     // Smaller row size, no update
@@ -156,23 +162,23 @@ public class TestExternalSortInternals extends DrillTest {
     int lowRowWidth = 200;
     rowCount = 10000;
     lowBatchSize = rowWidth * rowCount * 2;
-    memManager.updateEstimates(lowBatchSize, lowRowWidth, rowCount);
+    assertFalse(memManager.updateEstimates(lowBatchSize, lowRowWidth, rowCount));
     assertEquals(rowWidth, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
 
     // Larger row size, updates calcs
 
     rowWidth = 400;
     rowCount = 10000;
     lowBatchSize = rowWidth * rowCount * 2;
-    memManager.updateEstimates(lowBatchSize, rowWidth, rowCount);
+    assertTrue(memManager.updateEstimates(lowBatchSize, rowWidth, rowCount));
     verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);
 
     // EOF: very low density
 
-    memManager.updateEstimates(lowBatchSize, rowWidth, 5);
+    assertFalse(memManager.updateEstimates(lowBatchSize, rowWidth, 5));
     assertEquals(rowWidth, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
   }
 
   private void verifyCalcs(SortConfig sortConfig, long memoryLimit, SortMemoryManager memManager, int batchSize,
@@ -183,7 +189,7 @@ public class TestExternalSortInternals extends DrillTest {
     // Row and batch sizes should be exact
 
     assertEquals(rowWidth, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
 
     // Spill sizes will be rounded, but within reason.
 
@@ -191,9 +197,9 @@ public class TestExternalSortInternals extends DrillTest {
     assertTrue(count >= memManager.getSpillBatchRowCount());
     assertTrue(count/2 <= memManager.getSpillBatchRowCount());
     int spillSize = memManager.getSpillBatchRowCount() * rowWidth;
-    assertTrue(spillSize <= memManager.getSpillBatchSize());
-    assertTrue(spillSize >= memManager.getSpillBatchSize()/2);
-    assertEquals(memoryLimit - memManager.getSpillBatchSize(), memManager.getBufferMemoryLimit());
+    assertTrue(spillSize <= memManager.getSpillBatchSize().dataSize);
+    assertTrue(spillSize >= memManager.getSpillBatchSize().dataSize/2);
+    assertTrue(memManager.getBufferMemoryLimit() <= memoryLimit - memManager.getSpillBatchSize().expectedBufferSize );
 
     // Merge sizes will also be rounded, within reason.
 
@@ -201,9 +207,9 @@ public class TestExternalSortInternals extends DrillTest {
     assertTrue(count >= memManager.getMergeBatchRowCount());
     assertTrue(count/2 <= memManager.getMergeBatchRowCount());
     int mergeSize = memManager.getMergeBatchRowCount() * rowWidth;
-    assertTrue(mergeSize <= memManager.getMergeBatchSize());
-    assertTrue(mergeSize >= memManager.getMergeBatchSize()/2);
-    assertEquals(memoryLimit - memManager.getMergeBatchSize(), memManager.getMergeMemoryLimit());
+    assertTrue(mergeSize <= memManager.getMergeBatchSize().dataSize);
+    assertTrue(mergeSize >= memManager.getMergeBatchSize().dataSize/2);
+    assertTrue(memManager.getMergeMemoryLimit() <= memoryLimit - memManager.getMergeBatchSize().expectedBufferSize);
   }
 
   @Test
@@ -220,7 +226,7 @@ public class TestExternalSortInternals extends DrillTest {
     int batchSize = rowCount * 2;
     memManager.updateEstimates(batchSize, rowWidth, rowCount);
     assertEquals(10, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
 
     // Truncate spill, merge batch row count
 
@@ -234,12 +240,12 @@ public class TestExternalSortInternals extends DrillTest {
 
     // Small, but non-zero, row
 
-    rowWidth = 20;
+    rowWidth = 10;
     rowCount = 10000;
     batchSize = rowWidth * rowCount;
     memManager.updateEstimates(batchSize, rowWidth, rowCount);
     assertEquals(rowWidth, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
 
     // Truncate spill, merge batch row count
 
@@ -256,69 +262,89 @@ public class TestExternalSortInternals extends DrillTest {
   public void testLowMemory() {
     DrillConfig drillConfig = DrillConfig.create();
     SortConfig sortConfig = new SortConfig(drillConfig);
-    long memoryLimit = 10 * ONE_MEG;
+    int memoryLimit = 10 * ONE_MEG;
     SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
 
     // Tight squeeze, but can be made to work.
-    // Input batches are a quarter of memory.
+    // Input batch buffer size is a quarter of memory.
 
     int rowWidth = 1000;
-    int rowCount = (int) (memoryLimit / 4 / rowWidth);
-    int batchSize = rowCount * rowWidth;
+    int batchSize = SortMemoryManager.multiply(memoryLimit / 4, SortMemoryManager.PAYLOAD_FROM_BUFFER);
+    int rowCount = batchSize / rowWidth;
+    batchSize = rowCount * rowWidth;
     memManager.updateEstimates(batchSize, rowWidth, rowCount);
     assertEquals(rowWidth, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
     assertFalse(memManager.mayOverflow());
+    assertTrue(memManager.hasPerformanceWarning());
 
     // Spill, merge batches should be constrained
 
-    int spillBatchSize = memManager.getSpillBatchSize();
+    int spillBatchSize = memManager.getSpillBatchSize().dataSize;
     assertTrue(spillBatchSize < memManager.getPreferredSpillBatchSize());
     assertTrue(spillBatchSize >= rowWidth);
     assertTrue(spillBatchSize <= memoryLimit / 3);
     assertTrue(spillBatchSize + 2 * batchSize <= memoryLimit);
     assertTrue(spillBatchSize / rowWidth >= memManager.getSpillBatchRowCount());
 
-    int mergeBatchSize = memManager.getMergeBatchSize();
+    int mergeBatchSize = memManager.getMergeBatchSize().dataSize;
     assertTrue(mergeBatchSize < memManager.getPreferredMergeBatchSize());
     assertTrue(mergeBatchSize >= rowWidth);
     assertTrue(mergeBatchSize + 2 * spillBatchSize <= memoryLimit);
     assertTrue(mergeBatchSize / rowWidth >= memManager.getMergeBatchRowCount());
 
-    // Should spill after just two batches
+    // Should spill after just two or three batches
 
-    assertFalse(memManager.isSpillNeeded(0, batchSize));
-    assertFalse(memManager.isSpillNeeded(batchSize, batchSize));
-    assertTrue(memManager.isSpillNeeded(2 * batchSize, batchSize));
+    int inputBufferSize = memManager.getInputBatchSize().expectedBufferSize;
+    assertFalse(memManager.isSpillNeeded(0, inputBufferSize));
+    assertFalse(memManager.isSpillNeeded(batchSize, inputBufferSize));
+    assertTrue(memManager.isSpillNeeded(3 * inputBufferSize, inputBufferSize));
+  }
+
+  @Test
+  public void testLowerMemory() {
+    DrillConfig drillConfig = DrillConfig.create();
+    SortConfig sortConfig = new SortConfig(drillConfig);
+    int memoryLimit = 10 * ONE_MEG;
+    SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
 
     // Tighter squeeze, but can be made to work.
     // Input batches are 3/8 of memory; two fill 3/4,
     // but small spill and merge batches allow progress.
 
-    rowWidth = 1000;
-    rowCount = (int) (memoryLimit * 3 / 8 / rowWidth);
+    int rowWidth = 1000;
+    int batchSize = SortMemoryManager.multiply(memoryLimit * 3 / 8, SortMemoryManager.PAYLOAD_FROM_BUFFER);
+    int rowCount = batchSize / rowWidth;
     batchSize = rowCount * rowWidth;
     memManager.updateEstimates(batchSize, rowWidth, rowCount);
     assertEquals(rowWidth, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
     assertFalse(memManager.mayOverflow());
+    assertTrue(memManager.hasPerformanceWarning());
 
     // Spill, merge batches should be constrained
 
-    spillBatchSize = memManager.getSpillBatchSize();
+    int spillBatchSize = memManager.getSpillBatchSize().dataSize;
     assertTrue(spillBatchSize < memManager.getPreferredSpillBatchSize());
     assertTrue(spillBatchSize >= rowWidth);
     assertTrue(spillBatchSize <= memoryLimit / 3);
     assertTrue(spillBatchSize + 2 * batchSize <= memoryLimit);
-    assertTrue(memManager.getSpillBatchRowCount() > 1);
+    assertTrue(memManager.getSpillBatchRowCount() >= 1);
     assertTrue(spillBatchSize / rowWidth >= memManager.getSpillBatchRowCount());
 
-    mergeBatchSize = memManager.getMergeBatchSize();
+    int mergeBatchSize = memManager.getMergeBatchSize().dataSize;
     assertTrue(mergeBatchSize < memManager.getPreferredMergeBatchSize());
     assertTrue(mergeBatchSize >= rowWidth);
     assertTrue(mergeBatchSize + 2 * spillBatchSize <= memoryLimit);
     assertTrue(memManager.getMergeBatchRowCount() > 1);
     assertTrue(mergeBatchSize / rowWidth >= memManager.getMergeBatchRowCount());
+
+    // Should spill after just two batches
+
+    int inputBufferSize = memManager.getInputBatchSize().expectedBufferSize;
+    assertFalse(memManager.isSpillNeeded(0, inputBufferSize));
+    assertFalse(memManager.isSpillNeeded(batchSize, inputBufferSize));
+    assertTrue(memManager.isSpillNeeded(2 * inputBufferSize, inputBufferSize));
   }
 
   @Test
@@ -333,21 +359,22 @@ public class TestExternalSortInternals extends DrillTest {
     // Have to back off the exact size a bit to allow for internal fragmentation
     // in the merge and output batches.
 
-    int rowWidth = (int) (memoryLimit / 3 * 0.75);
+    int rowWidth = (int) (memoryLimit / 3 / 2);
     int rowCount = 1;
     int batchSize = rowWidth;
     memManager.updateEstimates(batchSize, rowWidth, rowCount);
     assertEquals(rowWidth, memManager.getRowWidth());
-    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
     assertFalse(memManager.mayOverflow());
+    assertTrue(memManager.hasPerformanceWarning());
 
-    int spillBatchSize = memManager.getSpillBatchSize();
+    int spillBatchSize = memManager.getSpillBatchSize().dataSize;
     assertTrue(spillBatchSize >= rowWidth);
     assertTrue(spillBatchSize <= memoryLimit / 3);
     assertTrue(spillBatchSize + 2 * batchSize <= memoryLimit);
     assertEquals(1, memManager.getSpillBatchRowCount());
 
-    int mergeBatchSize = memManager.getMergeBatchSize();
+    int mergeBatchSize = memManager.getMergeBatchSize().dataSize;
     assertTrue(mergeBatchSize >= rowWidth);
     assertTrue(mergeBatchSize + 2 * spillBatchSize <= memoryLimit);
     assertEquals(1, memManager.getMergeBatchRowCount());
@@ -357,12 +384,26 @@ public class TestExternalSortInternals extends DrillTest {
     assertFalse(memManager.isSpillNeeded(0, batchSize));
     assertFalse(memManager.isSpillNeeded(batchSize, batchSize));
     assertTrue(memManager.isSpillNeeded(2 * batchSize, batchSize));
+  }
 
-    // In trouble now, can't fit even three rows.
+  @Test
+  public void testMemoryOverflow() {
+    DrillConfig drillConfig = DrillConfig.create();
+    SortConfig sortConfig = new SortConfig(drillConfig);
+    long memoryLimit = 10 * ONE_MEG;
+    SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
+
+    // In trouble now, can't fit even two input batches.
+    // A better implementation would spill the first batch to a file,
+    // leave it open, and append the second batch. Slicing each big input
+    // batch into small spill batches will allow the sort to proceed as
+    // long as it can hold a single input batch and a single merge batch. But
+    // the current implementation requires that all batches to be spilled be
+    // in memory at the same time...
 
-    rowWidth = (int) (memoryLimit / 2);
-    rowCount = 1;
-    batchSize = rowWidth;
+    int rowWidth = (int) (memoryLimit / 2);
+    int rowCount = 1;
+    int batchSize = rowWidth;
     memManager.updateEstimates(batchSize, rowWidth, rowCount);
     assertTrue(memManager.mayOverflow());
   }
@@ -406,7 +447,7 @@ public class TestExternalSortInternals extends DrillTest {
 
     memManager.updateEstimates(batchSize, rowWidth, rowCount);
 
-    int spillBatchSize = memManager.getSpillBatchSize();
+    int spillBatchSize = memManager.getSpillBatchSize().dataSize;
 
     // Test various memory fill levels
 
@@ -432,63 +473,67 @@ public class TestExternalSortInternals extends DrillTest {
         .build();
     SortConfig sortConfig = new SortConfig(drillConfig);
     // Allow four spill batches, 8 MB each, plus one output of 16
-    long memoryLimit = 50 * ONE_MEG;
+    // Allow for internal fragmentation:
+    // 96 MB >= (4 * 8 + 16) MB * 2
+    long memoryLimit = 96 * ONE_MEG;
     SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
 
-    // Prime the estimates
+    // Prime the estimates. Batch size is data size, not buffer size.
 
     int rowWidth = 300;
     int rowCount = 10000;
     int batchSize = rowWidth * rowCount * 2;
 
     memManager.updateEstimates(batchSize, rowWidth, rowCount);
-    int spillBatchSize = memManager.getSpillBatchSize();
-    int mergeBatchSize = memManager.getMergeBatchSize();
+    assertFalse(memManager.isLowMemory());
+    int spillBatchBufferSize = memManager.getSpillBatchSize().expectedBufferSize;
+    int inputBatchBufferSize = memManager.getInputBatchSize().expectedBufferSize;
 
     // One in-mem batch, no merging.
 
-    long allocMemory = memoryLimit - mergeBatchSize;
+    long allocMemory = inputBatchBufferSize;
     MergeTask task = memManager.consolidateBatches(allocMemory, 1, 0);
     assertEquals(MergeAction.NONE, task.action);
 
     // Many in-mem batches, just enough to merge
 
-    allocMemory = memoryLimit - mergeBatchSize;
-    int memBatches = (int) (allocMemory / batchSize);
-    allocMemory = memBatches * batchSize;
+    int memBatches = (int) (memManager.getMergeMemoryLimit() / inputBatchBufferSize);
+    allocMemory = memBatches * inputBatchBufferSize;
     task = memManager.consolidateBatches(allocMemory, memBatches, 0);
     assertEquals(MergeAction.NONE, task.action);
 
     // Spills if no room for spill and in-memory batches
 
-    task = memManager.consolidateBatches(allocMemory, memBatches, 1);
+    int spillCount = (int) Math.ceil((memManager.getMergeMemoryLimit() - allocMemory) / (1.0 * spillBatchBufferSize));
+    assertTrue(spillCount >= 1);
+    task = memManager.consolidateBatches(allocMemory, memBatches, spillCount);
     assertEquals(MergeAction.SPILL, task.action);
 
     // One more in-mem batch: now needs to spill
 
     memBatches++;
-    allocMemory = memBatches * batchSize;
+    allocMemory = memBatches * inputBatchBufferSize;
     task = memManager.consolidateBatches(allocMemory, memBatches, 0);
     assertEquals(MergeAction.SPILL, task.action);
 
     // No spill for various in-mem/spill run combinations
 
-    allocMemory = memoryLimit - spillBatchSize - mergeBatchSize;
-    memBatches = (int) (allocMemory / batchSize);
-    allocMemory = memBatches * batchSize;
+    long freeMem = memManager.getMergeMemoryLimit() - spillBatchBufferSize;
+    memBatches = (int) (freeMem / inputBatchBufferSize);
+    allocMemory = memBatches * inputBatchBufferSize;
     task = memManager.consolidateBatches(allocMemory, memBatches, 1);
     assertEquals(MergeAction.NONE, task.action);
 
-    allocMemory = memoryLimit - 2 * spillBatchSize - mergeBatchSize;
-    memBatches = (int) (allocMemory / batchSize);
-    allocMemory = memBatches * batchSize;
+    freeMem = memManager.getMergeMemoryLimit() - 2 * spillBatchBufferSize;
+    memBatches = (int) (freeMem / inputBatchBufferSize);
+    allocMemory = memBatches * inputBatchBufferSize;
     task = memManager.consolidateBatches(allocMemory, memBatches, 2);
     assertEquals(MergeAction.NONE, task.action);
 
     // No spill if no in-memory, only spill, and spill fits
 
-    long freeMem = memoryLimit - mergeBatchSize;
-    int spillBatches = (int) (freeMem / spillBatchSize);
+    freeMem = memManager.getMergeMemoryLimit();
+    int spillBatches = (int) (freeMem / spillBatchBufferSize);
     task = memManager.consolidateBatches(0, 0, spillBatches);
     assertEquals(MergeAction.NONE, task.action);
 
@@ -503,6 +548,47 @@ public class TestExternalSortInternals extends DrillTest {
     task = memManager.consolidateBatches(0, 0, spillBatches + 2);
     assertEquals(MergeAction.MERGE, task.action);
     assertEquals(3, task.count);
+
+    // If only one spilled run, and no in-memory batches,
+    // skip merge.
+
+    task = memManager.consolidateBatches(0, 0, 1);
+    assertEquals(MergeAction.NONE, task.action);
+
+    // Very large number of spilled runs. Limit to what fits in memory.
+
+    task = memManager.consolidateBatches(0, 0, 1000);
+    assertEquals(MergeAction.MERGE, task.action);
+    assertTrue(task.count <= (int)(memoryLimit / spillBatchBufferSize) - 1);
+  }
+
+  @Test
+  public void testMergeCalcsExtreme() {
+
+    DrillConfig drillConfig = DrillConfig.create();
+    SortConfig sortConfig = new SortConfig(drillConfig);
+
+    // Force an odd situation in which the spill batch is larger
+    // than memory. The sort won't actually run, but this setup is
+    // needed to test an odd merge case.
+
+    long memoryLimit = ONE_MEG / 2;
+    SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
+
+    // Prime the estimates. Batch size is data size, not buffer size.
+
+    int rowWidth = (int) memoryLimit;
+    int rowCount = 1;
+    int batchSize = rowWidth;
+
+    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    assertTrue(memManager.getMergeMemoryLimit() < rowWidth);
+
+    // Only one spill batch exists, and it is above the merge memory
+    // limit, but nothing useful comes from merging.
+
+    MergeTask task = memManager.consolidateBatches(0, 0, 1);
+    assertEquals(MergeAction.NONE, task.action);
   }
 
   @Test