You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/06/21 18:29:11 UTC

[3/5] drill git commit: DRILL-5325: Unit tests for the managed sort

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
index 76b178c..7a460f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.xsort.managed;
 
-import io.netty.buffer.DrillBuf;
-
 import java.io.IOException;
 import java.util.List;
 
@@ -26,11 +24,11 @@ import javax.inject.Named;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.AllocationHelper;
+
+import io.netty.buffer.DrillBuf;
 
 public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueCopierTemplate.class);
@@ -43,7 +41,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
   private int queueSize = 0;
 
   @Override
-  public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups,
+  public void setup(BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups,
                     VectorAccessible outgoing) throws SchemaChangeException {
     this.hyperBatch = hyperBatch;
     this.batchGroups = batchGroups;
@@ -53,7 +51,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
     @SuppressWarnings("resource")
     final DrillBuf drillBuf = allocator.buffer(4 * size);
     vector4 = new SelectionVector4(drillBuf, size, Character.MAX_VALUE);
-    doSetup(context, hyperBatch, outgoing);
+    doSetup(hyperBatch, outgoing);
 
     queueSize = 0;
     for (int i = 0; i < size; i++) {
@@ -68,7 +66,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
 
   @Override
   public int next(int targetRecordCount) {
-    allocateVectors(targetRecordCount);
+    VectorAccessibleUtilities.allocateVectors(outgoing, targetRecordCount);
     for (int outgoingIndex = 0; outgoingIndex < targetRecordCount; outgoingIndex++) {
       if (queueSize == 0) {
         return 0;
@@ -76,7 +74,11 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
       int compoundIndex = vector4.get(0);
       int batch = compoundIndex >>> 16;
       assert batch < batchGroups.size() : String.format("batch: %d batchGroups: %d", batch, batchGroups.size());
-      doCopy(compoundIndex, outgoingIndex);
+      try {
+        doCopy(compoundIndex, outgoingIndex);
+      } catch (SchemaChangeException e) {
+        throw new IllegalStateException(e);
+      }
       int nextIndex = batchGroups.get(batch).getNextIndex();
       if (nextIndex < 0) {
         vector4.set(0, vector4.get(--queueSize));
@@ -84,37 +86,28 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
         vector4.set(0, batch, nextIndex);
       }
       if (queueSize == 0) {
-        setValueCount(++outgoingIndex);
+        VectorAccessibleUtilities.setValueCount(outgoing, ++outgoingIndex);
         return outgoingIndex;
       }
-      siftDown();
+      try {
+        siftDown();
+      } catch (SchemaChangeException e) {
+        throw new IllegalStateException(e);
+      }
     }
-    setValueCount(targetRecordCount);
+    VectorAccessibleUtilities.setValueCount(outgoing, targetRecordCount);
     return targetRecordCount;
   }
 
-  private void setValueCount(int count) {
-    for (VectorWrapper<?> w: outgoing) {
-      w.getValueVector().getMutator().setValueCount(count);
-    }
-  }
-
   @Override
   public void close() throws IOException {
     vector4.clear();
-    for (final VectorWrapper<?> w: outgoing) {
-      w.getValueVector().clear();
-    }
-    for (final VectorWrapper<?> w : hyperBatch) {
-      w.clear();
-    }
-
-    for (BatchGroup batchGroup : batchGroups) {
-      batchGroup.close();
-    }
+    VectorAccessibleUtilities.clear(outgoing);
+    VectorAccessibleUtilities.clear(hyperBatch);
+    BatchGroup.closeAll(batchGroups);
   }
 
-  private void siftUp() {
+  private void siftUp() throws SchemaChangeException {
     int p = queueSize;
     while (p > 0) {
       if (compare(p, (p - 1) / 2) < 0) {
@@ -126,13 +119,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
     }
   }
 
-  private void allocateVectors(int targetRecordCount) {
-    for (VectorWrapper<?> w: outgoing) {
-      AllocationHelper.allocateNew(w.getValueVector(), targetRecordCount);
-    }
-  }
-
-  private void siftDown() {
+  private void siftDown() throws SchemaChangeException {
     int p = 0;
     int next;
     while (p * 2 + 1 < queueSize) { // While the current node has at least one child
@@ -160,14 +147,19 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
     vector4.set(sv1, tmp);
   }
 
-  public int compare(int leftIndex, int rightIndex) {
+  public int compare(int leftIndex, int rightIndex) throws SchemaChangeException {
     int sv1 = vector4.get(leftIndex);
     int sv2 = vector4.get(rightIndex);
     return doEval(sv1, sv2);
   }
 
-  public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
-  public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
-  public abstract void doCopy(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
-
+  public abstract void doSetup(@Named("incoming") VectorAccessible incoming,
+                               @Named("outgoing") VectorAccessible outgoing)
+                       throws SchemaChangeException;
+  public abstract int doEval(@Named("leftIndex") int leftIndex,
+                             @Named("rightIndex") int rightIndex)
+                      throws SchemaChangeException;
+  public abstract void doCopy(@Named("inIndex") int inIndex,
+                              @Named("outIndex") int outIndex)
+                       throws SchemaChangeException;
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
new file mode 100644
index 0000000..6b71782
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.CopyUtil;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.google.common.base.Stopwatch;
+
+/**
+ * Manages a {@link PriorityQueueCopier} instance produced from code generation.
+ * Provides a wrapper around a copier "session" to simplify reading batches
+ * from the copier.
+ */
+
+public class PriorityQueueCopierWrapper extends BaseSortWrapper {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueCopierWrapper.class);
+
+  private static final GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
+  private static final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING);
+
+  /**
+   * A single PriorityQueueCopier instance is used for 2 purposes:
+   * 1. Merge sorted batches before spilling
+   * 2. Merge sorted batches when all incoming data fits in memory
+   */
+
+  private PriorityQueueCopier copier;
+
+  public PriorityQueueCopierWrapper(OperExecContext opContext) {
+    super(opContext);
+  }
+
+  public PriorityQueueCopier getCopier(VectorAccessible batch) {
+    if (copier == null) {
+      copier = newCopier(batch);
+    }
+    return copier;
+  }
+
+  private PriorityQueueCopier newCopier(VectorAccessible batch) {
+    // Generate the copier code and obtain the resulting class
+
+    CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptionSet());
+    ClassGenerator<PriorityQueueCopier> g = cg.getRoot();
+    cg.plainJavaCapable(true);
+    // Uncomment this line to debug the generated code.
+//    cg.saveCodeForDebugging(true);
+
+    generateComparisons(g, batch, logger);
+    // Generate the copy code under the copier mapping, then restore the main mapping.
+    g.setMappingSet(COPIER_MAPPING_SET);
+    CopyUtil.generateCopies(g, batch, true);
+    g.setMappingSet(MAIN_MAPPING);
+    return getInstance(cg, logger);
+  }
+
+  /**
+   * Start a merge operation using the specified vector container. Used for
+   * the final merge operation.
+   *
+   * @param schema schema for the input and output batches
+   * @param batchGroupList the input batches
+   * @param outputContainer container that receives each merged output batch
+   * @param targetRecordCount number of records for each output batch
+   * @return a new {@link BatchMerger} bound to the given batches and container
+   */
+  public BatchMerger startMerge(BatchSchema schema, List<? extends BatchGroup> batchGroupList, VectorContainer outputContainer, int targetRecordCount) {
+    return new BatchMerger(this, schema, batchGroupList, outputContainer, targetRecordCount);
+  }
+
+  /**
+   * Prepare a copier which will merge a collection of batches into the
+   * output container. The copier uses generated code to do the actual copies.
+   * If the copier has not yet been created, generate code and create it. If it
+   * has been created, reuse it and set it up for the new collection of batches.
+   *
+   * @param batch the (hyper) batch of vectors to be copied
+   * @param batchGroupList same batches as above, but represented as a list
+   * of individual batches
+   * @param outputContainer the container into which to copy the batches
+   */
+
+  @SuppressWarnings("unchecked")
+  private void createCopier(VectorAccessible batch, List<? extends BatchGroup> batchGroupList, VectorContainer outputContainer) {
+    copier = getCopier(batch);
+
+    // Initialize the value vectors for the output container
+
+    for (VectorWrapper<?> i : batch) {
+      @SuppressWarnings("resource")
+      ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
+      outputContainer.add(v);
+    }
+    try {
+      copier.setup(context.getAllocator(), batch, (List<BatchGroup>) batchGroupList, outputContainer);
+    } catch (SchemaChangeException e) {
+      throw UserException.unsupportedError(e)
+            .message("Unexpected schema change - likely code error.")
+            .build(logger);
+    }
+  }
+
+  public BufferAllocator getAllocator() { return context.getAllocator(); }
+
+  /** Closes the cached copier, if any, and always forgets it so a repeated close is a no-op. */
+  public void close() {
+    if (copier == null) { return; }
+    try {
+      copier.close();
+    } catch (IOException e) {
+      throw UserException.dataWriteError(e)
+            .message("Failure while flushing spilled data").build(logger);
+    } finally {
+      copier = null; // Drop the reference even on failure to avoid a second close.
+    }
+  }
+
+  /**
+   * We've gathered a set of batches, each of which has been sorted. The batches
+   * may have passed through a filter and thus may have "holes" where rows have
+   * been filtered out. We will spill records in blocks of targetRecordCount.
+   * To prepare, copy that many records into an outputContainer as a set of
+   * contiguous values in new vectors. The result is a single batch with
+   * vectors that combine a collection of input batches up to the
+   * given threshold.
+   * <p>
+   * Input. Here the top line is a selection vector of indexes.
+   * The second line is a set of batch groups (separated by underscores)
+   * with letters indicating individual records:<pre>
+   * [3 7 4 8 0 6 1] [5 3 6 8 2 0]
+   * [eh_ad_ibf]     [r_qm_kn_p]</pre>
+   * <p>
+   * Output, assuming blocks of 5 records. The brackets represent
+   * batches, the line represents the set of batches copied to the
+   * spill file.<pre>
+   * [abcde] [fhikm] [npqr]</pre>
+   * <p>
+   * The copying operation does a merge as well: copying
+   * values from the sources in ordered fashion. Consider a different example,
+   * we want to merge two input batches to produce a single output batch:
+   * <pre>
+   * Input:  [aceg] [bdfh]
+   * Output: [abcdefgh]</pre>
+   * <p>
+   * In the above, the input consists of two sorted batches. (In reality,
+   * the input batches have an associated selection vector, but that is omitted
+   * here and just the sorted values shown.) The output is a single batch
+   * with the merged records (indicated by letters) from the two input batches.
+   * <p>
+   * Here we bind the copier to the batchGroupList of sorted, buffered batches
+   * to be merged. We bind the copier output to outputContainer: the copier will write its
+   * merged "batches" of records to that container.
+   * <p>
+   * Calls to the {@link #next()} method sequentially return merged batches
+   * of the desired row count.
+    */
+
+  public static class BatchMerger implements SortResults, AutoCloseable {
+
+    private PriorityQueueCopierWrapper holder;
+    private VectorContainer hyperBatch;
+    private VectorContainer outputContainer;
+    private int targetRecordCount;
+    private int copyCount;
+    private int batchCount;
+    private long estBatchSize;
+
+    /**
+     * Creates a merger with a temporary output container.
+     *
+     * @param holder the copier that does the work
+     * @param schema schema for the input and output batches
+     * @param batchGroupList the input batches
+     * @param targetRecordCount number of records for each output batch
+     */
+    private BatchMerger(PriorityQueueCopierWrapper holder, BatchSchema schema, List<? extends BatchGroup> batchGroupList,
+                        int targetRecordCount) {
+      this(holder, schema, batchGroupList, new VectorContainer(), targetRecordCount);
+    }
+
+    /**
+     * Creates a merger with the specified output container
+     *
+     * @param holder the copier that does the work
+     * @param schema schema for the input and output batches
+     * @param batchGroupList the input batches
+     * @param outputContainer merges output batch into the given output container
+     * @param targetRecordCount number of records for each output batch
+     */
+    private BatchMerger(PriorityQueueCopierWrapper holder, BatchSchema schema, List<? extends BatchGroup> batchGroupList,
+                        VectorContainer outputContainer, int targetRecordCount) {
+      this.holder = holder;
+      hyperBatch = constructHyperBatch(schema, batchGroupList);
+      copyCount = 0;
+      this.targetRecordCount = targetRecordCount;
+      this.outputContainer = outputContainer;
+      holder.createCopier(hyperBatch, batchGroupList, outputContainer);
+    }
+
+    /**
+     * Read the next merged batch. The batch holds the specified row count, but
+     * may be less if this is the last batch.
+     *
+     * @return true if the batch contains at least one row, false if no
+     * more rows are available
+     */
+
+    @Override
+    public boolean next() {
+      Stopwatch w = Stopwatch.createStarted();
+      long start = holder.getAllocator().getAllocatedMemory();
+      int count = holder.copier.next(targetRecordCount);
+      copyCount += count;
+      if (count > 0) {
+        long t = w.elapsed(TimeUnit.MICROSECONDS);
+        batchCount++;
+        logger.trace("Took {} us to merge {} records", t, count);
+        long size = holder.getAllocator().getAllocatedMemory() - start;
+        estBatchSize = Math.max(estBatchSize, size);
+      } else {
+        logger.trace("copier returned 0 records");
+      }
+
+      // Identify the schema to be used in the output container. (Since
+      // all merged batches have the same schema, the schema we identify
+      // here should be the same as that which we already had.)
+
+      outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+      // The copier does not set the record count in the output
+      // container, so do that here.
+
+      outputContainer.setRecordCount(count);
+
+      return count > 0;
+    }
+
+    /**
+     * Construct a vector container that holds a list of batches, each represented as an
+     * array of vectors. The entire collection of vectors has a common schema.
+     * <p>
+     * To build the collection, we go through the current schema (which has been
+     * devised to be common for all batches.) For each field in the schema, we create
+     * an array of vectors. To create the elements, we iterate over all the incoming
+     * batches and search for the vector that matches the current column.
+     * <p>
+     * Finally, we build a new schema for the combined container. That new schema must,
+     * because of the way the container was created, match the current schema.
+     *
+     * @param schema schema for the hyper batch
+     * @param batchGroupList list of batches to combine
+     * @return a container where each column is represented as an array of vectors
+     * (hence the "hyper" in the method name)
+     */
+
+    private VectorContainer constructHyperBatch(BatchSchema schema, List<? extends BatchGroup> batchGroupList) {
+      VectorContainer cont = new VectorContainer();
+      for (MaterializedField field : schema) {
+        ValueVector[] vectors = new ValueVector[batchGroupList.size()];
+        int i = 0;
+        for (BatchGroup group : batchGroupList) {
+          vectors[i++] = group.getValueAccessorById(
+              field.getValueClass(),
+              group.getValueVectorId(SchemaPath.getSimplePath(field.getPath())).getFieldIds())
+              .getValueVector();
+        }
+        cont.add(vectors);
+      }
+      cont.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
+      return cont;
+    }
+
+    @Override
+    public void close() {
+      hyperBatch.clear();
+      holder.close();
+    }
+
+    @Override
+    public int getRecordCount() { return outputContainer.getRecordCount(); }
+
+    @Override
+    public int getBatchCount() { return batchCount; }
+
+    /**
+     * Gets the estimated batch size, in bytes. Use for estimating the memory
+     * needed to process the batches that this operator created.
+     * @return the size of the largest batch created by this operation,
+     * in bytes
+     */
+
+    public long getEstBatchSize() { return estBatchSize; }
+
+    @Override
+    public SelectionVector4 getSv4() { return null; }
+
+    @Override
+    public SelectionVector2 getSv2() { return null; }
+
+    @Override
+    public VectorContainer getContainer() { return outputContainer; }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
new file mode 100644
index 0000000..e47d67e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+
+public class SortConfig {
+
+  /**
+   * Smallest allowed output batch size. The smallest output batch
+   * created even under constrained memory conditions.
+   */
+  public static final int MIN_MERGED_BATCH_SIZE = 256 * 1024;
+
+  /**
+   * In the bizarre case where the user gave us an unrealistically low
+   * spill file size, set a floor at some bare minimum size. (Note that,
+   * at this size, big queries will create a huge number of files, which
+   * is why the configuration default is on the order of hundreds of MB.)
+   */
+
+  public static final long MIN_SPILL_FILE_SIZE = 1 * 1024 * 1024;
+
+  public static final int DEFAULT_SPILL_BATCH_SIZE = 8 * 1024 * 1024;
+  public static final int MIN_SPILL_BATCH_SIZE = 256 * 1024;
+  public static final int MIN_MERGE_BATCH_SIZE = 256 * 1024;
+
+  public static final int MIN_MERGE_LIMIT = 2;
+
+  private final long maxMemory;
+
+  /**
+   * Maximum number of spilled runs that can be merged in a single pass.
+   */
+
+  private final int mergeLimit;
+
+  /**
+   * Target size of the first-generation spill files.
+   */
+  private final long spillFileSize;
+
+  private final int spillBatchSize;
+
+  private final int mergeBatchSize;
+
+  private final int bufferedBatchLimit;
+
+  /** Reads the sort settings from {@code config}, applying floors and caps where needed. */
+  public SortConfig(DrillConfig config) {
+
+    // Optional configured memory limit, typically used only for testing.
+
+    maxMemory = config.getBytes(ExecConstants.EXTERNAL_SORT_MAX_MEMORY);
+
+    // Optional limit on the number of spilled runs to merge in a single
+    // pass. Limits the number of open file handles. Must allow at least
+    // two batches to merge to make progress.
+
+    int limit = config.getInt(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT);
+    if (limit > 0) {
+      mergeLimit = Math.max(limit, MIN_MERGE_LIMIT);
+    } else {
+      mergeLimit = Integer.MAX_VALUE;
+    }
+
+    // Limits the size of first-generation spill files and spill batches.
+    // Ensure the sizes are reasonable and that the batch size fits in an int.
+
+    spillFileSize = Math.max(config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE), MIN_SPILL_FILE_SIZE);
+    spillBatchSize = (int) Math.min(Integer.MAX_VALUE, Math.max(config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE), MIN_SPILL_BATCH_SIZE));
+
+    // Set the target output (merge) batch size from the configuration, but
+    // no smaller than the minimum size. Batch sizes are held as ints, so a
+    // configured value above Integer.MAX_VALUE is capped rather than being
+    // allowed to wrap around in the narrowing cast.
+
+    mergeBatchSize = (int) Math.min(Integer.MAX_VALUE, Math.max(config.getBytes(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE), MIN_MERGE_BATCH_SIZE));
+
+    // Limit on in-memory batches, primarily for testing.
+
+    int value = config.getInt(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT);
+    if (value == 0) {
+      bufferedBatchLimit = Integer.MAX_VALUE;
+    } else {
+      bufferedBatchLimit = Math.max(value, 2);
+    }
+    logConfig();
+  }
+
+  /** Logs the effective spill and merge settings at debug level. */
+  private void logConfig() {
+    ExternalSortBatch.logger.debug("Config: " +
+                 "spill file size = {}, spill batch size = {}, " +
+                 "merge limit = {}, merge batch size = {}",
+                  spillFileSize(), spillBatchSize(), mergeLimit(), mergeBatchSize());
+  }
+
+  public long maxMemory() { return maxMemory; }
+  public int mergeLimit() { return mergeLimit; }
+  public long spillFileSize() { return spillFileSize; }
+  public int spillBatchSize() { return spillBatchSize; }
+  public int mergeBatchSize() { return mergeBatchSize; }
+  public int getBufferedBatchLimit() { return bufferedBatchLimit; }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
new file mode 100644
index 0000000..6f0da3d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
+import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
+import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeTask;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+/**
+ * Implementation of the external sort which is wrapped into the Drill
+ * "next" protocol by the {@link ExternalSortBatch} class.
+ * <p>
+ * Accepts incoming batches. Sorts each and will spill to disk as needed.
+ * When all input is delivered, can either do an in-memory merge or a
+ * merge from disk. If runs spilled, may have to do one or more "consolidation"
+ * passes to reduce the number of runs to the level that will fit in memory.
+ */
+
+public class SortImpl {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
+
+  /**
+   * Iterator over the final sorted results. Implementations differ
+   * depending on whether the results are held in memory or were
+   * spilled to disk.
+   */
+
+  public interface SortResults {
+
+    /**
+     * Advances to the next batch of results.
+     * @return true if another batch is available, false once the
+     * results are exhausted
+     */
+    boolean next();
+
+    /**
+     * Container into which results are delivered. May be the
+     * original operator container, or may be a different
+     * one. This is the container that should be sent
+     * downstream. This is a fixed value for all returned
+     * results.
+     * @return the container that holds the results
+     */
+    VectorContainer getContainer();
+
+    /** @return the two-byte selection vector for the results; may be null */
+    SelectionVector2 getSv2();
+
+    /** @return the four-byte selection vector for the results; may be null */
+    SelectionVector4 getSv4();
+
+    /** @return the batch count reported by this set of results */
+    int getBatchCount();
+
+    /** @return the record count reported by this set of results */
+    int getRecordCount();
+
+    /** Releases whatever the implementation holds for these results. */
+    void close();
+  }
+
+  private final SortConfig config;
+  private final SortMetrics metrics;
+  private final SortMemoryManager memManager;
+  private VectorContainer outputBatch;
+  private OperExecContext context;
+
+  /**
+   * Memory allocator for this operator itself. Incoming batches are
+   * transferred into this allocator. Intermediate batches used during
+   * merge also reside here.
+   */
+
+  private final BufferAllocator allocator;
+
+  private final SpilledRuns spilledRuns;
+
+  private final BufferedBatches bufferedBatches;
+
+  public SortImpl(OperExecContext opContext, SortConfig sortConfig, SpilledRuns spilledRuns, VectorContainer batch) {
+    this.context = opContext;
+    outputBatch = batch;
+    this.spilledRuns = spilledRuns;
+    allocator = opContext.getAllocator();
+    config = sortConfig;
+    memManager = new SortMemoryManager(config, allocator.getLimit());
+    metrics = new SortMetrics(opContext.getStats());
+    bufferedBatches = new BufferedBatches(opContext);
+
+    // Reset the allocator to allow a 10% safety margin. This is done because
+    // the memory manager will enforce the original limit. Changing the hard
+    // limit will reduce the probability that random chance causes the allocator
+    // to kill the query because of a small, spurious over-allocation.
+
+    allocator.setLimit((long)(allocator.getLimit() * 1.10));
+  }
+
+  /**
+   * Passes the given schema on to both the in-memory buffered batches
+   * and the spilled runs.
+   *
+   * @param schema the schema to apply
+   */
+  public void setSchema(BatchSchema schema) {
+    bufferedBatches.setSchema(schema);
+    spilledRuns.setSchema(schema);
+  }
+
+  /**
+   * Spills buffered batches on request, provided that at least two
+   * batches are currently buffered.
+   *
+   * @return true if a spill was performed, false if fewer than two
+   * batches were buffered and nothing was done
+   */
+  public boolean forceSpill() {
+    final boolean canSpill = bufferedBatches.size() >= 2;
+    if (canSpill) {
+      spillFromMemory();
+    }
+    return canSpill;
+  }
+
+  /**
+   * Accepts one converted incoming batch: buffers it in memory, first
+   * spilling already-buffered data to disk when that is required.
+   * Empty batches are cleared and otherwise ignored.
+   *
+   * @param incoming the batch to add to the sort
+   */
+
+  public void addBatch(VectorAccessible incoming) {
+
+    // An empty batch (such as the first one) contributes nothing.
+
+    if (incoming.getRecordCount() == 0) {
+      VectorAccessibleUtilities.clear(incoming);
+      return;
+    }
+
+    // Size the batch before taking ownership so that we can decide
+    // whether to spill first, rather than overflow memory merely
+    // because of the ownership transfer.
+
+    RecordBatchSizer batchSizer = analyzeIncomingBatch(incoming);
+
+    // Spill BEFORE accepting the new batch into our memory pool. The
+    // allocator will throw an OOM exception if we accept the batch when
+    // near the limit, even though the batch is already in memory and the
+    // transfer allocates nothing new.
+
+    if (isSpillNeeded(batchSizer.actualSize())) {
+      spillFromMemory();
+    }
+
+    // Buffer the batch and measure what it really cost, which
+    // includes the allocation of an sv2.
+
+    long memBefore = allocator.getAllocatedMemory();
+    bufferedBatches.add(incoming, batchSizer.netSize());
+    long memAfter = allocator.getAllocatedMemory();
+    long memDelta = memAfter - memBefore;
+
+    // Record input, free-memory and peak-batch metrics.
+
+    metrics.updateInputMetrics(batchSizer.rowCount(), batchSizer.actualSize());
+    metrics.updateMemory(memManager.freeMemory(memAfter));
+    metrics.updatePeakBatches(bufferedBatches.size());
+
+    // Report any difference between the sizer's figure and the observed
+    // allocation, then feed the observed numbers to the memory manager.
+
+    validateBatchSize(batchSizer.actualSize(), memDelta);
+    memManager.updateEstimates((int) memDelta, batchSizer.netRowWidth(), batchSizer.rowCount());
+  }
+
+  /**
+   * Scan the vectors in the incoming batch to determine batch size.
+   *
+   * @return an analysis of the incoming batch
+   */
+
+  private RecordBatchSizer analyzeIncomingBatch(VectorAccessible incoming) {
+    RecordBatchSizer sizer = new RecordBatchSizer(incoming);
+    sizer.applySv2();
+    if (metrics.getInputBatchCount() == 0) {
+      logger.debug("{}", sizer.toString());
+    }
+    return sizer;
+  }
+
+  /**
+   * Determine if spill is needed before receiving the new record batch.
+   * Spilling is driven purely by memory availability (and an optional
+   * batch limit for testing.)
+   *
+   * @return true if spilling is needed, false otherwise
+   */
+
+  private boolean isSpillNeeded(int incomingSize) {
+
+    // Can't spill if less than two batches else the merge
+    // can't make progress.
+
+    if (bufferedBatches.size() < 2) {
+      return false; }
+
+    if (bufferedBatches.size() >= config.getBufferedBatchLimit()) {
+      return true; }
+    return memManager.isSpillNeeded(allocator.getAllocatedMemory(), incomingSize);
+  }
+
+  /**
+   * Logs, at debug level, any difference between the batch size reported
+   * by the sizer and the change observed in allocated memory.
+   *
+   * @param actualBatchSize batch size as reported by the batch sizer
+   * @param memoryDelta observed change in allocated memory
+   */
+  private void validateBatchSize(long actualBatchSize, long memoryDelta) {
+    if (actualBatchSize == memoryDelta) {
+      return;
+    }
+    ExternalSortBatch.logger.debug("Memory delta: {}, actual batch size: {}, Diff: {}",
+                 memoryDelta, actualBatchSize, memoryDelta - actualBatchSize);
+  }
+
+  /**
+   * This operator has accumulated a set of sorted incoming record batches.
+   * We wish to spill some of them to disk. To do this, a "copier"
+   * merges the target batches to produce a stream of new (merged) batches
+   * which are then written to disk.
+   * <p>
+   * This method spills only half the accumulated batches
+   * minimizing unnecessary disk writes. The exact count must lie between
+   * the minimum and maximum spill counts.
+   */
+
+  private void spillFromMemory() {
+    int startCount = bufferedBatches.size();
+    List<BatchGroup> batchesToSpill = bufferedBatches.prepareSpill(config.spillFileSize());
+
+    // Do the actual spill.
+
+    logger.trace("Spilling {} of {} batches, memory = {}",
+        batchesToSpill.size(), startCount,
+        allocator.getAllocatedMemory());
+    int spillBatchRowCount = memManager.getSpillBatchRowCount();
+    spilledRuns.mergeAndSpill(batchesToSpill, spillBatchRowCount);
+    metrics.incrSpillCount();
+  }
+
+  public SortMetrics getMetrics() { return metrics; }
+
+  /**
+   * Results for a sort that has nothing to return: no batches, no
+   * records and no selection vectors. Only the destination container
+   * handed to the constructor is exposed.
+   */
+  public static class EmptyResults implements SortResults {
+
+    private final VectorContainer dest;
+
+    public EmptyResults(VectorContainer dest) {
+      this.dest = dest;
+    }
+
+    @Override
+    public VectorContainer getContainer() {
+      return dest;
+    }
+
+    @Override
+    public boolean next() {
+      // Never any batch to deliver.
+      return false;
+    }
+
+    @Override
+    public int getBatchCount() {
+      return 0;
+    }
+
+    @Override
+    public int getRecordCount() {
+      return 0;
+    }
+
+    @Override
+    public SelectionVector2 getSv2() {
+      return null;
+    }
+
+    @Override
+    public SelectionVector4 getSv4() {
+      return null;
+    }
+
+    @Override
+    public void close() {
+      // Nothing to release.
+    }
+  }
+
+  /**
+   * Called once all input has been added: chooses how to deliver the
+   * sorted results.
+   *
+   * @return empty results if no rows were read; the single input batch if
+   * exactly one batch was read; otherwise the results of an in-memory
+   * merge when one is possible, else of a merge of spilled runs
+   */
+  public SortResults startMerge() {
+    if (metrics.getInputRowCount() == 0) {
+      return new EmptyResults(outputBatch);
+    }
+
+    logger.debug("Completed load phase: read {} batches, spilled {} times, total input bytes: {}",
+        metrics.getInputBatchCount(), spilledRuns.size(),
+        metrics.getInputBytes());
+
+    // A single input batch is already sorted, so it can be returned
+    // as-is. The flag exists only to disable this shortcut when debugging.
+
+    final boolean singleBatchShortcut = true; // Debug only
+    if (singleBatchShortcut && metrics.getInputBatchCount() == 1) {
+      return singleBatchResult();
+    }
+
+    // Merge entirely in memory if the results fit; otherwise do a
+    // disk-based merge of pre-sorted spilled batches.
+
+    if (canUseMemoryMerge()) {
+      return mergeInMemory();
+    }
+    return mergeSpilledRuns();
+  }
+
+  /**
+   * Results made up of exactly one input batch. No merge is needed:
+   * the original (sorted) input batch is itself the result. Note that
+   * the container returned here is the batch's own container, so this
+   * version requires replacing the operator output container with it.
+   * (Vector ownership transfer was already done when accepting the
+   * input batch.)
+   */
+
+  public static class SingleBatchResults implements SortResults {
+
+    private boolean done;
+    private final BatchGroup.InputBatch batch;
+
+    public SingleBatchResults(BatchGroup.InputBatch batch) {
+      this.batch = batch;
+    }
+
+    @Override
+    public VectorContainer getContainer() {
+      return batch.getContainer();
+    }
+
+    @Override
+    public boolean next() {
+      // True on the first call only; false on every call after that.
+      final boolean firstCall = ! done;
+      done = true;
+      return firstCall;
+    }
+
+    @Override
+    public int getBatchCount() {
+      return 1;
+    }
+
+    @Override
+    public int getRecordCount() {
+      return batch.getRecordCount();
+    }
+
+    @Override
+    public SelectionVector2 getSv2() {
+      return batch.getSv2();
+    }
+
+    @Override
+    public SelectionVector4 getSv4() {
+      return null;
+    }
+
+    @Override
+    public void close() {
+      try {
+        batch.close();
+      } catch (IOException e) {
+        // Should never occur for an input batch
+        throw new IllegalStateException(e);
+      }
+    }
+  }
+
+  /**
+   * Input consists of a single batch. Just return that batch as
+   * the output.
+   * @return results iterator over the single input batch
+   */
+
+  private SortResults singleBatchResult() {
+    List<InputBatch> batches = bufferedBatches.removeAll();
+    return new SingleBatchResults(batches.get(0));
+  }
+
+  /**
+   * All data has been read from upstream. Decide whether the fast
+   * in-memory sort can be used, or whether a merge (which typically,
+   * but not always, involves spilled batches) is required.
+   * <p>
+   * The in-memory sort is allowed only when nothing has spilled, the
+   * memory manager reports enough capacity for the in-memory sorter
+   * (MSorter), and the number of buffered batches does not exceed the
+   * maximum number of batches an SV4 can address.
+   *
+   * @return whether sufficient resources exist to do an in-memory sort
+   * if all batches are still in memory
+   */
+
+  private boolean canUseMemoryMerge() {
+    return ! spilledRuns.hasSpilled()
+        && memManager.hasMemoryMergeCapacity(allocator.getAllocatedMemory(), MSortTemplate.memoryNeeded(metrics.getInputRowCount()))
+        && bufferedBatches.size() <= Character.MAX_VALUE;
+  }
+
+  /**
+   * Perform an in-memory sort of the buffered batches. Obviously can
+   * be used only for the non-spilling case.
+   * <p>
+   * If the merge fails, the merge wrapper is closed here before the
+   * failure is rethrown; a failure of that cleanup is attached to the
+   * original exception as a suppressed exception rather than replacing it.
+   *
+   * @return the results of the in-memory merge
+   */
+
+  private SortResults mergeInMemory() {
+    logger.debug("Starting in-memory sort. Batches = {}, Records = {}, Memory = {}",
+                 bufferedBatches.size(), metrics.getInputRowCount(),
+                 allocator.getAllocatedMemory());
+
+    // Note the difference between how we handle batches here and in the spill/merge
+    // case. In the spill/merge case, this class decides on the batch size to send
+    // downstream. However, in the in-memory case, we must pass along all batches
+    // in a single SV4. Attempts to do paging will result in errors. In the memory
+    // merge case, the downstream Selection Vector Remover will split the one
+    // big SV4 into multiple smaller batches to send further downstream.
+
+    // If the sort fails, clean up here. Otherwise, cleanup is done
+    // by closing the returned results after all results are returned downstream.
+
+    MergeSortWrapper memoryMerge = new MergeSortWrapper(context, outputBatch);
+    try {
+      memoryMerge.merge(bufferedBatches.removeAll());
+    } catch (Throwable t) {
+      try {
+        memoryMerge.close();
+      } catch (RuntimeException closeFailure) {
+        // Keep the original failure as the primary exception.
+        t.addSuppressed(closeFailure);
+      }
+      throw t;
+    }
+    logger.debug("Completed in-memory sort. Memory = {}",
+                 allocator.getAllocatedMemory());
+    return memoryMerge;
+  }
+
+  /**
+   * Perform merging of (typically spilled) batches. First consolidates batches
+   * as needed, then performs a final merge that is read one batch at a time
+   * to deliver batches to the downstream operator.
+   *
+   * @return an iterator over the merged batches
+   */
+
+  private SortResults mergeSpilledRuns() {
+    logger.debug("Starting consolidate phase. Batches = {}, Records = {}, Memory = {}, In-memory batches {}, spilled runs {}",
+                 metrics.getInputBatchCount(), metrics.getInputRowCount(),
+                 allocator.getAllocatedMemory(),
+                 bufferedBatches.size(), spilledRuns.size());
+
+    // Consolidate batches to a number that can be merged in
+    // a single last pass.
+
+    loop:
+    for (;;) {
+      MergeTask task = memManager.consolidateBatches(
+          allocator.getAllocatedMemory(),
+          bufferedBatches.size(),
+          spilledRuns.size());
+      switch (task.action) {
+      case SPILL:
+        spillFromMemory();
+        break;
+      case MERGE:
+        mergeRuns(task.count);
+        break;
+      case NONE:
+        break loop;
+      default:
+        throw new IllegalStateException("Unexpected action: " + task.action);
+      }
+    }
+
+    int mergeRowCount = memManager.getMergeBatchRowCount();
+    return spilledRuns.finalMerge(bufferedBatches.removeAll(), outputBatch, mergeRowCount);
+  }
+
+  /**
+   * Merges spilled runs using the merge memory limit and spill batch row
+   * count supplied by the memory manager, then increments the merge count
+   * metric.
+   *
+   * @param targetCount the count passed through to the spilled-runs merge
+   */
+  private void mergeRuns(int targetCount) {
+    spilledRuns.mergeRuns(targetCount,
+        memManager.getMergeMemoryLimit(),
+        memManager.getSpillBatchRowCount());
+    metrics.incrMergeCount();
+  }
+
+  /**
+   * Records the bytes written by spilling, then closes the spilled runs
+   * and the buffered batches. Both are closed even if the first close
+   * fails.
+   *
+   * @throws RuntimeException the first failure raised while closing; a
+   * failure from the second close is attached to it as a suppressed
+   * exception instead of being dropped
+   */
+  public void close() {
+    metrics.updateWriteBytes(spilledRuns.getWriteBytes());
+    RuntimeException ex = null;
+    try {
+      spilledRuns.close();
+    } catch (RuntimeException e) {
+      ex = e;
+    }
+    try {
+      bufferedBatches.close();
+    } catch (RuntimeException e) {
+      if (ex == null) {
+        ex = e;
+      } else {
+        // Don't lose the second failure: attach it to the first.
+        ex.addSuppressed(e);
+      }
+    }
+    if (ex != null) {
+      throw ex;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
new file mode 100644
index 0000000..213720f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class SortMemoryManager {
+
+  /**
+   * Maximum memory this operator may use. Usually comes from the
+   * operator definition, but may be overridden by a configuration
+   * parameter for unit testing.
+   */
+
+  private final long memoryLimit;
+
+  /**
+   * Estimated size of the records for this query, updated on each
+   * new batch received from upstream.
+   */
+
+  private int estimatedRowWidth;
+
+  /**
+   * Size of the merge batches that this operator produces. Generally
+   * the same as the preferred merge batch size, unless low memory forces
+   * a smaller value.
+   */
+
+  private int expectedMergeBatchSize;
+
+  /**
+   * Estimate of the input batch size based on the largest batch seen
+   * thus far.
+   */
+  private int estimatedInputBatchSize;
+
+  /**
+   * Maximum memory level before spilling occurs. That is, we can buffer input
+   * batches in memory until we reach the level given by the buffer memory pool.
+   */
+
+  private long bufferMemoryLimit;
+
+  /**
+   * Maximum memory that can hold batches during the merge
+   * phase.
+   */
+
+  private long mergeMemoryLimit;
+
+  /**
+   * The target size for merge batches sent downstream.
+   */
+
+  private int preferredMergeBatchSize;
+
+  /**
+   * The configured size for each spill batch.
+   */
+  private int preferredSpillBatchSize;
+
+  /**
+   * Estimated number of rows that fit into a single spill batch.
+   */
+
+  private int spillBatchRowCount;
+
+  /**
+   * The estimated actual spill batch size which depends on the
+   * details of the data rows for any particular query.
+   */
+
+  private int expectedSpillBatchSize;
+
+  /**
+   * The number of records to add to each output batch sent to the
+   * downstream operator or spilled to disk.
+   */
+
+  private int mergeBatchRowCount;
+
+  private SortConfig config;
+
+  private int estimatedInputSize;
+
+  private boolean potentialOverflow;
+
+  public SortMemoryManager(SortConfig config, long memoryLimit) {
+    this.config = config;
+
+    // The maximum memory this operator can use as set by the
+    // operator definition (propagated to the allocator.)
+
+    if (config.maxMemory() > 0) {
+      this.memoryLimit = Math.min(memoryLimit, config.maxMemory());
+    } else {
+      this.memoryLimit = memoryLimit;
+    }
+
+    preferredSpillBatchSize = config.spillBatchSize();;
+    preferredMergeBatchSize = config.mergeBatchSize();
+  }
+
+  /**
+   * Refreshes the data-driven memory numbers after a batch arrives:
+   * <ul>
+   * <li>The estimated width of incoming records.</li>
+   * <li>The estimated spill and output batch sizes.</li>
+   * <li>The estimated number of records per spill and output batch.</li>
+   * <li>The amount of memory set aside to hold the incoming
+   * batches before spilling starts.</li>
+   * </ul>
+   * <p>
+   * Under normal circumstances, the amount of memory available is much
+   * larger than the input, spill or merge batch sizes. The primary question
+   * is how many input batches can be buffered during the load phase, and
+   * how many spill batches can be merged during the merge phase.
+   * <p>
+   * Nothing is recomputed when the batch has no rows, or when the batch
+   * does not change the input estimates.
+   *
+   * @param batchSize the overall size of the current batch received from
+   * upstream
+   * @param batchRowWidth the average width in bytes (including overhead) of
+   * rows in the current input batch
+   * @param batchRowCount the number of actual (not filtered) records in
+   * that upstream batch
+   */
+
+  public void updateEstimates(int batchSize, int batchRowWidth, int batchRowCount) {
+
+    // The record count should never be zero, but better safe than sorry.
+    // Likewise, go no further if the input estimates did not change.
+
+    if (batchRowCount == 0
+        || ! updateInputEstimates(batchSize, batchRowWidth, batchRowCount)) {
+      return;
+    }
+
+    updateSpillSettings();
+    updateMergeSettings();
+    adjustForLowMemory();
+    logSettings(batchRowCount);
+  }
+
+  private boolean updateInputEstimates(int batchSize, int batchRowWidth, int batchRowCount) {
+
+    // The row width may end up as zero if all fields are nulls or some
+    // other unusual situation. In this case, assume a width of 10 just
+    // to avoid lots of special case code.
+
+    if (batchRowWidth == 0) {
+      batchRowWidth = 10;
+    }
+
+    // We know the batch size and number of records. Use that to estimate
+    // the average record size. Since a typical batch has many records,
+    // the average size is a fairly good estimator. Note that the batch
+    // size includes not just the actual vector data, but any unused space
+    // resulting from power-of-two allocation. This means that we don't
+    // have to do size adjustments for input batches as we will do below
+    // when estimating the size of other objects.
+
+    // Record sizes may vary across batches. To be conservative, use
+    // the largest size observed from incoming batches.
+
+    int origRowEstimate = estimatedRowWidth;
+    estimatedRowWidth = Math.max(estimatedRowWidth, batchRowWidth);
+
+    // Maintain an estimate of the incoming batch size: the largest
+    // batch yet seen. Used to reserve memory for the next incoming
+    // batch. Because we are using the actual observed batch size,
+    // the size already includes overhead due to power-of-two rounding.
+
+    long origInputBatchSize = estimatedInputBatchSize;
+    estimatedInputBatchSize = Math.max(estimatedInputBatchSize, batchSize);
+
+    // Estimate the total size of each incoming batch plus sv2. Note that, due
+    // to power-of-two rounding, the allocated sv2 size might be twice the data size.
+
+    estimatedInputSize = estimatedInputBatchSize + 4 * batchRowCount;
+
+    // Return whether anything changed.
+
+    return estimatedRowWidth != origRowEstimate || estimatedInputBatchSize != origInputBatchSize;
+  }
+
+  /**
+   * Works out the spill settings from the preferred spill batch size: the
+   * number of rows per spill batch, the expected size of such a batch, and
+   * the memory level at which buffering must give way to spilling.
+   */
+
+  private void updateSpillSettings() {
+
+    // Rows that fit into a spill batch of the preferred size.
+
+    this.spillBatchRowCount = rowsPerBatch(this.preferredSpillBatchSize);
+
+    // The expected spill batch size for that many rows; it may be larger
+    // or smaller than the preferred size depending on the row width.
+
+    this.expectedSpillBatchSize = batchForRows(this.spillBatchRowCount);
+
+    // To spill, we need room for the spill batch created by merging the
+    // batches already in memory, so input may be buffered only up to the
+    // memory limit less one expected spill batch.
+
+    this.bufferMemoryLimit = this.memoryLimit - this.expectedSpillBatchSize;
+  }
+
+  /**
+   * Works out the merge settings from the preferred merge batch size: the
+   * number of rows per merge batch, the expected size of such a batch, and
+   * the memory available to hold batches during the merge phase.
+   */
+
+  private void updateMergeSettings() {
+
+    this.mergeBatchRowCount = rowsPerBatch(this.preferredMergeBatchSize);
+    this.expectedMergeBatchSize = batchForRows(this.mergeBatchRowCount);
+
+    // The merge memory pool assumes we can spill all input batches. The
+    // memory available to hold spill batches for merging is total memory
+    // minus the expected output batch size.
+
+    this.mergeMemoryLimit = this.memoryLimit - this.expectedMergeBatchSize;
+  }
+
+  /**
+   * In a low-memory situation we have to approach the memory assignment
+   * problem from a different angle. Memory is low enough that we can't
+   * fit the incoming batches (of a size decided by the upstream operator)
+   * and our usual spill or merge batch sizes. Instead, we have to
+   * determine the largest spill and merge batch sizes possible given
+   * the available memory, input batch size and row width. We shrink the
+   * sizes of the batches we control to try to make things fit into limited
+   * memory. At some point, however, if we cannot fit even two input
+   * batches and even the smallest merge batch, then we will run into an
+   * out-of-memory condition and we log a warning.
+   * <p>
+   * Note that these calculations are a bit crazy: it is Drill that
+   * decided to allocate the small memory, it is Drill that created the
+   * large incoming batches, and so it is Drill that created the low
+   * memory situation. Over time, a better fix for this condition is to
+   * control memory usage at the query level so that the sort is guaranteed
+   * to have sufficient memory. But, since we don't yet have the luxury
+   * of making such changes, we just live with the situation as we find
+   * it.
+   * <p>
+   * All size arithmetic is done in {@code long} so that doubling an
+   * {@code int} size cannot overflow.
+   */
+
+  private void adjustForLowMemory() {
+
+    // Nothing to adjust if two input batches fit in the buffer pool and
+    // two spill batches fit in the merge pool.
+
+    long loadHeadroom = bufferMemoryLimit - 2L * estimatedInputSize;
+    long mergeHeadroom = mergeMemoryLimit - 2L * expectedSpillBatchSize;
+    if (loadHeadroom >= 0  &&  mergeHeadroom >= 0) {
+      return;
+    }
+
+    lowMemorySpillBatchSize();
+    lowMemoryMergeBatchSize();
+
+    // Sanity check: if we've been given too little memory to make progress,
+    // issue a warning but proceed anyway. Should only occur if something is
+    // configured terribly wrong.
+
+    long minNeeds = 2L * estimatedInputSize + expectedSpillBatchSize;
+    if (minNeeds > memoryLimit) {
+      ExternalSortBatch.logger.warn("Potential memory overflow during load phase! " +
+          "Minimum needed = {} bytes, actual available = {} bytes",
+          minNeeds, memoryLimit);
+      bufferMemoryLimit = 0;
+      potentialOverflow = true;
+    }
+
+    // Same sanity check for the merge phase: two spill batches plus one
+    // merge batch must fit.
+
+    minNeeds = 2L * expectedSpillBatchSize + expectedMergeBatchSize;
+    if (minNeeds > memoryLimit) {
+      ExternalSortBatch.logger.warn("Potential memory overflow during merge phase! " +
+          "Minimum needed = {} bytes, actual available = {} bytes",
+          minNeeds, memoryLimit);
+      mergeMemoryLimit = 0;
+      potentialOverflow = true;
+    }
+  }
+
+  /**
+   * If we are in a low-memory condition, then we might not have room for the
+   * default spill batch size. In that case, pick a smaller size based on
+   * the observation that we need two input batches and
+   * one spill batch to make progress.
+   * <p>
+   * The size is computed in {@code long} and only narrowed to {@code int}
+   * at the end, so neither the doubling of the input size nor a large
+   * memory limit can overflow or be truncated.
+   */
+
+  private void lowMemorySpillBatchSize() {
+
+    // The "expected" size is with power-of-two rounding in some vectors.
+    // We later work backwards to the row count assuming average internal
+    // fragmentation.
+
+    // Must hold two input batches. Use the rest for the spill batch.
+
+    long spillSize = memoryLimit - 2L * estimatedInputSize;
+
+    // But, in the merge phase, we need two spill batches and one output batch.
+    // (Assume that the spill and merge are equal sizes.) So limit the spill
+    // batch to one third of memory.
+
+    spillSize = Math.min(spillSize, memoryLimit / 3);
+
+    // Never go below the configured minimum spill batch size.
+
+    spillSize = Math.max(spillSize, SortConfig.MIN_SPILL_BATCH_SIZE);
+
+    // Must hold at least one row to spill. That is, we can make progress if we
+    // create spill files that consist of single-record batches.
+
+    spillSize = Math.max(spillSize, estimatedRowWidth);
+
+    expectedSpillBatchSize = (int) Math.min(spillSize, Integer.MAX_VALUE);
+
+    // Work out the spill batch row count needed by the spill code.
+
+    spillBatchRowCount = rowsPerBatch(expectedSpillBatchSize);
+
+    // Finally, figure out when we must spill.
+
+    bufferMemoryLimit = memoryLimit - expectedSpillBatchSize;
+  }
+
+  /**
+   * For merge batch, we must hold at least two spill batches and
+   * one output batch. The merge batch gets whatever memory remains after
+   * two spill batches, but never less than the configured minimum merge
+   * batch size or the width of one row.
+   * <p>
+   * The size is computed in {@code long} and only narrowed to {@code int}
+   * at the end, so doubling the spill batch size cannot overflow.
+   */
+
+  private void lowMemoryMergeBatchSize() {
+    long mergeSize = memoryLimit - 2L * expectedSpillBatchSize;
+    mergeSize = Math.max(mergeSize, SortConfig.MIN_MERGE_BATCH_SIZE);
+    mergeSize = Math.max(mergeSize, estimatedRowWidth);
+    expectedMergeBatchSize = (int) Math.min(mergeSize, Integer.MAX_VALUE);
+    mergeBatchRowCount = rowsPerBatch(expectedMergeBatchSize);
+    mergeMemoryLimit = memoryLimit - expectedMergeBatchSize;
+  }
+
+  /**
+   * Log the calculated values at debug level. Turn this on if things seem
+   * amiss.
+   *
+   * @param actualRecordCount record count of the batch that triggered
+   * the update, reported alongside the input estimates
+   */
+
+  private void logSettings(int actualRecordCount) {
+
+    ExternalSortBatch.logger.debug("Input Batch Estimates: record size = {} bytes; input batch = {} bytes, {} records",
+                 estimatedRowWidth, estimatedInputBatchSize, actualRecordCount);
+    // These values describe the spill batch, so label them as such.
+    ExternalSortBatch.logger.debug("Spill batch size = {} bytes, {} records; spill file size: {} bytes",
+                 expectedSpillBatchSize, spillBatchRowCount, config.spillFileSize());
+    ExternalSortBatch.logger.debug("Output batch size = {} bytes, {} records",
+                 expectedMergeBatchSize, mergeBatchRowCount);
+    ExternalSortBatch.logger.debug("Available memory: {}, buffer memory = {}, merge memory = {}",
+                 memoryLimit, bufferMemoryLimit, mergeMemoryLimit);
+  }
+
+  public enum MergeAction { SPILL, MERGE, NONE }
+
+  /**
+   * Result of {@link #consolidateBatches}: an action together with a count.
+   */
+  public static class MergeTask {
+    // The step to take.
+    public MergeAction action;
+    // The count that accompanies the action; consolidateBatches sets it to
+    // the number of runs to merge for MERGE, and to 0 otherwise.
+    public int count;
+
+    public MergeTask(MergeAction action, int count) {
+      this.action = action;
+      this.count = count;
+    }
+  }
+
+  /**
+   * Decides the next consolidation step given the current memory use and
+   * the number of in-memory batches and spilled runs.
+   *
+   * @param allocMemory memory currently allocated
+   * @param inMemCount number of batches buffered in memory
+   * @param spilledRunsCount number of spilled runs
+   * @return a SPILL task if in-memory batches exist and, together with one
+   * batch per spilled run, they exceed the merge memory limit; a MERGE task
+   * carrying the number of runs to merge if there are more spilled runs
+   * than can be merged at once; otherwise a NONE task
+   */
+  public MergeTask consolidateBatches(long allocMemory, int inMemCount, int spilledRunsCount) {
+
+    // Determine additional memory needed to hold one batch from each
+    // spilled run.
+
+    // If the on-disk batches and in-memory batches need more memory than
+    // is available, spill some in-memory batches.
+
+    if (inMemCount > 0) {
+      // Widen before multiplying: the product of two ints can overflow.
+      long mergeSize = (long) spilledRunsCount * expectedSpillBatchSize;
+      if (allocMemory + mergeSize > mergeMemoryLimit) {
+        return new MergeTask(MergeAction.SPILL, 0);
+      }
+    }
+
+    // Maximum batches that fit into available memory.
+
+    int mergeLimit = (int) ((mergeMemoryLimit - allocMemory) / expectedSpillBatchSize);
+
+    // Can't merge more than the merge limit.
+
+    mergeLimit = Math.min(mergeLimit, config.mergeLimit());
+
+    // How many batches to merge?
+
+    int mergeCount = spilledRunsCount - mergeLimit;
+    if (mergeCount <= 0) {
+      return new MergeTask(MergeAction.NONE, 0);
+    }
+
+    // We will merge. This will create yet another spilled
+    // run. Account for that.
+
+    mergeCount += 1;
+
+    // Must merge at least 2 batches to make progress.
+    // This is the (at least one) excess plus the allowance
+    // above for the new one.
+
+    // Can't merge more than the limit.
+
+    mergeCount = Math.min(mergeCount, config.mergeLimit());
+
+    // Do the merge, then loop to try again in case not
+    // all the target batches spilled in one go.
+
+    return new MergeTask(MergeAction.MERGE, mergeCount);
+  }
+
+  /**
+   * Compute the number of rows per batch assuming that the batch is
+   * subject to average internal fragmentation due to power-of-two
+   * rounding on vectors.
+   * <p>
+   * <pre>[____|__$__]</pre>
+   * In the above, the brackets represent the whole vector. The
+   * first half is always full. When the first half filled, the second
+   * half was allocated. On average, the second half will be half full.
+   *
+   * @param batchSize expected batch size, including internal fragmentation
+   * @return number of rows that fit into the batch
+   */
+
+  private int rowsPerBatch(int batchSize) {
+
+    // Scale in long arithmetic: batchSize * 3 wraps around as an int
+    // once batchSize exceeds Integer.MAX_VALUE / 3.
+
+    long rowCount = batchSize * 3L / 4 / estimatedRowWidth;
+
+    // Clamp to the range [1, Character.MAX_VALUE]; the result therefore
+    // always fits in an int.
+
+    return (int) Math.max(1, Math.min(rowCount, Character.MAX_VALUE));
+  }
+
+  /**
+   * Compute the expected size of a batch that holds the given number
+   * of rows, accounting for internal fragmentation due to power-of-two
+   * rounding on vector allocations.
+   *
+   * @param rowCount the desired number of rows in the batch
+   * @return the size of resulting batch, including power-of-two
+   * rounding, capped at {@code Integer.MAX_VALUE}
+   */
+
+  private int batchForRows(int rowCount) {
+
+    // Multiply as long so that width * rows * 4 cannot wrap around,
+    // then saturate rather than return a wrapped (possibly negative) size.
+
+    long batchSize = (long) estimatedRowWidth * rowCount * 4 / 3;
+    return (int) Math.min(batchSize, Integer.MAX_VALUE);
+  }
+
+  // Must spill if we are below the spill point (the amount of memory
+  // needed to do the minimal spill.)
+
+  public boolean isSpillNeeded(long allocatedBytes, int incomingSize) {
+
+    // Memory in use once the incoming batch is added.
+
+    final long projectedBytes = allocatedBytes + incomingSize;
+    return projectedBytes >= bufferMemoryLimit;
+  }
+
+  public boolean hasMemoryMergeCapacity(long allocatedBytes, long neededForInMemorySort) {
+
+    // The in-memory sort fits when its requirement does not exceed
+    // the memory still free under the overall limit.
+
+    final long available = freeMemory(allocatedBytes);
+    return neededForInMemorySort <= available;
+  }
+
+  /**
+   * Memory still unused out of the overall memory limit.
+   *
+   * @param allocatedBytes bytes currently allocated
+   * @return the memory limit minus {@code allocatedBytes}; negative
+   * when the allocation exceeds the limit
+   */
+
+  public long freeMemory(long allocatedBytes) {
+    return memoryLimit - allocatedBytes;
+  }
+
+  // Accessors for the computed merge memory limit and the spill and
+  // merge batch row counts.
+
+  public long getMergeMemoryLimit() { return mergeMemoryLimit; }
+  public int getSpillBatchRowCount() { return spillBatchRowCount; }
+  public int getMergeBatchRowCount() { return mergeBatchRowCount; }
+
+  // Primarily for testing
+  // Each accessor below returns the corresponding field unchanged.
+
+  @VisibleForTesting
+  public long getMemoryLimit() { return memoryLimit; }
+  @VisibleForTesting
+  public int getRowWidth() { return estimatedRowWidth; }
+  @VisibleForTesting
+  public int getInputBatchSize() { return estimatedInputBatchSize; }
+  @VisibleForTesting
+  public int getPreferredSpillBatchSize() { return preferredSpillBatchSize; }
+  @VisibleForTesting
+  public int getPreferredMergeBatchSize() { return preferredMergeBatchSize; }
+  @VisibleForTesting
+  public int getSpillBatchSize() { return expectedSpillBatchSize; }
+  @VisibleForTesting
+  public int getMergeBatchSize() { return expectedMergeBatchSize; }
+  @VisibleForTesting
+  public long getBufferMemoryLimit() { return bufferMemoryLimit; }
+  @VisibleForTesting
+  public boolean mayOverflow() { return potentialOverflow; }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java
new file mode 100644
index 0000000..d51e007
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import org.apache.drill.exec.ops.OperatorStatReceiver;
+
+/**
+ * Collects counters for the sort (input rows, batches and bytes, spill
+ * and merge counts, bytes written, peak buffered batches, minimum free
+ * memory) and forwards the operator-visible ones to the supplied
+ * {@link OperatorStatReceiver}.
+ */
+
+public class SortMetrics {
+
+  private int peakBatchCount = -1;
+  private int inputRecordCount = 0;
+  private int inputBatchCount = 0; // total number of batches received so far
+
+  /**
+   * Sum of the total number of bytes read from upstream.
+   * This is the raw memory bytes, not actual data bytes.
+   */
+
+  private long totalInputBytes;
+
+  /**
+   * Tracks the minimum amount of remaining memory for use
+   * in populating an operator metric. Starts at the largest possible
+   * value so that the first reading always becomes the minimum and a
+   * genuine reading of zero is never mistaken for "not yet set".
+   */
+
+  private long minimumBufferSpace = Long.MAX_VALUE;
+  private final OperatorStatReceiver stats;
+  private int spillCount;
+  private int mergeCount;
+  private long writeBytes;
+
+  public SortMetrics(OperatorStatReceiver stats) {
+    this.stats = stats;
+  }
+
+  /**
+   * Record the arrival of one input batch.
+   *
+   * @param rowCount number of rows in the batch
+   * @param batchSize size of the batch in bytes
+   */
+
+  public void updateInputMetrics(int rowCount, int batchSize) {
+    inputRecordCount += rowCount;
+    inputBatchCount++;
+    totalInputBytes += batchSize;
+  }
+
+  /**
+   * Record a free-memory reading and publish the smallest reading seen
+   * so far as the MIN_BUFFER metric.
+   *
+   * @param freeMem free memory, in bytes, at the time of the call
+   */
+
+  public void updateMemory(long freeMem) {
+
+    // A plain running minimum. The earlier form treated a stored value
+    // of zero as "unset", so after a true reading of zero the next call
+    // replaced the minimum with a larger value.
+
+    minimumBufferSpace = Math.min(minimumBufferSpace, freeMem);
+    stats.setLongStat(ExternalSortBatch.Metric.MIN_BUFFER, minimumBufferSpace);
+  }
+
+  public int getInputRowCount() { return inputRecordCount; }
+  public long getInputBatchCount() { return inputBatchCount; }
+  public long getInputBytes() { return totalInputBytes; }
+
+  /**
+   * Publish a new PEAK_BATCHES_IN_MEMORY value when the given count
+   * exceeds the largest count seen so far.
+   *
+   * @param bufferedBatchCount number of batches currently buffered
+   */
+
+  public void updatePeakBatches(int bufferedBatchCount) {
+    if (peakBatchCount < bufferedBatchCount) {
+      peakBatchCount = bufferedBatchCount;
+      stats.setLongStat(ExternalSortBatch.Metric.PEAK_BATCHES_IN_MEMORY, peakBatchCount);
+    }
+  }
+
+  public void incrMergeCount() {
+    stats.addLongStat(ExternalSortBatch.Metric.MERGE_COUNT, 1);
+    mergeCount++;
+  }
+
+  public void incrSpillCount() {
+    stats.addLongStat(ExternalSortBatch.Metric.SPILL_COUNT, 1);
+    spillCount++;
+  }
+
+  /**
+   * Record the bytes written so far and publish them, converted to
+   * megabytes, as the SPILL_MB metric.
+   *
+   * @param writeBytes number of bytes written; replaces the stored value
+   */
+
+  public void updateWriteBytes(long writeBytes) {
+    stats.setDoubleStat(ExternalSortBatch.Metric.SPILL_MB,
+        writeBytes / 1024.0D / 1024.0);
+    this.writeBytes = writeBytes;
+  }
+
+  public int getSpillCount() { return spillCount; }
+  public int getMergeCount() { return mergeCount; }
+  public long getWriteBytes() { return writeBytes; }
+
+  // Returns -1 until updatePeakBatches() has recorded a value.
+
+  public int getPeakBatchCount() { return peakBatchCount; }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
new file mode 100644
index 0000000..4231cf4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.physical.config.ExternalSort;
+import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+
+/**
+ * Single-batch sorter using a generated implementation based on the
+ * schema and sort specification. The generated sorter is reused
+ * across batches. The sorter must be closed at each schema change
+ * so that the sorter will generate a new implementation against
+ * the changed schema.
+ */
+
+public class SorterWrapper extends BaseSortWrapper {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SorterWrapper.class);
+
+  /**
+   * Generated sort operation used to sort each incoming batch according to
+   * the sort criteria specified in the {@link ExternalSort} definition of
+   * this operator.
+   */
+
+  private SingleBatchSorter sorter;
+
+  public SorterWrapper(OperExecContext opContext) {
+    super(opContext);
+  }
+
+  /**
+   * Sort one batch through its selection vector, generating the sorter
+   * on first use.
+   *
+   * @param convertedBatch the batch to sort; cleared if setup or sort
+   * reports a schema change
+   * @param sv2 selection vector passed to the sorter's setup and sort
+   * @throws UserException (unsupported error) if the sorter reports a
+   * schema change
+   */
+
+  public void sortBatch(VectorContainer convertedBatch, SelectionVector2 sv2) {
+
+    // Named differently from the field so the local does not shadow it.
+
+    SingleBatchSorter batchSorter = getSorter(convertedBatch);
+    try {
+      batchSorter.setup(context, sv2, convertedBatch);
+      batchSorter.sort(sv2);
+    } catch (SchemaChangeException e) {
+      convertedBatch.clear();
+      throw UserException.unsupportedError(e)
+            .message("Unexpected schema change.")
+            .build(logger);
+    }
+  }
+
+  /**
+   * Discard the cached sorter so that the next call to
+   * {@link #sortBatch(VectorContainer, SelectionVector2)} generates a
+   * new one.
+   */
+
+  public void close() {
+    sorter = null;
+  }
+
+  // Returns the cached sorter, creating it from the given batch if needed.
+
+  private SingleBatchSorter getSorter(VectorAccessible batch) {
+    if (sorter == null) {
+      sorter = newSorter(batch);
+    }
+    return sorter;
+  }
+
+  // Generates a new sorter implementation for the given batch.
+
+  private SingleBatchSorter newSorter(VectorAccessible batch) {
+    CodeGenerator<SingleBatchSorter> cg = CodeGenerator.get(
+        SingleBatchSorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(),
+        context.getOptionSet());
+    ClassGenerator<SingleBatchSorter> g = cg.getRoot();
+    cg.plainJavaCapable(true);
+
+    // Uncomment the following line to debug the generated code.
+    // It is a debugging aid and must stay disabled in committed code.
+//    cg.saveCodeForDebugging(true);
+
+    generateComparisons(g, batch, logger);
+    return getInstance(cg, logger);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
new file mode 100644
index 0000000..a6042c6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.SpilledRun;
+import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorContainer;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Represents the set of spilled batches, including methods to spill and/or
+ * merge a set of batches to produce a new spill file.
+ */
+
+public class SpilledRuns {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SpilledRuns.class);
+
+  /**
+   * Manages the set of spill directories and files.
+   */
+
+  private final SpillSet spillSet;
+  private final LinkedList<BatchGroup.SpilledRun> spilledRuns = Lists.newLinkedList();
+
+  /**
+   * Manages the copier used to merge a collection of batches into
+   * a new set of batches.
+   */
+
+  private final PriorityQueueCopierWrapper copierHolder;
+  private BatchSchema schema;
+
+  private final OperExecContext context;
+
+  public SpilledRuns(OperExecContext opContext, SpillSet spillSet, PriorityQueueCopierWrapper copier) {
+    this.context = opContext;
+    this.spillSet = spillSet;
+    copierHolder = copier;
+  }
+
+  /**
+   * Adopt a new schema: apply it to every spilled run and close the
+   * copier holder.
+   *
+   * @param schema the schema to use from now on
+   */
+
+  public void setSchema(BatchSchema schema) {
+    this.schema = schema;
+    for (BatchGroup b : spilledRuns) {
+      b.setSchema(schema);
+    }
+    copierHolder.close();
+  }
+
+  public int size() { return spilledRuns.size(); }
+  public boolean hasSpilled() { return spillSet.hasSpilled(); }
+  public long getWriteBytes() { return spillSet.getWriteBytes(); }
+
+  /**
+   * Remove up to {@code spillCount} batches from the head of the source
+   * list and return them in order.
+   *
+   * @param source list to take batches from; the taken batches are
+   * removed from it
+   * @param spillCount maximum number of batches to take
+   * @return the batches removed from the source
+   */
+
+  public static List<BatchGroup> prepareSpillBatches(LinkedList<? extends BatchGroup> source, int spillCount) {
+    List<BatchGroup> batchesToSpill = Lists.newArrayList();
+    spillCount = Math.min(source.size(), spillCount);
+    assert spillCount > 0 : "Spill count to mergeAndSpill must not be zero";
+    for (int i = 0; i < spillCount; i++) {
+      batchesToSpill.add(source.pollFirst());
+    }
+    return batchesToSpill;
+  }
+
+  /**
+   * Merge the given batches into a new spilled run and add that run to
+   * the list of spilled runs.
+   *
+   * @param batchesToSpill batches to merge and write
+   * @param spillBatchRowCount row count passed to the merger for each
+   * spill batch
+   */
+
+  public void mergeAndSpill(List<BatchGroup> batchesToSpill, int spillBatchRowCount) {
+    spilledRuns.add(safeMergeAndSpill(batchesToSpill, spillBatchRowCount));
+    logger.trace("Completed spill: memory = {}",
+        context.getAllocator().getAllocatedMemory());
+  }
+
+  /**
+   * Merge some of the spilled runs into a single new run.
+   *
+   * @param targetCount desired number of runs to merge
+   * @param mergeMemoryPool total memory available for merging; the
+   * currently allocated memory is subtracted from it
+   * @param spillBatchRowCount row count passed to the merger for each
+   * spill batch
+   */
+
+  public void mergeRuns(int targetCount, long mergeMemoryPool, int spillBatchRowCount) {
+
+    // Read the allocator once so that the value logged is the value
+    // actually subtracted from the pool.
+
+    long allocated = context.getAllocator().getAllocatedMemory();
+    mergeMemoryPool -= allocated;
+    logger.trace("Merging {} on-disk runs, alloc. memory = {}, avail. memory = {}",
+        targetCount, allocated, mergeMemoryPool);
+
+    // Determine the number of runs to merge. The count should be the
+    // target count. However, to prevent possible memory overrun, we
+    // double-check with actual spill batch size and only spill as much
+    // as fits in the merge memory pool.
+
+    int mergeCount = 0;
+    long mergeSize = 0;
+    for (SpilledRun run : spilledRuns) {
+      long batchSize = run.getBatchSize();
+      if (mergeSize + batchSize > mergeMemoryPool) {
+        break;
+      }
+      mergeSize += batchSize;
+      mergeCount++;
+      if (mergeCount == targetCount) {
+        break;
+      }
+    }
+
+    // Must always spill at least 2, even if this creates an over-size
+    // spill file. But, if this is a final consolidation, we may have only
+    // a single batch.
+
+    mergeCount = Math.max(mergeCount, 2);
+    mergeCount = Math.min(mergeCount, spilledRuns.size());
+
+    // Do the actual spill.
+
+    List<BatchGroup> batchesToSpill = prepareSpillBatches(spilledRuns, mergeCount);
+    mergeAndSpill(batchesToSpill, spillBatchRowCount);
+  }
+
+  // Runs the merge-and-spill, converting any failure that is not already
+  // a UserException into a resource-error UserException.
+
+  private BatchGroup.SpilledRun safeMergeAndSpill(List<? extends BatchGroup> batchesToSpill, int spillBatchRowCount) {
+    try {
+      return doMergeAndSpill(batchesToSpill, spillBatchRowCount);
+    }
+    // If error is a User Exception, just use as is.
+
+    catch (UserException ue) { throw ue; }
+    catch (Throwable ex) {
+      throw UserException.resourceError(ex)
+            .message("External Sort encountered an error while spilling to disk")
+            .build(logger);
+    }
+  }
+
+  private BatchGroup.SpilledRun doMergeAndSpill(List<? extends BatchGroup> batchesToSpill, int spillBatchRowCount) throws Throwable {
+
+    // Merge the selected set of batches and write them to the
+    // spill file. After each write, we release the memory associated
+    // with the just-written batch.
+
+    String outputFile = spillSet.getNextSpillFile();
+    BatchGroup.SpilledRun newGroup = null;
+    VectorContainer dest = new VectorContainer();
+    try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill);
+         PriorityQueueCopierWrapper.BatchMerger merger = copierHolder.startMerge(schema, batchesToSpill, dest, spillBatchRowCount)) {
+      newGroup = new BatchGroup.SpilledRun(spillSet, outputFile, context.getAllocator());
+      logger.trace("Spilling {} batches, into spill batches of {} rows, to {}",
+          batchesToSpill.size(), spillBatchRowCount, outputFile);
+
+      // The copier will merge records from the buffered batches into
+      // the outputContainer up to targetRecordCount number of rows.
+      // The actual count may be less if fewer records are available.
+
+      while (merger.next()) {
+
+        // Add a new batch of records (given by merger.getOutput()) to the spill
+        // file.
+        //
+        // note that addBatch also clears the merger's output container
+
+        newGroup.addBatch(dest);
+      }
+      context.injectChecked(ExternalSortBatch.INTERRUPTION_WHILE_SPILLING, IOException.class);
+      newGroup.closeOutputStream();
+
+      // Arguments follow the order of the placeholders: batch count,
+      // estimated batch size in bytes, record count, file name.
+
+      logger.trace("Spilled {} output batches, each of {} bytes, {} records, to {}",
+                   merger.getBatchCount(), merger.getEstBatchSize(),
+                   merger.getRecordCount(), outputFile);
+      newGroup.setBatchSize(merger.getEstBatchSize());
+      return newGroup;
+    } catch (Throwable e) {
+      // we only need to clean up newGroup if spill failed
+      try {
+        if (newGroup != null) {
+          AutoCloseables.close(e, newGroup);
+        }
+      } catch (Throwable t) { /* close() may hit the same IO issue; just ignore */ }
+
+      throw e;
+    }
+  }
+
+  /**
+   * Start the final merge over the buffered batches followed by the
+   * spilled runs. Both source lists are emptied.
+   *
+   * @param bufferedBatches in-memory batches; cleared by this call
+   * @param container container handed to the merger for its output
+   * @param mergeRowCount row count passed to the merger
+   * @return the merger started over all batches
+   */
+
+  public SortResults finalMerge(List<? extends BatchGroup> bufferedBatches, VectorContainer container, int mergeRowCount) {
+    List<BatchGroup> allBatches = new LinkedList<>();
+    allBatches.addAll(bufferedBatches);
+    bufferedBatches.clear();
+    allBatches.addAll(spilledRuns);
+    spilledRuns.clear();
+    logger.debug("Starting merge phase. Runs = {}, Alloc. memory = {}",
+        allBatches.size(), context.getAllocator().getAllocatedMemory());
+    return copierHolder.startMerge(schema, allBatches, container, mergeRowCount);
+  }
+
+  /**
+   * Close the spilled runs, the copier holder and the spill set. All
+   * three are attempted even if an earlier one fails; the first
+   * failure is rethrown with any later failures attached as suppressed
+   * exceptions.
+   */
+
+  public void close() {
+    if (spillSet.getWriteBytes() > 0) {
+
+      // The second value is the read total; it previously repeated
+      // the write total.
+
+      logger.debug("End of sort. Total write bytes: {}, Total read bytes: {}",
+                   spillSet.getWriteBytes(), spillSet.getReadBytes());
+    }
+    RuntimeException ex = null;
+    try {
+      BatchGroup.closeAll(spilledRuns);
+      spilledRuns.clear();
+    } catch (RuntimeException e) {
+      ex = e;
+    }
+    try {
+      copierHolder.close();
+    } catch (RuntimeException e) {
+      ex = chain(ex, e);
+    }
+    try {
+      spillSet.close();
+    } catch (RuntimeException e) {
+      ex = chain(ex, e);
+    }
+    if (ex != null) {
+      throw ex;
+    }
+  }
+
+  // Keeps the first failure as the primary exception and records any
+  // later one as suppressed instead of dropping it.
+
+  private static RuntimeException chain(RuntimeException first, RuntimeException later) {
+    if (first == null) {
+      return later;
+    }
+    if (first != later) {
+      first.addSuppressed(later);
+    }
+    return first;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 8aedaf6..146df1f 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -34,11 +34,11 @@ drill {
     annotations += org.apache.drill.exec.expr.annotations.FunctionTemplate
 
     packages : ${?drill.classpath.scanning.packages} [
-      org.apache.drill.exec.expr,
-      org.apache.drill.exec.physical,
-      org.apache.drill.exec.store,
-      org.apache.drill.exec.rpc.user.security,
-      org.apache.drill.exec.rpc.security
+          org.apache.drill.exec.expr,
+          org.apache.drill.exec.physical,
+          org.apache.drill.exec.store,
+          org.apache.drill.exec.rpc.user.security,
+          org.apache.drill.exec.rpc.security
     ]
   }
 }
@@ -241,19 +241,19 @@ drill.exec: {
       // Set this to true to use the legacy, unmanaged version.
       // Disabled in the initial commit, to be enabled after
       // tests are committed.
-      disable_managed: true
+      disable_managed: true,
       // Limit on the number of batches buffered in memory.
       // Primarily for testing.
       // 0 = unlimited
-      batch_limit: 0
+      batch_limit: 0,
       // Limit on the amount of memory used for xsort. Overrides the
       // value provided by Foreman. Primarily for testing.
       // 0 = unlimited, Supports HOCON memory suffixes.
-      mem_limit: 0
+      mem_limit: 0,
       // Limit on the number of spilled batches that can be merged in
       // a single pass. Limits the number of open file handles.
       // 0 = unlimited
-      merge_limit: 0
+      merge_limit: 0,
       spill: {
         // Deprecated for managed xsort; used only by legacy xsort
         group.size: 40000,