Posted to commits@drill.apache.org by pr...@apache.org on 2017/06/21 18:29:12 UTC

[4/5] drill git commit: DRILL-5325: Unit tests for the managed sort

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 4d5f290..1dbddee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -17,47 +17,27 @@
  */
 package org.apache.drill.exec.physical.impl.xsort.managed;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.ops.OperExecContextImpl;
 import org.apache.drill.exec.physical.config.ExternalSort;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
-import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
-import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
-import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
-import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.SpilledRun;
-
-import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaUtil;
-import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 
-import com.google.common.collect.Lists;
-
 /**
  * External sort batch: a sort batch which can spill to disk in
  * order to operate within a defined memory footprint.
@@ -175,191 +155,33 @@ import com.google.common.collect.Lists;
  */
 
 public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
   protected static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ExternalSortBatch.class);
 
-  /**
-   * Smallest allowed output batch size. The smallest output batch
-   * created even under constrained memory conditions.
-   */
-  private static final int MIN_MERGED_BATCH_SIZE = 256 * 1024;
-
-  /**
-   * In the bizarre case where the user gave us an unrealistically low
-   * spill file size, set a floor at some bare minimum size. (Note that,
-   * at this size, big queries will create a huge number of files, which
-   * is why the configuration default is on the order of hundreds of MB.)
-   */
-
-  private static final long MIN_SPILL_FILE_SIZE = 1 * 1024 * 1024;
-
   public static final String INTERRUPTION_AFTER_SORT = "after-sort";
   public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
   public static final String INTERRUPTION_WHILE_SPILLING = "spilling";
   public static final String INTERRUPTION_WHILE_MERGING = "merging";
-  public static final long DEFAULT_SPILL_BATCH_SIZE = 8L * 1024 * 1024;
-  public static final long MIN_SPILL_BATCH_SIZE = 256 * 1024;
 
   private final RecordBatch incoming;
 
   /**
-   * Memory allocator for this operator itself. Incoming batches are
-   * transferred into this allocator. Intermediate batches used during
-   * merge also reside here.
-   */
-
-  private final BufferAllocator allocator;
-
-  /**
    * Schema of batches that this operator produces.
    */
 
   private BatchSchema schema;
 
-  /**
-   * Incoming batches buffered in memory prior to spilling
-   * or an in-memory merge.
-   */
-
-  private LinkedList<BatchGroup.InputBatch> bufferedBatches = Lists.newLinkedList();
-  private LinkedList<BatchGroup.SpilledRun> spilledRuns = Lists.newLinkedList();
-  private SelectionVector4 sv4;
-
-  /**
-   * The number of records to add to each output batch sent to the
-   * downstream operator or spilled to disk.
-   */
-
-  private int mergeBatchRowCount;
-  private int peakNumBatches = -1;
-
-  /**
-   * Maximum memory this operator may use. Usually comes from the
-   * operator definition, but may be overridden by a configuration
-   * parameter for unit testing.
-   */
-
-  private long memoryLimit;
+//  private SelectionVector4 sv4;
 
   /**
    * Iterates over the final, sorted results.
    */
 
   private SortResults resultsIterator;
-
-  /**
-   * Manages the set of spill directories and files.
-   */
-
-  private final SpillSet spillSet;
-
-  /**
-   * Manages the copier used to merge a collection of batches into
-   * a new set of batches.
-   */
-
-  private final CopierHolder copierHolder;
-
   private enum SortState { START, LOAD, DELIVER, DONE }
   private SortState sortState = SortState.START;
-  private int inputRecordCount = 0;
-  private int inputBatchCount = 0; // total number of batches received so far
-  private final OperatorCodeGenerator opCodeGen;
-
-  /**
-   * Estimated size of the records for this query, updated on each
-   * new batch received from upstream.
-   */
-
-  private int estimatedRowWidth;
-
-  /**
-   * Size of the merge batches that this operator produces. Generally
-   * the same as the merge batch size, unless low memory forces a smaller
-   * value.
-   */
-
-  private long targetMergeBatchSize;
-
-  /**
-   * Estimate of the input batch size based on the largest batch seen
-   * thus far.
-   */
-  private long estimatedInputBatchSize;
-
-  /**
-   * Maximum number of spilled runs that can be merged in a single pass.
-   */
-
-  private int mergeLimit;
-
-  /**
-   * Target size of the first-generation spill files.
-   */
-  private long spillFileSize;
-
-  /**
-   * Tracks the minimum amount of remaining memory for use
-   * in populating an operator metric.
-   */
-
-  private long minimumBufferSpace;
-
-  /**
-   * Maximum memory level before spilling occurs. That is, we can buffer input
-   * batches in memory until we reach the level given by the buffer memory pool.
-   */
-
-  private long bufferMemoryPool;
 
-  /**
-   * Maximum memory that can hold batches during the merge
-   * phase.
-   */
-
-  private long mergeMemoryPool;
-
-  /**
-   * The target size for merge batches sent downstream.
-   */
-
-  private long preferredMergeBatchSize;
-
-  /**
-   * Sum of the total number of bytes read from upstream.
-   * This is the raw memory bytes, not actual data bytes.
-   */
-
-  private long totalInputBytes;
-
-  /**
-   * The configured size for each spill batch.
-   */
-  private Long preferredSpillBatchSize;
-
-  /**
-   * Tracks the maximum density of input batches. Density is
-   * the amount of actual data / amount of memory consumed.
-   * Low density batches indicate an EOF or something wrong in
-   * an upstream operator because a low-density batch wastes
-   * memory.
-   */
-
-  private int maxDensity;
-  private int lastDensity = -1;
-
-  /**
-   * Estimated number of rows that fit into a single spill batch.
-   */
-
-  private int spillBatchRowCount;
-
-  /**
-   * The estimated actual spill batch size which depends on the
-   * details of the data rows for any particular query.
-   */
-
-  private int targetSpillBatchSize;
+  private SortImpl sortImpl;
 
   // WARNING: The enum here is used within this class. But, the members of
   // this enum MUST match those in the (unmanaged) ExternalSortBatch since
@@ -367,7 +189,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
   public enum Metric implements MetricDef {
     SPILL_COUNT,            // number of times operator spilled to disk
-    RETIRED1,               // Was: peak value for totalSizeInMemory
+    NOT_USED,               // Was: peak value for totalSizeInMemory
                             // But operator already provides this value
     PEAK_BATCHES_IN_MEMORY, // maximum number of batches kept in memory
     MERGE_COUNT,            // Number of second+ generation merges
@@ -382,126 +204,33 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     }
   }
 
-  /**
-   * Iterates over the final sorted results. Implemented differently
-   * depending on whether the results are in-memory or spilled to
-   * disk.
-   */
-
-  public interface SortResults {
-    boolean next();
-    void close();
-    int getBatchCount();
-    int getRecordCount();
-  }
-
   public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) {
     super(popConfig, context, true);
     this.incoming = incoming;
-    allocator = oContext.getAllocator();
-    opCodeGen = new OperatorCodeGenerator(context, popConfig);
-
-    spillSet = new SpillSet(context, popConfig, UserBitShared.CoreOperatorType.EXTERNAL_SORT);
-    copierHolder = new CopierHolder(context, allocator, opCodeGen);
-    configure(context.getConfig());
-  }
-
-  private void configure(DrillConfig config) {
-
-    // The maximum memory this operator can use as set by the
-    // operator definition (propagated to the allocator.)
-
-    memoryLimit = allocator.getLimit();
-
-    // Optional configured memory limit, typically used only for testing.
-
-    long configLimit = config.getBytes(ExecConstants.EXTERNAL_SORT_MAX_MEMORY);
-    if (configLimit > 0) {
-      memoryLimit = Math.min(memoryLimit, configLimit);
-    }
-
-    // Optional limit on the number of spilled runs to merge in a single
-    // pass. Limits the number of open file handles. Must allow at least
-    // two batches to merge to make progress.
-
-    mergeLimit = getConfigLimit(config, ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, Integer.MAX_VALUE, 2);
-
-    // Limits the size of first-generation spill files.
-
-    spillFileSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE);
-
-    // Ensure the size is reasonable.
-
-    spillFileSize = Math.max(spillFileSize, MIN_SPILL_FILE_SIZE);
 
-    // The spill batch size. This is a critical setting for performance.
-    // Set too large and the ratio between memory and input data sizes becomes
-    // small. Set too small and disk seek times dominate performance.
+    SortConfig sortConfig = new SortConfig(context.getConfig());
+    SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(),
+                                     popConfig);
+    OperExecContext opContext = new OperExecContextImpl(context, oContext, popConfig, injector);
+    PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext);
+    SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copierHolder);
+    sortImpl = new SortImpl(opContext, sortConfig, spilledRuns, container);
 
-    preferredSpillBatchSize = config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE);
+    // The downstream operator checks the record count before we have
+    // results. Create an empty result set temporarily to handle
+    // these calls.
 
-    // In low memory, use no more than 1/4 of memory for each spill batch. Ensures we
-    // can merge.
-
-    preferredSpillBatchSize = Math.min(preferredSpillBatchSize, memoryLimit / 4);
-
-    // But, the spill batch should be above some minimum size to prevent complete
-    // thrashing.
-
-    preferredSpillBatchSize = Math.max(preferredSpillBatchSize, MIN_SPILL_BATCH_SIZE);
-
-    // Set the target output batch size. Use the maximum size, but only if
-    // this represents less than 10% of available memory. Otherwise, use 10%
-    // of memory, but no smaller than the minimum size. In any event, an
-    // output batch can contain no fewer than a single record.
-
-    preferredMergeBatchSize = config.getBytes(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE);
-    long maxAllowance = (long) (memoryLimit - 2 * preferredSpillBatchSize);
-    preferredMergeBatchSize = Math.min(maxAllowance, preferredMergeBatchSize);
-    preferredMergeBatchSize = Math.max(preferredMergeBatchSize, MIN_MERGED_BATCH_SIZE);
-
-    logger.debug("Config: memory limit = {}, " +
-                 "spill file size = {}, spill batch size = {}, merge limit = {}, merge batch size = {}",
-                  memoryLimit, spillFileSize, preferredSpillBatchSize, mergeLimit,
-                  preferredMergeBatchSize);
-  }
-
-  private int getConfigLimit(DrillConfig config, String paramName, int valueIfZero, int minValue) {
-    int limit = config.getInt(paramName);
-    if (limit > 0) {
-      limit = Math.max(limit, minValue);
-    } else {
-      limit = valueIfZero;
-    }
-    return limit;
+    resultsIterator = new SortImpl.EmptyResults(container);
   }
 
   @Override
   public int getRecordCount() {
-    if (sv4 != null) {
-      return sv4.getCount();
-    }
-    return container.getRecordCount();
+    return resultsIterator.getRecordCount();
   }
 
   @Override
   public SelectionVector4 getSelectionVector4() {
-    return sv4;
-  }
-
-  private void closeBatchGroups(Collection<? extends BatchGroup> groups) {
-    for (BatchGroup group: groups) {
-      try {
-        group.close();
-      } catch (Exception e) {
-        // collect all failure and make sure to cleanup all remaining batches
-        // Originally we would have thrown a RuntimeException that would propagate to FragmentExecutor.closeOutResources()
-        // where it would have been passed to context.fail()
-        // passing the exception directly to context.fail(e) will let the cleanup process continue instead of stopping
-        // right away, this will also make sure we collect any additional exception we may get while cleaning up
-        context.fail(e);
-      }
-    }
+    return resultsIterator.getSv4();
   }
 
   /**
@@ -588,59 +317,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   }
 
   /**
-   * Load and process a single batch, handling schema changes. In general, the
-   * external sort accepts only one schema.
-   *
-   * @return return code depending on the amount of data read from upstream
-   */
-
-  private IterOutcome loadBatch() {
-
-    // If this is the very first batch, then AbstractRecordBatch
-    // already loaded it for us in buildSchema().
-
-    IterOutcome upstream;
-    if (sortState == SortState.START) {
-      sortState = SortState.LOAD;
-      upstream = IterOutcome.OK_NEW_SCHEMA;
-    } else {
-      upstream = next(incoming);
-    }
-    switch (upstream) {
-    case NONE:
-    case STOP:
-      return upstream;
-    case OK_NEW_SCHEMA:
-    case OK:
-      setupSchema(upstream);
-
-      // Add the batch to the in-memory generation, spilling if
-      // needed.
-
-      processBatch();
-      break;
-    case OUT_OF_MEMORY:
-
-      // Note: it is highly doubtful that this code actually works. It
-      // requires that the upstream batches got to a safe place to run
-      // out of memory and that no work as in-flight and thus abandoned.
-      // Consider removing this case once resource management is in place.
-
-      logger.error("received OUT_OF_MEMORY, trying to spill");
-      if (bufferedBatches.size() > 2) {
-        spillFromMemory();
-      } else {
-        logger.error("not enough batches to spill, sending OUT_OF_MEMORY downstream");
-        return IterOutcome.OUT_OF_MEMORY;
-      }
-      break;
-    default:
-      throw new IllegalStateException("Unexpected iter outcome: " + upstream);
-    }
-    return IterOutcome.OK;
-  }
-
-  /**
    * Load the results and sort them. May bail out early if an exceptional
    * condition is passed up from the input batch.
    *
@@ -674,50 +350,74 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
     // Anything to actually sort?
 
-    if (inputRecordCount == 0) {
+    resultsIterator = sortImpl.startMerge();
+    if (! resultsIterator.next()) {
       sortState = SortState.DONE;
       return IterOutcome.NONE;
     }
-    logger.debug("Completed load phase: read {} batches, spilled {} times, total input bytes: {}",
-                 inputBatchCount, spilledRuns.size(), totalInputBytes);
 
-    // Do the merge of the loaded batches. The merge can be done entirely in memory if
-    // the results fit; else we have to do a disk-based merge of
-    // pre-sorted spilled batches.
+    // The sort may have exited prematurely because shouldContinue() returned false.
 
-    if (canUseMemoryMerge()) {
-      return sortInMemory();
-    } else {
-      return mergeSpilledRuns();
+    if (! context.shouldContinue()) {
+      sortState = SortState.DONE;
+      return IterOutcome.STOP;
     }
+
+    sortState = SortState.DELIVER;
+    return IterOutcome.OK_NEW_SCHEMA;
   }
 
   /**
-   * All data has been read from the upstream batch. Determine if we
-   * can use a fast in-memory sort, or must use a merge (which typically,
-   * but not always, involves spilled batches.)
+   * Load and process a single batch, handling schema changes. In general, the
+   * external sort accepts only one schema.
    *
-   * @return whether sufficient resources exist to do an in-memory sort
-   * if all batches are still in memory
+   * @return return code depending on the amount of data read from upstream
    */
 
-  private boolean canUseMemoryMerge() {
-    if (spillSet.hasSpilled()) { return false; }
+  private IterOutcome loadBatch() {
 
-    // Do we have enough memory for MSorter (the in-memory sorter)?
+    // If this is the very first batch, then AbstractRecordBatch
+    // already loaded it for us in buildSchema().
 
-    long allocMem = allocator.getAllocatedMemory();
-    long availableMem = memoryLimit - allocMem;
-    long neededForInMemorySort = MSortTemplate.memoryNeeded(inputRecordCount);
-    if (availableMem < neededForInMemorySort) { return false; }
+    IterOutcome upstream;
+    if (sortState == SortState.START) {
+      sortState = SortState.LOAD;
+      upstream = IterOutcome.OK_NEW_SCHEMA;
+    } else {
+      upstream = next(incoming);
+    }
+    switch (upstream) {
+    case NONE:
+    case STOP:
+      return upstream;
+    case OK_NEW_SCHEMA:
+      setupSchema();
+      // Fall through
 
-    // Make sure we don't exceed the maximum number of batches SV4 can address.
+    case OK:
 
-    if (bufferedBatches.size() > Character.MAX_VALUE) { return false; }
+      // Add the batch to the in-memory generation, spilling if
+      // needed.
 
-    // We can do an in-memory merge.
+      sortImpl.addBatch(incoming);
+      break;
+    case OUT_OF_MEMORY:
+
+      // Note: it is highly doubtful that this code actually works. It
+      // requires that the upstream batches got to a safe place to run
+      // out of memory and that no work was in-flight and thus abandoned.
+      // Consider removing this case once resource management is in place.
 
-    return true;
+      logger.error("received OUT_OF_MEMORY, trying to spill");
+      if (! sortImpl.forceSpill()) {
+        throw UserException.memoryError("Received OUT_OF_MEMORY, but not enough batches to spill")
+          .build(logger);
+      }
+      break;
+    default:
+      throw new IllegalStateException("Unexpected iter outcome: " + upstream);
+    }
+    return IterOutcome.OK;
   }
 
   /**
@@ -727,749 +427,24 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
    * @param upstream the status code from upstream: either OK or OK_NEW_SCHEMA
    */
 
-  private void setupSchema(IterOutcome upstream)  {
+  private void setupSchema()  {
 
     // First batch: we won't have a schema.
 
     if (schema == null) {
       schema = incoming.getSchema();
-
-    // Subsequent batches, nothing to do if same schema.
-
-    } else if (upstream == IterOutcome.OK) {
-      return;
-
-    // Only change in the case that the schema truly changes. Artificial schema changes are ignored.
-
     } else if (incoming.getSchema().equals(schema)) {
-      return;
+      // Nothing to do.  Artificial schema changes are ignored.
     } else if (unionTypeEnabled) {
-        schema = SchemaUtil.mergeSchemas(schema, incoming.getSchema());
-
-        // New schema: must generate a new sorter and copier.
-
-        opCodeGen.setSchema(schema);
+      schema = SchemaUtil.mergeSchemas(schema, incoming.getSchema());
     } else {
       throw UserException.unsupportedError()
             .message("Schema changes not supported in External Sort. Please enable Union type.")
+            .addContext("Previous schema", schema.toString())
+            .addContext("Incoming schema", incoming.getSchema().toString())
             .build(logger);
     }
-
-    // Coerce all existing batches to the new schema.
-
-    for (BatchGroup b : bufferedBatches) {
-      b.setSchema(schema);
-    }
-    for (BatchGroup b : spilledRuns) {
-      b.setSchema(schema);
-    }
-  }
-
-  /**
-   * Convert an incoming batch into the agree-upon format. (Also seems to
-   * make a persistent shallow copy of the batch saved until we are ready
-   * to sort or spill.)
-   *
-   * @return the converted batch, or null if the incoming batch is empty
-   */
-
-  @SuppressWarnings("resource")
-  private VectorContainer convertBatch() {
-
-    // Must accept the batch even if no records. Then clear
-    // the vectors to release memory since we won't do any
-    // further processing with the empty batch.
-
-    VectorContainer convertedBatch = SchemaUtil.coerceContainer(incoming, schema, oContext);
-    if (incoming.getRecordCount() == 0) {
-      for (VectorWrapper<?> w : convertedBatch) {
-        w.clear();
-      }
-      SelectionVector2 sv2 = incoming.getSelectionVector2();
-      if (sv2 != null) {
-        sv2.clear();
-      }
-      return null;
-    }
-    return convertedBatch;
-  }
-
-  private SelectionVector2 makeSelectionVector() {
-    if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
-      return incoming.getSelectionVector2().clone();
-    } else {
-      return newSV2();
-    }
-  }
-
-  /**
-   * Process the converted incoming batch by adding it to the in-memory store
-   * of data, or spilling data to disk when necessary.
-   */
-
-  @SuppressWarnings("resource")
-  private void processBatch() {
-
-    // Skip empty batches (such as the first one.)
-
-    if (incoming.getRecordCount() == 0) {
-      return;
-    }
-
-    // Determine actual sizes of the incoming batch before taking
-    // ownership. Allows us to figure out if we need to spill first,
-    // to avoid overflowing memory simply due to ownership transfer.
-
-    RecordBatchSizer sizer = analyzeIncomingBatch();
-
-    // The heart of the external sort operator: spill to disk when
-    // the in-memory generation exceeds the allowed memory limit.
-    // Preemptively spill BEFORE accepting the new batch into our memory
-    // pool. The allocator will throw an OOM exception if we accept the
-    // batch when we are near the limit - despite the fact that the batch
-    // is already in memory and no new memory is allocated during the transfer.
-
-    if ( isSpillNeeded(sizer.actualSize())) {
-      spillFromMemory();
-    }
-
-    // Sanity check. We should now be below the buffer memory maximum.
-
-    long startMem = allocator.getAllocatedMemory();
-    if (startMem > bufferMemoryPool) {
-      logger.error( "ERROR: Failed to spill above buffer limit. Buffer pool = {}, memory = {}",
-          bufferMemoryPool, startMem);
-    }
-
-    // Convert the incoming batch to the agreed-upon schema.
-    // No converted batch means we got an empty input batch.
-    // Converting the batch transfers memory ownership to our
-    // allocator. This gives a round-about way to learn the batch
-    // size: check the before and after memory levels, then use
-    // the difference as the batch size, in bytes.
-
-    VectorContainer convertedBatch = convertBatch();
-    if (convertedBatch == null) {
-      return;
-    }
-
-    SelectionVector2 sv2;
-    try {
-      sv2 = makeSelectionVector();
-    } catch (Exception e) {
-      convertedBatch.clear();
-      throw e;
-    }
-
-    // Compute batch size, including allocation of an sv2.
-
-    long endMem = allocator.getAllocatedMemory();
-    long batchSize = endMem - startMem;
-    int count = sv2.getCount();
-    inputRecordCount += count;
-    inputBatchCount++;
-    totalInputBytes += sizer.actualSize();
-
-    // Update the minimum buffer space metric.
-
-    if (minimumBufferSpace == 0) {
-      minimumBufferSpace = endMem;
-    } else {
-      minimumBufferSpace = Math.min(minimumBufferSpace, endMem);
-    }
-    stats.setLongStat(Metric.MIN_BUFFER, minimumBufferSpace);
-
-    // Update the size based on the actual record count, not
-    // the effective count as given by the selection vector
-    // (which may exclude some records due to filtering.)
-
-    updateMemoryEstimates(batchSize, sizer);
-
-    // Sort the incoming batch using either the original selection vector,
-    // or a new one created here.
-
-    SingleBatchSorter sorter;
-    sorter = opCodeGen.getSorter(convertedBatch);
-    try {
-      sorter.setup(context, sv2, convertedBatch);
-    } catch (SchemaChangeException e) {
-      convertedBatch.clear();
-      throw UserException.unsupportedError(e)
-            .message("Unexpected schema change.")
-            .build(logger);
-    }
-    try {
-      sorter.sort(sv2);
-    } catch (SchemaChangeException e) {
-      convertedBatch.clear();
-      throw UserException.unsupportedError(e)
-                .message("Unexpected schema change.")
-                .build(logger);
-    }
-    RecordBatchData rbd = new RecordBatchData(convertedBatch, allocator);
-    try {
-      rbd.setSv2(sv2);
-      bufferedBatches.add(new BatchGroup.InputBatch(rbd.getContainer(), rbd.getSv2(), oContext, sizer.netSize()));
-      if (peakNumBatches < bufferedBatches.size()) {
-        peakNumBatches = bufferedBatches.size();
-        stats.setLongStat(Metric.PEAK_BATCHES_IN_MEMORY, peakNumBatches);
-      }
-
-    } catch (Throwable t) {
-      rbd.clear();
-      throw t;
-    }
-  }
-
-  /**
-   * Scan the vectors in the incoming batch to determine batch size and if
-   * any oversize columns exist. (Oversize columns cause memory fragmentation.)
-   *
-   * @return an analysis of the incoming batch
-   */
-
-  private RecordBatchSizer analyzeIncomingBatch() {
-    RecordBatchSizer sizer = new RecordBatchSizer(incoming);
-    sizer.applySv2();
-    if (inputBatchCount == 0) {
-      logger.debug("{}", sizer.toString());
-    }
-    return sizer;
-  }
-
-  /**
-   * Update the data-driven memory use numbers including:
-   * <ul>
-   * <li>The average size of incoming records.</li>
-   * <li>The estimated spill and output batch size.</li>
-   * <li>The estimated number of average-size records per
-   * spill and output batch.</li>
-   * <li>The amount of memory set aside to hold the incoming
-   * batches before spilling starts.</li>
-   * </ul>
-   *
-   * @param actualBatchSize the overall size of the current batch received from
-   * upstream
-   * @param actualRecordCount the number of actual (not filtered) records in
-   * that upstream batch
-   */
-
-  private void updateMemoryEstimates(long memoryDelta, RecordBatchSizer sizer) {
-    long actualBatchSize = sizer.actualSize();
-    int actualRecordCount = sizer.rowCount();
-
-    if (actualBatchSize != memoryDelta) {
-      logger.debug("Memory delta: {}, actual batch size: {}, Diff: {}",
-                   memoryDelta, actualBatchSize, memoryDelta - actualBatchSize);
-    }
-
-    // The record count should never be zero, but better safe than sorry...
-
-    if (actualRecordCount == 0) {
-      return; }
-
-    // If the vector is less than 75% full, just ignore it, except in the
-    // unfortunate case where it is the first batch. Low-density batches generally
-    // occur only at the end of a file or at the end of a DFS block. In such a
-    // case, we will continue to rely on estimates created on previous, high-
-    // density batches.
-    // We actually track the max density seen, and compare to 75% of that since
-    // Parquet produces very low density record batches.
-
-    if (sizer.avgDensity() < maxDensity * 3 / 4 && sizer.avgDensity() != lastDensity) {
-      logger.trace("Saw low density batch. Density: {}", sizer.avgDensity());
-      lastDensity = sizer.avgDensity();
-      return;
-    }
-    maxDensity = Math.max(maxDensity, sizer.avgDensity());
-
-    // We know the batch size and number of records. Use that to estimate
-    // the average record size. Since a typical batch has many records,
-    // the average size is a fairly good estimator. Note that the batch
-    // size includes not just the actual vector data, but any unused space
-    // resulting from power-of-two allocation. This means that we don't
-    // have to do size adjustments for input batches as we will do below
-    // when estimating the size of other objects.
-
-    int batchRowWidth = sizer.netRowWidth();
-
-    // Record sizes may vary across batches. To be conservative, use
-    // the largest size observed from incoming batches.
-
-    int origRowEstimate = estimatedRowWidth;
-    estimatedRowWidth = Math.max(estimatedRowWidth, batchRowWidth);
-
-    // Maintain an estimate of the incoming batch size: the largest
-    // batch yet seen. Used to reserve memory for the next incoming
-    // batch. Because we are using the actual observed batch size,
-    // the size already includes overhead due to power-of-two rounding.
-
-    long origInputBatchSize = estimatedInputBatchSize;
-    estimatedInputBatchSize = Math.max(estimatedInputBatchSize, actualBatchSize);
-
-    // The row width may end up as zero if all fields are nulls or some
-    // other unusual situation. In this case, assume a width of 10 just
-    // to avoid lots of special case code.
-
-    if (estimatedRowWidth == 0) {
-      estimatedRowWidth = 10;
-    }
-
-    // Go no further if nothing changed.
-
-    if (estimatedRowWidth == origRowEstimate && estimatedInputBatchSize == origInputBatchSize) {
-      return; }
-
-    // Estimate the total size of each incoming batch plus sv2. Note that, due
-    // to power-of-two rounding, the allocated sv2 size might be twice the data size.
-
-    long estimatedInputSize = estimatedInputBatchSize + 4 * actualRecordCount;
-
-    // Determine the number of records to spill per spill batch. The goal is to
-    // spill batches of either 64K records, or as many records as fit into the
-    // amount of memory dedicated to each spill batch, whichever is less.
-
-    spillBatchRowCount = (int) Math.max(1, preferredSpillBatchSize / estimatedRowWidth / 2);
-    spillBatchRowCount = Math.min(spillBatchRowCount, Character.MAX_VALUE);
-
-    // Compute the actual spill batch size which may be larger or smaller
-    // than the preferred size depending on the row width. Double the estimated
-    // memory needs to allow for power-of-two rounding.
-
-    targetSpillBatchSize = spillBatchRowCount * estimatedRowWidth * 2;
-
-    // Determine the number of records per batch per merge step. The goal is to
-    // merge batches of either 64K records, or as many records as fit into the
-    // amount of memory dedicated to each merge batch, whichever is less.
-
-    mergeBatchRowCount = (int) Math.max(1, preferredMergeBatchSize / estimatedRowWidth / 2);
-    mergeBatchRowCount = Math.min(mergeBatchRowCount, Character.MAX_VALUE);
-    mergeBatchRowCount = Math.max(1,  mergeBatchRowCount);
-    targetMergeBatchSize = mergeBatchRowCount * estimatedRowWidth * 2;
-
-    // Determine the minimum memory needed for spilling. Spilling is done just
-    // before accepting a batch, so we must spill if we don't have room for a
-    // (worst case) input batch. To spill, we need room for the output batch created
-    // by merging the batches already in memory. Double this to allow for power-of-two
-    // memory allocations.
-
-    long spillPoint = estimatedInputBatchSize + 2 * targetSpillBatchSize;
-
-    // The merge memory pool assumes we can spill all input batches. To make
-    // progress, we must have at least two merge batches (same size as an output
-    // batch) and one output batch. Again, double to allow for power-of-two
-    // allocation and add one for a margin of error.
-
-    long minMergeMemory = 2 * targetSpillBatchSize + targetMergeBatchSize;
-
-    // If we are in a low-memory condition, then we might not have room for the
-    // default output batch size. In that case, pick a smaller size.
-
-    if (minMergeMemory > memoryLimit) {
-
-      // Figure out the minimum output batch size based on memory,
-      // must hold at least one complete row.
-
-      long mergeAllowance = memoryLimit - 2 * targetSpillBatchSize;
-      targetMergeBatchSize = Math.max(estimatedRowWidth, mergeAllowance / 2);
-      mergeBatchRowCount = (int) (targetMergeBatchSize / estimatedRowWidth / 2);
-      minMergeMemory = 2 * targetSpillBatchSize + targetMergeBatchSize;
-    }
-
-    // Determine the minimum total memory we would need to receive two input
-    // batches (the minimum needed to make progress) and the allowance for the
-    // output batch.
-
-    long minLoadMemory = spillPoint + estimatedInputSize;
-
-    // Determine how much memory can be used to hold in-memory batches of spilled
-    // runs when reading from disk.
-
-    bufferMemoryPool = memoryLimit - spillPoint;
-    mergeMemoryPool = Math.max(memoryLimit - minMergeMemory,
-                               (long) ((memoryLimit - 3 * targetMergeBatchSize) * 0.95));
-
-    // Sanity check: if we've been given too little memory to make progress,
-    // issue a warning but proceed anyway. Should only occur if something is
-    // configured terribly wrong.
-
-    long minMemoryNeeds = Math.max(minLoadMemory, minMergeMemory);
-    if (minMemoryNeeds > memoryLimit) {
-      logger.warn("Potential memory overflow! " +
-                   "Minumum needed = {} bytes, actual available = {} bytes",
-                   minMemoryNeeds, memoryLimit);
-    }
-
-    // Log the calculated values. Turn this on if things seem amiss.
-    // Message will appear only when the values change.
-
-    logger.debug("Input Batch Estimates: record size = {} bytes; input batch = {} bytes, {} records",
-                 estimatedRowWidth, estimatedInputBatchSize, actualRecordCount);
-    logger.debug("Merge batch size = {} bytes, {} records; spill file size: {} bytes",
-                 targetSpillBatchSize, spillBatchRowCount, spillFileSize);
-    logger.debug("Output batch size = {} bytes, {} records",
-                 targetMergeBatchSize, mergeBatchRowCount);
-    logger.debug("Available memory: {}, buffer memory = {}, merge memory = {}",
-                 memoryLimit, bufferMemoryPool, mergeMemoryPool);
-  }
-
-  /**
-   * Determine if spill is needed before receiving the new record batch.
-   * Spilling is driven purely by memory availability (and an optional
-   * batch limit for testing.)
-   *
-   * @return true if spilling is needed, false otherwise
-   */
-
-  private boolean isSpillNeeded(int incomingSize) {
-
-    // Can't spill if less than two batches else the merge
-    // can't make progress.
-
-    if (bufferedBatches.size() < 2) {
-      return false; }
-
-    // Must spill if we are below the spill point (the amount of memory
-    // needed to do the minimal spill.)
-
-    return allocator.getAllocatedMemory() + incomingSize >= bufferMemoryPool;
-  }
-
-  /**
-   * Perform an in-memory sort of the buffered batches. Obviously can
-   * be used only for the non-spilling case.
-   *
-   * @return DONE if no rows, OK_NEW_SCHEMA if at least one row
-   */
-
-  private IterOutcome sortInMemory() {
-    logger.debug("Starting in-memory sort. Batches = {}, Records = {}, Memory = {}",
-                 bufferedBatches.size(), inputRecordCount, allocator.getAllocatedMemory());
-
-    // Note the difference between how we handle batches here and in the spill/merge
-    // case. In the spill/merge case, this class decides on the batch size to send
-    // downstream. However, in the in-memory case, we must pass along all batches
-    // in a single SV4. Attempts to do paging will result in errors. In the memory
-    // merge case, the downstream Selection Vector Remover will split the one
-    // big SV4 into multiple smaller batches to send further downstream.
-
-    // If the sort fails or is empty, clean up here. Otherwise, cleanup is done
-    // by closing the resultsIterator after all results are returned downstream.
-
-    MergeSort memoryMerge = new MergeSort(context, allocator, opCodeGen);
-    try {
-      sv4 = memoryMerge.merge(bufferedBatches, this, container);
-      if (sv4 == null) {
-        sortState = SortState.DONE;
-        return IterOutcome.STOP;
-      } else {
-        logger.debug("Completed in-memory sort. Memory = {}",
-                     allocator.getAllocatedMemory());
-        resultsIterator = memoryMerge;
-        memoryMerge = null;
-        sortState = SortState.DELIVER;
-        return IterOutcome.OK_NEW_SCHEMA;
-      }
-    } finally {
-      if (memoryMerge != null) {
-        memoryMerge.close();
-      }
-    }
-  }
-
-  /**
-   * Perform merging of (typically spilled) batches. First consolidates batches
-   * as needed, then performs a final merge that is read one batch at a time
-   * to deliver batches to the downstream operator.
-   *
-   * @return always returns OK_NEW_SCHEMA
-   */
-
-  private IterOutcome mergeSpilledRuns() {
-    logger.debug("Starting consolidate phase. Batches = {}, Records = {}, Memory = {}, In-memory batches {}, spilled runs {}",
-                 inputBatchCount, inputRecordCount, allocator.getAllocatedMemory(),
-                 bufferedBatches.size(), spilledRuns.size());
-
-    // Consolidate batches to a number that can be merged in
-    // a single last pass.
-
-    int mergeCount = 0;
-    while (consolidateBatches()) {
-      mergeCount++;
-    }
-    stats.addLongStat(Metric.MERGE_COUNT, mergeCount);
-
-    // Merge in-memory batches and spilled runs for the final merge.
-
-    List<BatchGroup> allBatches = new LinkedList<>();
-    allBatches.addAll(bufferedBatches);
-    bufferedBatches.clear();
-    allBatches.addAll(spilledRuns);
-    spilledRuns.clear();
-
-    logger.debug("Starting merge phase. Runs = {}, Alloc. memory = {}",
-                 allBatches.size(), allocator.getAllocatedMemory());
-
-    // Do the final merge as a results iterator.
-
-    CopierHolder.BatchMerger merger = copierHolder.startFinalMerge(schema, allBatches, container, mergeBatchRowCount);
-    merger.next();
-    resultsIterator = merger;
-    sortState = SortState.DELIVER;
-    return IterOutcome.OK_NEW_SCHEMA;
-  }
-
-  private boolean consolidateBatches() {
-
-    // Determine additional memory needed to hold one batch from each
-    // spilled run.
-
-    int inMemCount = bufferedBatches.size();
-    int spilledRunsCount = spilledRuns.size();
-
-    // Can't merge more than will fit into memory at one time.
-
-    int maxMergeWidth = (int) (mergeMemoryPool / targetSpillBatchSize);
-    maxMergeWidth = Math.min(mergeLimit, maxMergeWidth);
-
-    // But, must merge at least two batches.
-
-    maxMergeWidth = Math.max(maxMergeWidth, 2);
-
-    // If we can't fit all batches in memory, must spill any in-memory
-    // batches to make room for multiple spill-merge-spill cycles.
-
-    if (inMemCount > 0) {
-      if (spilledRunsCount > maxMergeWidth) {
-        spillFromMemory();
-        return true;
-      }
-
-      // If we just plain have too many batches to merge, spill some
-      // in-memory batches to reduce the burden.
-
-      if (inMemCount + spilledRunsCount > mergeLimit) {
-        spillFromMemory();
-        return true;
-      }
-
-      // If the on-disk batches and in-memory batches need more memory than
-      // is available, spill some in-memory batches.
-
-      long allocated = allocator.getAllocatedMemory();
-      long totalNeeds = spilledRunsCount * targetSpillBatchSize + allocated;
-      if (totalNeeds > mergeMemoryPool) {
-        spillFromMemory();
-        return true;
-      }
-    }
-
-    // Merge on-disk batches if we have too many.
-
-    int mergeCount = spilledRunsCount - maxMergeWidth;
-    if (mergeCount <= 0) {
-      return false;
-    }
-
-    // Must merge at least 2 batches to make progress.
-
-    mergeCount = Math.max(2, mergeCount);
-
-    // We will merge. This will create yet another spilled
-    // run. Account for that.
-
-    mergeCount += 1;
-
-    mergeCount = Math.min(mergeCount, maxMergeWidth);
-
-    // If we are going to merge, and we have batches in memory,
-    // spill them and try again. We need to do this to ensure we
-    // have adequate memory to hold the merge batches. We are into
-    // a second-generation sort/merge so there is no point in holding
-    // onto batches in memory.
-
-    if (inMemCount > 0) {
-      spillFromMemory();
-      return true;
-    }
-
-    // Do the merge, then loop to try again in case not
-    // all the target batches spilled in one go.
-
-    logger.trace("Merging {} on-disk runs, Alloc. memory = {}",
-        mergeCount, allocator.getAllocatedMemory());
-    mergeRuns(mergeCount);
-    return true;
-  }
-
-  /**
-   * This operator has accumulated a set of sorted incoming record batches.
-   * We wish to spill some of them to disk. To do this, a "copier"
-   * merges the target batches to produce a stream of new (merged) batches
-   * which are then written to disk.
-   * <p>
-   * This method spills only half the accumulated batches
-   * minimizing unnecessary disk writes. The exact count must lie between
-   * the minimum and maximum spill counts.
-   */
-
-  private void spillFromMemory() {
-
-    // Determine the number of batches to spill to create a spill file
-    // of the desired size. The actual file size might be a bit larger
-    // or smaller than the target, which is expected.
-
-    int spillCount = 0;
-    long spillSize = 0;
-    for (InputBatch batch : bufferedBatches) {
-      long batchSize = batch.getDataSize();
-      spillSize += batchSize;
-      spillCount++;
-      if (spillSize + batchSize / 2 > spillFileSize) {
-        break; }
-    }
-
-    // Must always spill at least 2, even if this creates an over-size
-    // spill file. But, if this is a final consolidation, we may have only
-    // a single batch.
-
-    spillCount = Math.max(spillCount, 2);
-    spillCount = Math.min(spillCount, bufferedBatches.size());
-
-    // Do the actual spill.
-
-    mergeAndSpill(bufferedBatches, spillCount);
-  }
-
-  private void mergeRuns(int targetCount) {
-
-    // Determine the number of runs to merge. The count should be the
-    // target count. However, to prevent possible memory overrun, we
-    // double-check with actual spill batch size and only spill as much
-    // as fits in the merge memory pool.
-
-    int mergeCount = 0;
-    long mergeSize = 0;
-    for (SpilledRun run : spilledRuns) {
-      long batchSize = run.getBatchSize();
-      if (mergeSize + batchSize > mergeMemoryPool) {
-        break;
-      }
-      mergeSize += batchSize;
-      mergeCount++;
-      if (mergeCount == targetCount) {
-        break;
-      }
-    }
-
-    // Must always spill at least 2, even if this creates an over-size
-    // spill file. But, if this is a final consolidation, we may have only
-    // a single batch.
-
-    mergeCount = Math.max(mergeCount, 2);
-    mergeCount = Math.min(mergeCount, spilledRuns.size());
-
-    // Do the actual spill.
-
-    mergeAndSpill(spilledRuns, mergeCount);
-  }
-
-  private void mergeAndSpill(LinkedList<? extends BatchGroup> source, int count) {
-    spilledRuns.add(doMergeAndSpill(source, count));
-    logger.trace("Completed spill: memory = {}",
-                 allocator.getAllocatedMemory());
-  }
-
-  private BatchGroup.SpilledRun doMergeAndSpill(LinkedList<? extends BatchGroup> batchGroups, int spillCount) {
-    List<BatchGroup> batchesToSpill = Lists.newArrayList();
-    spillCount = Math.min(batchGroups.size(), spillCount);
-    assert spillCount > 0 : "Spill count to mergeAndSpill must not be zero";
-    for (int i = 0; i < spillCount; i++) {
-      batchesToSpill.add(batchGroups.pollFirst());
-    }
-
-    // Merge the selected set of matches and write them to the
-    // spill file. After each write, we release the memory associated
-    // with the just-written batch.
-
-    String outputFile = spillSet.getNextSpillFile(null);
-    stats.setLongStat(Metric.SPILL_COUNT, spillSet.getFileCount());
-    BatchGroup.SpilledRun newGroup = null;
-    try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill);
-         CopierHolder.BatchMerger merger = copierHolder.startMerge(schema, batchesToSpill, spillBatchRowCount)) {
-      logger.trace("Spilling {} of {} batches, spill batch size = {} rows, memory = {}, write to {}",
-                   batchesToSpill.size(), bufferedBatches.size() + batchesToSpill.size(),
-                   spillBatchRowCount,
-                   allocator.getAllocatedMemory(), outputFile);
-      newGroup = new BatchGroup.SpilledRun(spillSet, outputFile, oContext);
-
-      // The copier will merge records from the buffered batches into
-      // the outputContainer up to targetRecordCount number of rows.
-      // The actual count may be less if fewer records are available.
-
-      while (merger.next()) {
-
-        // Add a new batch of records (given by merger.getOutput()) to the spill
-        // file.
-        //
-        // note that addBatch also clears the merger's output container
-
-        newGroup.addBatch(merger.getOutput());
-      }
-      injector.injectChecked(context.getExecutionControls(), INTERRUPTION_WHILE_SPILLING, IOException.class);
-      newGroup.closeOutputStream();
-      logger.trace("Spilled {} batches, {} records; memory = {} to {}",
-                   merger.getBatchCount(), merger.getRecordCount(),
-                   allocator.getAllocatedMemory(), outputFile);
-      newGroup.setBatchSize(merger.getEstBatchSize());
-      return newGroup;
-    } catch (Throwable e) {
-      // we only need to clean up newGroup if spill failed
-      try {
-        if (newGroup != null) {
-          AutoCloseables.close(e, newGroup);
-        }
-      } catch (Throwable t) { /* close() may hit the same IO issue; just ignore */ }
-
-      // Here the merger is holding onto a partially-completed batch.
-      // It will release the memory in the close() call.
-
-      try {
-        // Rethrow so we can decide how to handle the error.
-
-        throw e;
-      }
-
-      // If error is a User Exception, just use as is.
-
-      catch (UserException ue) { throw ue; }
-      catch (Throwable ex) {
-        throw UserException.resourceError(ex)
-              .message("External Sort encountered an error while spilling to disk")
-              .build(logger);
-      }
-    }
-  }
-
-  /**
-   * Allocate and initialize the selection vector used as the sort index.
-   * Assumes that memory is available for the vector since memory management
-   * ensured space is available.
-   *
-   * @return a new, populated selection vector 2
-   */
-
-  private SelectionVector2 newSV2() {
-    SelectionVector2 sv2 = new SelectionVector2(allocator);
-    if (!sv2.allocateNewSafe(incoming.getRecordCount())) {
-      throw UserException.resourceError(new OutOfMemoryException("Unable to allocate sv2 buffer"))
-            .build(logger);
-    }
-    for (int i = 0; i < incoming.getRecordCount(); i++) {
-      sv2.setIndex(i, (char) i);
-    }
-    sv2.setRecordCount(incoming.getRecordCount());
-    return sv2;
+    sortImpl.setSchema(schema);
   }
 
   @Override
@@ -1494,37 +469,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
   @Override
   public void close() {
-    if (spillSet.getWriteBytes() > 0) {
-      logger.debug("End of sort. Total write bytes: {}, Total read bytes: {}",
-                   spillSet.getWriteBytes(), spillSet.getWriteBytes());
-    }
-    stats.setLongStat(Metric.SPILL_MB,
-        (int) Math.round( spillSet.getWriteBytes() / 1024.0D / 1024.0 ) );
     RuntimeException ex = null;
     try {
-      if (bufferedBatches != null) {
-        closeBatchGroups(bufferedBatches);
-        bufferedBatches = null;
-      }
-    } catch (RuntimeException e) {
-      ex = e;
-    }
-    try {
-      if (spilledRuns != null) {
-        closeBatchGroups(spilledRuns);
-        spilledRuns = null;
-      }
-    } catch (RuntimeException e) {
-      ex = (ex == null) ? e : ex;
-    }
-    try {
-      if (sv4 != null) {
-        sv4.clear();
-      }
-    } catch (RuntimeException e) {
-      ex = (ex == null) ? e : ex;
-    }
-    try {
       if (resultsIterator != null) {
         resultsIterator.close();
         resultsIterator = null;
@@ -1533,17 +479,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       ex = (ex == null) ? e : ex;
     }
     try {
-      copierHolder.close();
-    } catch (RuntimeException e) {
-      ex = (ex == null) ? e : ex;
-    }
-    try {
-      spillSet.close();
-    } catch (RuntimeException e) {
-      ex = (ex == null) ? e : ex;
-    }
-    try {
-      opCodeGen.close();
+      if (sortImpl != null) {
+        sortImpl.close();
+        sortImpl = null;
+      }
     } catch (RuntimeException e) {
       ex = (ex == null) ? e : ex;
     }
@@ -1558,11 +497,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       ex = (ex == null) ? e : ex;
     }
     // Note: allocator is closed by the FragmentManager
-//    try {
-//      allocator.close();
-//    } catch (RuntimeException e) {
-//      ex = (ex == null) ? e : ex;
-//    }
     if (ex != null) {
       throw ex;
     }

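The net effect of the refactoring above is that ExternalSortBatch no longer implements the sort itself; it only wires together and drives SortImpl. A condensed sketch of that lifecycle, assembled from the constructor and loadBatch()/load() code in this diff (the context objects, popConfig, container, and incoming batch are assumed to come from the usual operator harness, so this is an illustration rather than runnable operator code):

    // Wiring, as in the new constructor.
    SortConfig sortConfig = new SortConfig(context.getConfig());
    SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(), popConfig);
    OperExecContext opContext = new OperExecContextImpl(context, oContext, popConfig, injector);
    PriorityQueueCopierWrapper copier = new PriorityQueueCopierWrapper(opContext);
    SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copier);
    SortImpl sortImpl = new SortImpl(opContext, sortConfig, spilledRuns, container);

    // Load phase: transfer each upstream batch into the sort, which buffers
    // it in memory and spills when its memory budget is exceeded.
    sortImpl.addBatch(incoming);          // repeated once per upstream batch

    // Merge phase: iterate the sorted results, whether they come from an
    // in-memory merge or from spilled runs.
    SortImpl.SortResults results = sortImpl.startMerge();
    while (results.next()) {
      // deliver results.getRecordCount() rows downstream
    }
    results.close();
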
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
index 31475d2..da41e5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
@@ -24,7 +24,7 @@ import javax.inject.Named;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentExecContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -49,7 +49,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
    */
 
   private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
-  private FragmentContext context;
+  private FragmentExecContext context;
 
   /**
    * Controls the maximum size of batches exposed to downstream
@@ -57,7 +57,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
   private int desiredRecordBatchCount;
 
   @Override
-  public void setup(final FragmentContext context, final BufferAllocator allocator, final SelectionVector4 vector4,
+  public void setup(final FragmentExecContext context, final BufferAllocator allocator, final SelectionVector4 vector4,
                     final VectorContainer hyperBatch, int outputBatchSize) throws SchemaChangeException{
     // we pass in the local hyperBatch since that is where we'll be reading data.
     Preconditions.checkNotNull(vector4);
@@ -147,7 +147,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
   }
 
   /**
-   * Sort (really, merge) a set of pre-sorted runs to produce a combined
+   * Merge a set of pre-sorted runs to produce a combined
    * result set. Merging is done in the selection vector, record data does
    * not move.
    * <p>
@@ -157,7 +157,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
    */
 
   @Override
-  public void sort(final VectorContainer container) {
+  public void sort() {
     while (runStarts.size() > 1) {
       final int totalCount = this.vector4.getTotalCount();
 
@@ -223,15 +223,21 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
 
   @Override
   public void clear() {
-    if(vector4 != null) {
+    if (vector4 != null) {
       vector4.clear();
+      vector4 = null;
     }
-
-    if(aux != null) {
+    if (aux != null) {
       aux.clear();
+      aux = null;
     }
   }
 
-  public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
-  public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex) throws SchemaChangeException;
+  public abstract void doSetup(@Named("context") FragmentExecContext context,
+                               @Named("incoming") VectorContainer incoming,
+                               @Named("outgoing") RecordBatch outgoing)
+                       throws SchemaChangeException;
+  public abstract int doEval(@Named("leftIndex") int leftIndex,
+                             @Named("rightIndex") int rightIndex)
+                      throws SchemaChangeException;
 }

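The reworded comment above states the key property of MSortTemplate: merging happens entirely within the selection vector, and row data never moves. As a plain-Java illustration of that idea (the arrays stand in for Drill's SV4 and auxiliary vector, and the comparator stands in for the generated doEval(); this is not MSortTemplate's actual code):

    // Illustration only: merge two pre-sorted runs of row indices into an
    // auxiliary index array. Rows are compared through an evaluator; the
    // rows themselves are never copied or moved.
    static void mergeRuns(int[] sv, int[] aux, int start, int mid, int end,
                          java.util.Comparator<Integer> compare) {
      int left = start, right = mid, out = start;
      while (left < mid && right < end) {
        aux[out++] = compare.compare(sv[left], sv[right]) <= 0
            ? sv[left++] : sv[right++];
      }
      while (left < mid)  { aux[out++] = sv[left++]; }
      while (right < end) { aux[out++] = sv[right++]; }
    }
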
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
index 0d04b7e..06bbdea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.xsort.managed;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentExecContext;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
@@ -30,8 +30,8 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
  */
 
 public interface MSorter {
-  public void setup(FragmentContext context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch, int outputBatchSize) throws SchemaChangeException;
-  public void sort(VectorContainer container);
+  public void setup(FragmentExecContext context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch, int outputBatchSize) throws SchemaChangeException;
+  public void sort();
   public SelectionVector4 getSV4();
 
   public static TemplateClassDefinition<MSorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<MSorter>(MSorter.class, MSortTemplate.class);
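
The interface implies a fixed lifecycle: obtain a generated implementation
from TEMPLATE_DEFINITION, set it up against the SV4 and the hyper-batch,
run the merge, then iterate the result through the SV4. A hedged sketch of
a caller, where msorter, sv4, hyperBatch, context, and allocator are
assumed names for illustration (not defined in this patch):

    // Hypothetical caller of a generated MSorter instance.
    msorter.setup(context, allocator, sv4, hyperBatch, sv4.getCount());
    msorter.sort();                        // merge pre-sorted runs in place
    SelectionVector4 result = msorter.getSV4();
    while (result.next()) {                // SV4's built-in batch iterator
      // hand one virtual batch at a time to the downstream operator
    }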

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSort.java
deleted file mode 100644
index c3e2dbe..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSort.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.xsort.managed;
-
-import java.util.LinkedList;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
-import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.SortResults;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-
-/**
- * Wrapper around the "MSorter" (in memory merge sorter). As batches have
- * arrived to the sort, they have been individually sorted and buffered
- * in memory. At the completion of the sort, we detect that no batches
- * were spilled to disk. In this case, we can merge the in-memory batches
- * using an efficient memory-based approach implemented here.
- * <p>
- * Since all batches are in memory, we don't want to use the usual merge
- * algorithm as that makes a copy of the original batches (which were read
- * from a spill file) to produce an output batch. Instead, we want to use
- * the in-memory batches as-is. To do this, we use a selection vector 4
- * (SV4) as a global index into the collection of batches. The SV4 uses
- * the upper two bytes as the batch index, and the lower two as an offset
- * of a record within the batch.
- * <p>
- * The merger ("M Sorter") populates the SV4 by scanning the set of
- * in-memory batches, searching for the one with the lowest value of the
- * sort key. The batch number and offset are placed into the SV4. The process
- * continues until all records from all batches have an entry in the SV4.
- * <p>
- * The actual implementation uses an iterative merge to perform the above
- * efficiently.
- * <p>
- * A sort can only do a single merge. So, we do not attempt to share the
- * generated class; we just generate it internally and discard it at
- * completion of the merge.
- */
-
-public class MergeSort implements SortResults {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeSort.class);
-
-  private SortRecordBatchBuilder builder;
-  private MSorter mSorter;
-  private final FragmentContext context;
-  private final BufferAllocator oAllocator;
-  private SelectionVector4 sv4;
-  private final OperatorCodeGenerator opCg;
-  private int batchCount;
-
-  public MergeSort(FragmentContext context, BufferAllocator allocator, OperatorCodeGenerator opCg) {
-    this.context = context;
-    this.oAllocator = allocator;
-    this.opCg = opCg;
-  }
-
-  /**
-   * Merge the set of in-memory batches to produce a single logical output in the given
-   * destination container, indexed by an SV4.
-   *
-   * @param batchGroups the complete set of in-memory batches
-   * @param batch the record batch (operator) for the sort operator
-   * @param destContainer the vector container for the sort operator
-   * @return the sv4 for this operator
-   */
-
-  public SelectionVector4 merge(LinkedList<BatchGroup.InputBatch> batchGroups, VectorAccessible batch,
-                                VectorContainer destContainer) {
-
-    // Add the buffered batches to a collection that MSorter can use.
-    // The builder takes ownership of the batches and will release them if
-    // an error occurs.
-
-    builder = new SortRecordBatchBuilder(oAllocator);
-    for (BatchGroup.InputBatch group : batchGroups) {
-      RecordBatchData rbd = new RecordBatchData(group.getContainer(), oAllocator);
-      rbd.setSv2(group.getSv2());
-      builder.add(rbd);
-    }
-    batchGroups.clear();
-
-    // Generate the msorter.
-
-    try {
-      builder.build(context, destContainer);
-      sv4 = builder.getSv4();
-      mSorter = opCg.createNewMSorter(batch);
-      mSorter.setup(context, oAllocator, sv4, destContainer, sv4.getCount());
-    } catch (SchemaChangeException e) {
-      throw UserException.unsupportedError(e)
-            .message("Unexpected schema change - likely code error.")
-            .build(logger);
-    }
-
-    // For testing memory-leaks, inject exception after mSorter finishes setup
-    ExternalSortBatch.injector.injectUnchecked(context.getExecutionControls(), ExternalSortBatch.INTERRUPTION_AFTER_SETUP);
-    mSorter.sort(destContainer);
-
-    // sort may have prematurely exited due to should continue returning false.
-    if (!context.shouldContinue()) {
-      return null;
-    }
-
-    // For testing memory-leak purpose, inject exception after mSorter finishes sorting
-    ExternalSortBatch.injector.injectUnchecked(context.getExecutionControls(), ExternalSortBatch.INTERRUPTION_AFTER_SORT);
-    sv4 = mSorter.getSV4();
-
-    destContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
-    return sv4;
-  }
-
-  /**
-   * The SV4 provides a built-in iterator that returns a virtual set of record
-   * batches so that the downstream operator need not consume the entire set
-   * of accumulated batches in a single step.
-   */
-
-  @Override
-  public boolean next() {
-    boolean more = sv4.next();
-    if (more) { batchCount++; }
-    return more;
-  }
-
-  @Override
-  public void close() {
-    if (builder != null) {
-      builder.clear();
-      builder.close();
-    }
-    if (mSorter != null) {
-      mSorter.clear();
-    }
-  }
-
-  @Override
-  public int getBatchCount() {
-    return batchCount;
-  }
-
-  @Override
-  public int getRecordCount() {
-    return sv4.getTotalCount();
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
new file mode 100644
index 0000000..3ab9af3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.util.List;
+
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.exec.compile.sig.MappingSet;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
+import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+import com.sun.codemodel.JConditional;
+import com.sun.codemodel.JExpr;
+
+/**
+ * Wrapper around the "MSorter" (in memory merge sorter). As batches have
+ * arrived to the sort, they have been individually sorted and buffered
+ * in memory. At the completion of the sort, we detect that no batches
+ * were spilled to disk. In this case, we can merge the in-memory batches
+ * using an efficient memory-based approach implemented here.
+ * <p>
+ * Since all batches are in memory, we don't want to use the usual merge
+ * algorithm as that makes a copy of the original batches (which were read
+ * from a spill file) to produce an output batch. Instead, we want to use
+ * the in-memory batches as-is. To do this, we use a selection vector 4
+ * (SV4) as a global index into the collection of batches. The SV4 uses
+ * the upper two bytes as the batch index, and the lower two as an offset
+ * of a record within the batch.
+ * <p>
+ * The merger ("M Sorter") populates the SV4 by scanning the set of
+ * in-memory batches, repeatedly selecting the record with the lowest
+ * remaining sort-key value. That record's batch number and offset are
+ * placed into the SV4. The process continues until all records from
+ * all batches have an entry in the SV4.
+ * <p>
+ * The actual implementation uses an iterative merge to perform the above
+ * efficiently.
+ * <p>
+ * A sort can only do a single merge. So, we do not attempt to share the
+ * generated class; we just generate it internally and discard it at
+ * completion of the merge.
+ * <p>
+ * The merge sorter only makes sense when we have at least one row. The
+ * caller must handle the special case of no rows.
+ */
+
+public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeSortWrapper.class);
+
+  public enum State { FIRST, BODY, EOF }
+
+  private SortRecordBatchBuilder builder;
+  private MSorter mSorter;
+  private SelectionVector4 sv4;
+  private int batchCount;
+  private State state = State.FIRST;
+  private final VectorContainer destContainer;
+
+  public MergeSortWrapper(OperExecContext opContext, VectorContainer destContainer) {
+    super(opContext);
+    this.destContainer = destContainer;
+  }
+
+  /**
+   * Merge the set of in-memory batches to produce a single logical output
+   * in the destination container provided at construction time, indexed
+   * by an SV4.
+   *
+   * @param batchGroups the complete set of in-memory batches
+   */
+
+  public void merge(List<BatchGroup.InputBatch> batchGroups) {
+
+    // Add the buffered batches to a collection that MSorter can use.
+    // The builder takes ownership of the batches and will release them if
+    // an error occurs.
+
+    builder = new SortRecordBatchBuilder(context.getAllocator());
+    for (BatchGroup.InputBatch group : batchGroups) {
+      RecordBatchData rbd = new RecordBatchData(group.getContainer(), context.getAllocator());
+      rbd.setSv2(group.getSv2());
+      builder.add(rbd);
+    }
+    batchGroups.clear();
+
+    // Generate the msorter.
+
+    try {
+      builder.build(destContainer);
+      sv4 = builder.getSv4();
+      Sort popConfig = context.getOperatorDefn();
+      mSorter = createNewMSorter(popConfig.getOrderings(), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
+      mSorter.setup(context, context.getAllocator(), sv4, destContainer, sv4.getCount());
+    } catch (SchemaChangeException e) {
+      throw UserException.unsupportedError(e)
+            .message("Unexpected schema change - likely code error.")
+            .build(logger);
+    }
+
+    // For testing memory-leaks, inject exception after mSorter finishes setup
+    context.injectUnchecked(ExternalSortBatch.INTERRUPTION_AFTER_SETUP);
+    mSorter.sort();
+
+    // For testing memory-leak purpose, inject exception after mSorter finishes sorting
+    context.injectUnchecked(ExternalSortBatch.INTERRUPTION_AFTER_SORT);
+    sv4 = mSorter.getSV4();
+
+    destContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
+  }
+
+  private MSorter createNewMSorter(List<Ordering> orderings, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) {
+    CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptionSet());
+    cg.plainJavaCapable(true);
+
+    // Uncomment this line to debug the generated code.
+//  cg.saveCodeForDebugging(true);
+    ClassGenerator<MSorter> g = cg.getRoot();
+    g.setMappingSet(mainMapping);
+
+    for (Ordering od : orderings) {
+      // first, we rewrite the evaluation stack for each side of the comparison.
+      ErrorCollector collector = new ErrorCollectorImpl();
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), destContainer, collector, context.getFunctionRegistry());
+      if (collector.hasErrors()) {
+        throw UserException.unsupportedError()
+              .message("Failure while materializing expression. " + collector.toErrorString())
+              .build(logger);
+      }
+      g.setMappingSet(leftMapping);
+      HoldingContainer left = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
+      g.setMappingSet(rightMapping);
+      HoldingContainer right = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
+      g.setMappingSet(mainMapping);
+
+      // next we wrap the two comparison sides and add the expression block for the comparison.
+      LogicalExpression fh =
+          FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right,
+                                                         context.getFunctionRegistry());
+      HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
+      JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
+
+      if (od.getDirection() == Direction.ASCENDING) {
+        jc._then()._return(out.getValue());
+      } else {
+        jc._then()._return(out.getValue().minus());
+      }
+      g.rotateBlock();
+    }
+
+    g.rotateBlock();
+    g.getEvalBlock()._return(JExpr.lit(0));
+
+    return getInstance(cg, logger);
+  }
+
+  /**
+   * The SV4 provides a built-in iterator that returns a virtual set of record
+   * batches so that the downstream operator need not consume the entire set
+   * of accumulated batches in a single step.
+   */
+
+  @Override
+  public boolean next() {
+    switch (state) {
+    case BODY:
+      if (!sv4.next()) {
+        state = State.EOF;
+        return false;
+      }
+      batchCount++;
+      return true;
+    case EOF:
+      return false;
+    case FIRST:
+      // The first batch was made available by merge(); count it here.
+      state = State.BODY;
+      batchCount++;
+      return true;
+    default:
+      throw new IllegalStateException("Unexpected case: " + state);
+    }
+  }
+
+  @Override
+  public void close() {
+    RuntimeException ex = null;
+    try {
+      if (builder != null) {
+        builder.clear();
+        builder.close();
+        builder = null;
+      }
+    } catch (RuntimeException e) {
+      ex = (ex == null) ? e : ex;
+    }
+    try {
+      if (mSorter != null) {
+        mSorter.clear();
+        mSorter = null;
+      }
+    } catch (RuntimeException e) {
+      ex = (ex == null) ? e : ex;
+    }
+    try {
+      if (sv4 != null) {
+        sv4.clear();
+      }
+    } catch (RuntimeException e) {
+      ex = (ex == null) ? e : ex;
+    }
+    if (ex != null) {
+      throw ex;
+    }
+  }
+
+  @Override
+  public int getBatchCount() { return batchCount; }
+
+  @Override
+  public int getRecordCount() { return sv4.getCount(); }
+
+  @Override
+  public SelectionVector4 getSv4() { return sv4; }
+
+  @Override
+  public SelectionVector2 getSv2() { return null; }
+
+  @Override
+  public VectorContainer getContainer() { return destContainer; }
+}
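
createNewMSorter() emits one comparison block per ordering, negating the
comparator result for descending keys. The generated doEval() therefore
behaves roughly like the hand-written sketch below, where cmp0 and cmp1
stand in for the generated per-key comparator calls (they are assumptions
for illustration, not real methods in this patch):

    // Effective shape of the generated comparator for two sort keys,
    // the second one descending.
    public int doEval(int leftIndex, int rightIndex) {
      int out = cmp0(leftIndex, rightIndex);
      if (out != 0) { return out; }        // ascending key: return as-is
      out = cmp1(leftIndex, rightIndex);
      if (out != 0) { return -out; }       // descending key: negate
      return 0;                            // all keys compare equal
    }

Negating the per-key result is how the generator handles
Direction.DESCENDING without emitting a second comparator.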

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/OperatorCodeGenerator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/OperatorCodeGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/OperatorCodeGenerator.java
deleted file mode 100644
index 57846db..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/OperatorCodeGenerator.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.xsort.managed;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.calcite.rel.RelFieldCollation.Direction;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.ErrorCollector;
-import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.logical.data.Order.Ordering;
-import org.apache.drill.exec.compile.sig.GeneratorMapping;
-import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
-import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.ExternalSort;
-import org.apache.drill.exec.physical.config.Sort;
-import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.vector.CopyUtil;
-
-import com.sun.codemodel.JConditional;
-import com.sun.codemodel.JExpr;
-
-/**
- * Generates and manages the data-specific classes for this operator.
- * <p>
- * Several of the code generation methods take a batch, but the methods
- * are called for many batches, and generate code only for the first one.
- * Better would be to generate code from a schema; but Drill is not set
- * up for that at present.
- */
-
-public class OperatorCodeGenerator {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorCodeGenerator.class);
-
-  protected static final MappingSet MAIN_MAPPING = new MappingSet((String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-  protected static final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-  protected static final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-
-  private static final GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
-  private static final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING);
-
-  private final FragmentContext context;
-  @SuppressWarnings("unused")
-  private BatchSchema schema;
-
-  /**
-   * A single PriorityQueueCopier instance is used for 2 purposes:
-   * 1. Merge sorted batches before spilling
-   * 2. Merge sorted batches when all incoming data fits in memory
-   */
-
-  private PriorityQueueCopier copier;
-  private final Sort popConfig;
-
-  /**
-   * Generated sort operation used to sort each incoming batch according to
-   * the sort criteria specified in the {@link ExternalSort} definition of
-   * this operator.
-   */
-
-  private SingleBatchSorter sorter;
-
-  public OperatorCodeGenerator(FragmentContext context, Sort popConfig) {
-    this.context = context;
-    this.popConfig = popConfig;
-  }
-
-  public void setSchema(BatchSchema schema) {
-    close();
-    this.schema = schema;
-  }
-
-  public void close() {
-    closeCopier();
-    sorter = null;
-  }
-
-  public void closeCopier() {
-    if (copier == null) {
-      return; }
-    try {
-      copier.close();
-      copier = null;
-    } catch (IOException e) {
-      throw UserException.dataWriteError(e)
-            .message("Failure while flushing spilled data")
-            .build(logger);
-    }
-  }
-
-  public PriorityQueueCopier getCopier(VectorAccessible batch) {
-    if (copier == null) {
-      copier = generateCopier(batch);
-    }
-    return copier;
-  }
-
-  private PriorityQueueCopier generateCopier(VectorAccessible batch) {
-    // Generate the copier code and obtain the resulting class
-
-    CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
-    ClassGenerator<PriorityQueueCopier> g = cg.getRoot();
-    cg.plainJavaCapable(true);
-    // Uncomment out this line to debug the generated code.
-//  cg.saveCodeForDebugging(true);
-
-    generateComparisons(g, batch);
-
-    g.setMappingSet(COPIER_MAPPING_SET);
-    CopyUtil.generateCopies(g, batch, true);
-    g.setMappingSet(MAIN_MAPPING);
-    return getInstance(cg);
-  }
-
-  public MSorter createNewMSorter(VectorAccessible batch) {
-    return createNewMSorter(popConfig.getOrderings(), batch, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
-  }
-
-  private MSorter createNewMSorter(List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) {
-    CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
-    cg.plainJavaCapable(true);
-
-    // Uncomment out this line to debug the generated code.
-//  cg.saveCodeForDebugging(true);
-    ClassGenerator<MSorter> g = cg.getRoot();
-    g.setMappingSet(mainMapping);
-
-    for (Ordering od : orderings) {
-      // first, we rewrite the evaluation stack for each side of the comparison.
-      ErrorCollector collector = new ErrorCollectorImpl();
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
-      if (collector.hasErrors()) {
-        throw UserException.unsupportedError()
-              .message("Failure while materializing expression. " + collector.toErrorString())
-              .build(logger);
-      }
-      g.setMappingSet(leftMapping);
-      HoldingContainer left = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
-      g.setMappingSet(rightMapping);
-      HoldingContainer right = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
-      g.setMappingSet(mainMapping);
-
-      // next we wrap the two comparison sides and add the expression block for the comparison.
-      LogicalExpression fh =
-          FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right,
-                                                         context.getFunctionRegistry());
-      HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
-      JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
-
-      if (od.getDirection() == Direction.ASCENDING) {
-        jc._then()._return(out.getValue());
-      }else{
-        jc._then()._return(out.getValue().minus());
-      }
-      g.rotateBlock();
-    }
-
-    g.rotateBlock();
-    g.getEvalBlock()._return(JExpr.lit(0));
-
-    return getInstance(cg);
-  }
-
-  public SingleBatchSorter getSorter(VectorAccessible batch) {
-    if (sorter == null) {
-      sorter = createNewSorter(batch);
-    }
-    return sorter;
-  }
-
-  private SingleBatchSorter createNewSorter(VectorAccessible batch) {
-    CodeGenerator<SingleBatchSorter> cg = CodeGenerator.get(
-        SingleBatchSorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(),
-        context.getOptions());
-    ClassGenerator<SingleBatchSorter> g = cg.getRoot();
-    cg.plainJavaCapable(true);
-    // Uncomment out this line to debug the generated code.
-//  cg.saveCodeForDebugging(true);
-
-    generateComparisons(g, batch);
-    return getInstance(cg);
-  }
-
-  private <T> T getInstance(CodeGenerator<T> cg) {
-    try {
-      return context.getImplementationClass(cg);
-    } catch (ClassTransformationException e) {
-      throw UserException.unsupportedError(e)
-            .message("Code generation error - likely code error.")
-            .build(logger);
-    } catch (IOException e) {
-      throw UserException.resourceError(e)
-            .message("IO Error during code generation.")
-            .build(logger);
-    }
-  }
-
-  protected void generateComparisons(ClassGenerator<?> g, VectorAccessible batch)  {
-    g.setMappingSet(MAIN_MAPPING);
-
-    for (Ordering od : popConfig.getOrderings()) {
-      // first, we rewrite the evaluation stack for each side of the comparison.
-      ErrorCollector collector = new ErrorCollectorImpl();
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
-      if (collector.hasErrors()) {
-        throw UserException.unsupportedError()
-              .message("Failure while materializing expression. " + collector.toErrorString())
-              .build(logger);
-      }
-      g.setMappingSet(LEFT_MAPPING);
-      HoldingContainer left = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
-      g.setMappingSet(RIGHT_MAPPING);
-      HoldingContainer right = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
-      g.setMappingSet(MAIN_MAPPING);
-
-      // next we wrap the two comparison sides and add the expression block for the comparison.
-      LogicalExpression fh =
-          FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right,
-                                                         context.getFunctionRegistry());
-      HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
-      JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
-
-      if (od.getDirection() == Direction.ASCENDING) {
-        jc._then()._return(out.getValue());
-      }else{
-        jc._then()._return(out.getValue().minus());
-      }
-      g.rotateBlock();
-    }
-
-    g.rotateBlock();
-    g.getEvalBlock()._return(JExpr.lit(0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopier.java
index 2657bb8..be88232 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopier.java
@@ -23,11 +23,10 @@ import java.util.List;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.VectorAccessible;
 
 public interface PriorityQueueCopier extends AutoCloseable {
-  public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch,
+  public void setup(BufferAllocator allocator, VectorAccessible hyperBatch,
       List<BatchGroup> batchGroups, VectorAccessible outgoing) throws SchemaChangeException;
 
   public int next(int targetRecordCount);