Posted to commits@drill.apache.org by pr...@apache.org on 2017/06/21 18:29:13 UTC

[5/5] drill git commit: DRILL-5325: Unit tests for the managed sort

DRILL-5325: Unit tests for the managed sort

Uses the sub-operator test framework (DRILL-5318), including the test
row set abstraction (DRILL-5323), to enable unit testing of the
“managed” external sort. This PR allows early review of the code, but
cannot be pulled until the dependencies (mentioned above) are pulled.

Refactors the external sort code into small chunks that can be unit
tested, then “wraps” that code in tests for all interesting data types,
record batch sizes, and so on.

Refactors some of the operator definitions to more easily allow
programmatic setup in the unit tests.

Fixes a number of bugs discovered by the unit tests. The biggest
changes were in the new code: the code that computes spilling and
merging based on memory levels.

Otherwise, although GitHub will show many files changed, most of the
changes simply move blocks of code around to create smaller units
that can be tested independently.

Includes a refactoring of the code that does spilling, along with a
complete set of low-level unit tests.

Excludes long-running sort tests.

Defines a test category for long-running tests.

First attempt to provide a way to run such tests from Maven.
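
The preliminary invocation, per the SecondaryTest Javadoc below, is:

  mvn surefire:test -Dgroups=org.apache.drill.test.SecondaryTest -DexcludedGroups=

(The empty -DexcludedGroups overrides the default exclusion of the
secondary tests; see the Javadoc for the caveats.)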

closes #808


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/90f43bff
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/90f43bff
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/90f43bff

Branch: refs/heads/master
Commit: 90f43bff7a01eaaee6c8861137759b05367dfcf3
Parents: c16e5f8
Author: Paul Rogers <pr...@maprtech.com>
Authored: Thu Apr 6 13:57:19 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Wed Jun 21 10:02:16 2017 -0700

----------------------------------------------------------------------
 .../org/apache/drill/test/SecondaryTest.java    |   70 +
 exec/java-exec/pom.xml                          |    5 +
 .../org/apache/drill/exec/ExecConstants.java    |    2 -
 .../drill/exec/cache/VectorSerializer.java      |  121 ++
 .../drill/exec/physical/base/AbstractBase.java  |    2 +-
 .../drill/exec/physical/base/FragmentRoot.java  |    2 +-
 .../apache/drill/exec/physical/base/Root.java   |    2 +-
 .../exec/physical/config/ExternalSort.java      |    4 +-
 .../impl/aggregate/HashAggTemplate.java         |    2 +-
 .../impl/sort/SortRecordBatchBuilder.java       |   12 +-
 .../exec/physical/impl/spill/SpillSet.java      |   35 +-
 .../impl/svremover/CopierTemplate2.java         |    6 +-
 .../exec/physical/impl/xsort/BatchGroup.java    |    2 +-
 .../physical/impl/xsort/ExternalSortBatch.java  |    6 +-
 .../impl/xsort/managed/BaseSortWrapper.java     |   90 ++
 .../impl/xsort/managed/BaseWrapper.java         |   53 +
 .../physical/impl/xsort/managed/BatchGroup.java |   76 +-
 .../impl/xsort/managed/BufferedBatches.java     |  232 ++++
 .../impl/xsort/managed/CopierHolder.java        |  322 -----
 .../impl/xsort/managed/ExternalSortBatch.java   | 1226 ++----------------
 .../impl/xsort/managed/MSortTemplate.java       |   26 +-
 .../physical/impl/xsort/managed/MSorter.java    |    6 +-
 .../physical/impl/xsort/managed/MergeSort.java  |  167 ---
 .../impl/xsort/managed/MergeSortWrapper.java    |  261 ++++
 .../xsort/managed/OperatorCodeGenerator.java    |  259 ----
 .../impl/xsort/managed/PriorityQueueCopier.java |    3 +-
 .../managed/PriorityQueueCopierTemplate.java    |   74 +-
 .../managed/PriorityQueueCopierWrapper.java     |  341 +++++
 .../physical/impl/xsort/managed/SortConfig.java |  121 ++
 .../physical/impl/xsort/managed/SortImpl.java   |  491 +++++++
 .../impl/xsort/managed/SortMemoryManager.java   |  513 ++++++++
 .../impl/xsort/managed/SortMetrics.java         |   97 ++
 .../impl/xsort/managed/SorterWrapper.java       |   92 ++
 .../impl/xsort/managed/SpilledRuns.java         |  235 ++++
 .../src/main/resources/drill-module.conf        |   18 +-
 .../exec/cache/TestBatchSerialization.java      |  216 +++
 .../drill/exec/cache/TestWriteToDisk.java       |    2 +-
 .../physical/impl/xsort/TestExternalSort.java   |    3 +
 .../impl/xsort/TestSimpleExternalSort.java      |   69 +-
 .../impl/xsort/TestSortSpillWithException.java  |   72 +-
 .../impl/xsort/managed/SortTestUtilities.java   |  132 ++
 .../physical/impl/xsort/managed/TestCopier.java |  377 ++++++
 .../xsort/managed/TestExternalSortExec.java     |  188 +++
 .../managed/TestExternalSortInternals.java      |  632 +++++++++
 .../impl/xsort/managed/TestSortImpl.java        |  609 +++++++++
 .../physical/impl/xsort/managed/TestSorter.java |  605 +++++++++
 .../apache/drill/exec/memory/BaseAllocator.java |   25 +-
 .../drill/common/expression/FieldReference.java |    9 +-
 .../drill/common/expression/SchemaPath.java     |    5 +-
 .../apache/drill/common/logical/data/Order.java |   73 +-
 pom.xml                                         |    3 +-
 51 files changed, 5871 insertions(+), 2123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/common/src/main/java/org/apache/drill/test/SecondaryTest.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/test/SecondaryTest.java b/common/src/main/java/org/apache/drill/test/SecondaryTest.java
new file mode 100644
index 0000000..6b9a187
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/test/SecondaryTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+/**
+ * Label for Drill secondary tests. A secondary test is one that is omitted from
+ * the normal Drill build because:
+ * <ul>
+ * <li>It is slow</li>
+ * <li>It tests particular functionality which need not be tested on every
+ * build.</li>
+ * <li>It is old, but still worth running once in a while.</li>
+ * <li>It requires specialized setup and/or runs on specific platforms.</li>
+ * </ul>
+ *
+ * To mark a test as secondary, do either:<pre><code>
+ * {@literal @}Category(SecondaryTest.class)
+ * class MyTest {
+ *    ...
+ * }
+ * </code></pre>Or:<pre><code>
+ * class MyTest {
+ *   {@literal @}Category(SecondaryTest.class)
+ *   public void slowTest() { ... }
+ * }
+ * </code></pre>
+ * Maven is configured as follows:<pre><code>
+ *    &lt;plugin>
+ *      &lt;artifactId>maven-surefire-plugin&lt;/artifactId>
+ *      ...
+ *      &lt;configuration>
+ *        ...
+ *        &lt;excludedGroups>org.apache.drill.test.SecondaryTest&lt;/excludedGroups>
+ *      &lt;/configuration>
+ *      ...
+ *    &lt;/plugin></code></pre>
+ *  To run all tests, including the secondary tests (preliminary):<pre><code>
+ *  > mvn surefire:test -Dgroups=org.apache.drill.test.SecondaryTest -DexcludedGroups=</code></pre>
+ *  The above says to run the secondary tests and exclude nothing. The exclusion
+ *  is required to override the default exclusions: skip that parameter and Maven will
+ *  blindly try to run all tests annotated with the SecondaryTest category except
+ *  for those annotated with the SecondaryTest category, which is not very helpful...
+ *  <p>
+ *  Note that only <tt>java-exec</tt> provides a named execution to run large tests:
+ *  <p>
+ *  <tt>mvn surefire:test@include-large-tests</tt>
+ *  <p>
+ *  However, the above does not work. Nor did it work to include the category in
+ *  a profile as described earlier. At present, there is no known way to run just
+ *  the secondary tests from Maven. Sorry...
+ */
+
+public interface SecondaryTest {
+  // JUnit category marker
+}
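
For illustration, a complete test file using this category might look
like the following sketch (JUnit 4: @Category comes from
org.junit.experimental.categories; the class and test names here are
hypothetical):

    package org.apache.drill.test;

    import org.junit.Test;
    import org.junit.experimental.categories.Category;

    // The category excludes this class from the default surefire run;
    // it executes only when SecondaryTest is explicitly included.
    @Category(SecondaryTest.class)
    public class ExampleSecondaryTest {
      @Test
      public void slowCase() {
        // A long-running or specialized test body goes here.
      }
    }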

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 3b1ac4c..cd287aa 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -532,6 +532,7 @@
                   <includes>
                     <include>**/TestLargeFileCompilation.java</include>
                   </includes>
+                  <groups>org.apache.drill.test.SecondaryTest</groups>
                 </configuration>
               </execution>
             </executions>
@@ -732,9 +733,13 @@
             <phase>test</phase>
             <goals><goal>test</goal></goals>
             <configuration>
+              <!--  Legacy: exclude by file name -->
               <excludes>
                 <exclude>**/TestLargeFileCompilation.java</exclude>
               </excludes>
+              <!-- Modern: include using the @Category annotation.
+                   See the Javadoc for SecondaryTest for details. -->
+              <excludedGroups>org.apache.drill.test.SecondaryTest</excludedGroups>
             </configuration>
           </execution>
         </executions>

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 537377d..e2782e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -84,8 +84,6 @@ public interface ExecConstants {
   String EXTERNAL_SORT_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.spill_batch_size";
   String EXTERNAL_SORT_MERGE_BATCH_SIZE = "drill.exec.sort.external.spill.merge_batch_size";
   String EXTERNAL_SORT_MAX_MEMORY = "drill.exec.sort.external.mem_limit";
-
-  // Used only by the "unmanaged" sort.
   String EXTERNAL_SORT_BATCH_LIMIT = "drill.exec.sort.external.batch_limit";
 
   // External Sort Runtime options

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java
new file mode 100644
index 0000000..eeef9e5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+
+/**
+ * Serializes vector containers to an output stream or from
+ * an input stream.
+ */
+
+public class VectorSerializer {
+
+  /**
+   * Writes multiple VectorAccessible or VectorContainer
+   * objects to an output stream.
+   */
+
+  public static class Writer {
+
+    private final OutputStream stream;
+    private final BufferAllocator allocator;
+    private boolean retain;
+    private long timeNs;
+
+    public Writer(BufferAllocator allocator, OutputStream stream) {
+      this.allocator = allocator;
+      this.stream = stream;
+    }
+
+    public Writer retain() {
+      retain = true;
+      return this;
+    }
+
+    public Writer write(VectorAccessible va) throws IOException {
+      return write(va, null);
+    }
+
+    @SuppressWarnings("resource")
+    public Writer write(VectorAccessible va, SelectionVector2 sv2) throws IOException {
+      WritableBatch batch = WritableBatch.getBatchNoHVWrap(
+          va.getRecordCount(), va, sv2 != null);
+      return write(batch, sv2);
+    }
+
+    public Writer write(WritableBatch batch, SelectionVector2 sv2) throws IOException {
+      VectorAccessibleSerializable vas;
+      if (sv2 == null) {
+        vas = new VectorAccessibleSerializable(batch, allocator);
+      } else {
+        vas = new VectorAccessibleSerializable(batch, sv2, allocator);
+      }
+      if (retain) {
+        vas.writeToStreamAndRetain(stream);
+      } else {
+        vas.writeToStream(stream);
+      }
+      timeNs += vas.getTimeNs();
+      return this;
+    }
+
+    public long timeNs() { return timeNs; }
+  }
+
+  /**
+   * Read one or more vector containers from an input stream.
+   */
+
+  public static class Reader {
+    private final InputStream stream;
+    private long timeNs;
+    private final VectorAccessibleSerializable vas;
+
+    public Reader(BufferAllocator allocator, InputStream stream) {
+      this.stream = stream;
+      vas = new VectorAccessibleSerializable(allocator);
+    }
+
+    public VectorContainer read() throws IOException {
+      vas.readFromStream(stream);
+      timeNs = vas.getTimeNs();
+      return vas.get();
+    }
+
+    public SelectionVector2 sv2() { return vas.getSv2(); }
+
+    public long timeNs() { return timeNs; }
+  }
+
+  public static Writer writer(BufferAllocator allocator, OutputStream stream) {
+    return new Writer(allocator, stream);
+  }
+
+  public static Reader reader(BufferAllocator allocator, InputStream stream) {
+    return new Reader(allocator, stream);
+  }
+}
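
Roughly, the new writer/reader pair is used like this (a sketch, not
code from this commit: 'allocator', 'batch', and 'sv2' are assumed to
be supplied by the surrounding operator, and the file path is
arbitrary):

    // Write batches to a stream, then read them back.
    try (OutputStream out = new FileOutputStream("/tmp/run0")) {
      VectorSerializer.Writer writer = VectorSerializer.writer(allocator, out);
      writer.write(batch);             // without a selection vector
      writer.write(batch, sv2);        // with a selection vector
      long nanos = writer.timeNs();    // accumulated write time
    }

    try (InputStream in = new FileInputStream("/tmp/run0")) {
      VectorSerializer.Reader reader = VectorSerializer.reader(allocator, in);
      VectorContainer container = reader.read();
      SelectionVector2 sv = reader.sv2();  // sv2 of the batch just read
    }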

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index 6f42250..b4c8536 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -24,7 +24,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
 import com.google.common.base.Preconditions;
 
-public abstract class AbstractBase implements PhysicalOperator{
+public abstract class AbstractBase implements PhysicalOperator {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBase.class);
 
   public static long INIT_ALLOCATION = 1_000_000L;

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java
index 1721fcf..32910c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java
@@ -21,5 +21,5 @@ package org.apache.drill.exec.physical.base;
 /**
 * Describes the root operation within a particular Fragment. This includes things like Sender nodes.
  */
-public interface FragmentRoot extends FragmentLeaf{
+public interface FragmentRoot extends FragmentLeaf {
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Root.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Root.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Root.java
index bd280ae..e562240 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Root.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Root.java
@@ -20,5 +20,5 @@ package org.apache.drill.exec.physical.base;
 /**
  * Marker interface describe the root of a query plan.  Currently, this is constrained to Screen.
  */
-public interface Root extends FragmentRoot{
+public interface Root extends FragmentRoot {
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
index cb9679d..f0e88b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
@@ -31,10 +31,12 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 public class ExternalSort extends Sort {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSort.class);
 
+  public static final long DEFAULT_SORT_ALLOCATION = 20_000_000;
+
   @JsonCreator
   public ExternalSort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("reverse") boolean reverse) {
     super(child, orderings, reverse);
-    initialAllocation = 20_000_000;
+    initialAllocation = DEFAULT_SORT_ALLOCATION;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 38f0222..8393937 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -367,7 +367,7 @@ public abstract class HashAggTemplate implements HashAggregator {
       }
     }
 
-    spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE);
+    spillSet = new SpillSet(context, hashAggrConfig);
     baseHashTable =
         new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing);
     this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index d46990f..cd52a91 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -98,6 +98,7 @@ public class SortRecordBatchBuilder implements AutoCloseable {
     return true;
   }
 
+  @SuppressWarnings("resource")
   public void add(RecordBatchData rbd) {
     long batchBytes = getSize(rbd.getContainer());
     if (batchBytes == 0 && batches.size() > 0) {
@@ -140,7 +141,12 @@ public class SortRecordBatchBuilder implements AutoCloseable {
     return batches.isEmpty();
   }
 
-  public void build(FragmentContext context, VectorContainer outputContainer) throws SchemaChangeException{
+  public void build(FragmentContext context, VectorContainer outputContainer) throws SchemaChangeException {
+    build(outputContainer);
+  }
+
+  @SuppressWarnings("resource")
+  public void build(VectorContainer outputContainer) throws SchemaChangeException {
     outputContainer.clear();
     if (batches.keySet().size() > 1) {
       throw new SchemaChangeException("Sort currently only supports a single schema.");
@@ -177,7 +183,7 @@ public class SortRecordBatchBuilder implements AutoCloseable {
       int index = 0;
       int recordBatchId = 0;
       for (RecordBatchData d : data) {
-        for (int i =0; i < d.getRecordCount(); i++, index++) {
+        for (int i = 0; i < d.getRecordCount(); i++, index++) {
           sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i));
         }
         // might as well drop the selection vector since we'll stop using it now.

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
index 87eebc6..5a02cee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
@@ -30,13 +30,13 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -283,7 +283,7 @@ public class SpillSet {
     private File baseDir;
 
     public LocalFileManager(String fsName) {
-      baseDir = new File(fsName.replace("file://", ""));
+      baseDir = new File(fsName.replace(FileSystem.DEFAULT_FS, ""));
     }
 
     @Override
@@ -357,30 +357,31 @@ public class SpillSet {
 
   private long writeBytes;
 
-  public SpillSet(FragmentContext context, PhysicalOperator popConfig, UserBitShared.CoreOperatorType optype) {
-    FragmentHandle handle = context.getHandle();
-    String operName = "Unknown";
+  public SpillSet(FragmentContext context, PhysicalOperator popConfig) {
+    this(context.getConfig(), context.getHandle(), popConfig);
+  }
+
+  public SpillSet(DrillConfig config, FragmentHandle handle, PhysicalOperator popConfig) {
+    String operName;
 
     // Set the spill options from the configuration
-    DrillConfig config = context.getConfig();
     String spillFs;
     List<String> dirList;
 
     // Set the operator name (used as part of the spill file name),
     // and set oper. specific options (the config file defaults to using the
     // common options; users may override those - per operator)
-    switch (optype) {
-      case EXTERNAL_SORT:
+    if (popConfig instanceof Sort) {
         operName = "Sort";
         spillFs = config.getString(ExecConstants.EXTERNAL_SORT_SPILL_FILESYSTEM);
         dirList = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS);
-        break;
-      case HASH_AGGREGATE:
+    } else if (popConfig instanceof HashAggregate) {
         operName = "HashAgg";
         spillFs = config.getString(ExecConstants.HASHAGG_SPILL_FILESYSTEM);
         dirList = config.getStringList(ExecConstants.HASHAGG_SPILL_DIRS);
-        break;
-      default: // just use the common ones
+    } else {
+        // just use the common ones
+        operName = "Unknown";
         spillFs = config.getString(ExecConstants.SPILL_FILESYSTEM);
         dirList = config.getStringList(ExecConstants.SPILL_DIRS);
     }
@@ -406,7 +407,7 @@ public class SpillSet {
     // as a proxy for a non-production Drill setup.)
 
     boolean impersonationEnabled = config.getBoolean(ExecConstants.IMPERSONATION_ENABLED);
-    if (spillFs.startsWith("file:///") && ! impersonationEnabled) {
+    if (spillFs.startsWith(FileSystem.DEFAULT_FS) && ! impersonationEnabled) {
       fileManager = new LocalFileManager(spillFs);
     } else {
       fileManager = new HadoopFileManager(spillFs);
@@ -416,6 +417,10 @@ public class SpillSet {
         operName, handle.getMajorFragmentId(), popConfig.getOperatorId(), handle.getMinorFragmentId());
   }
 
+  public String getNextSpillFile() {
+    return getNextSpillFile(null);
+  }
+
   public String getNextSpillFile(String extraName) {
 
     // Identify the next directory from the round-robin list to
@@ -427,7 +432,7 @@ public class SpillSet {
     currSpillDirs.add(currSpillPath);
 
     String outputFile = Joiner.on("/").join(currSpillPath, "spill" + ++fileCount);
-    if ( extraName != null ) {
+    if (extraName != null) {
       outputFile += "_" + extraName;
     }
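
One payoff of the added constructor is that a SpillSet can now be
built in a unit test from a DrillConfig and a FragmentHandle alone,
without a FragmentContext. A minimal sketch (the handle fields and the
sort definition 'externalSort' are assumptions supplied by the test
fixture):

    DrillConfig config = DrillConfig.create();
    FragmentHandle handle = FragmentHandle.newBuilder()
        .setMajorFragmentId(1)
        .setMinorFragmentId(2)
        .build();  // a real test would also set the query id
    SpillSet spillSet = new SpillSet(config, handle, externalSort);
    String path = spillSet.getNextSpillFile();
    OutputStream out = spillSet.openForOutput(path);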
 

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
index bdee8ae..96daf7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
@@ -65,9 +65,7 @@ public abstract class CopierTemplate2 implements Copier{
                                @Named("incoming") RecordBatch incoming,
                                @Named("outgoing") RecordBatch outgoing)
                        throws SchemaChangeException;
-  public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex)
+  public abstract void doEval(@Named("inIndex") int inIndex,
+                              @Named("outIndex") int outIndex)
                        throws SchemaChangeException;
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
index 13f0dbe..09d6bae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
@@ -117,7 +117,7 @@ public class BatchGroup implements VectorAccessible, AutoCloseable {
     spilledBatches--;
     currentContainer.zeroVectors();
     Iterator<VectorWrapper<?>> wrapperIterator = c.iterator();
-    for (VectorWrapper w : currentContainer) {
+    for (VectorWrapper<?> w : currentContainer) {
       TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
       pair.transfer();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 25f05b3..39c662f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -346,9 +346,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
               if (unionTypeEnabled) {
                 this.schema = SchemaUtil.mergeSchemas(schema, incoming.getSchema());
               } else {
-                throw SchemaChangeException.schemaChanged("Schema changes not supported in External Sort. Please enable Union type",
-                    schema,
-                    incoming.getSchema());
+                throw new SchemaChangeException("Schema changes not supported in External Sort. Please enable Union type");
               }
             } else {
               schema = incoming.getSchema();
@@ -475,7 +473,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           builder.add(rbd);
         }
 
-        builder.build(context, container);
+        builder.build(container);
         sv4 = builder.getSv4();
         mSorter = createNewMSorter();
         mSorter.setup(context, oAllocator, getSelectionVector4(), this.container);

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java
new file mode 100644
index 0000000..1f381b9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.exec.compile.sig.MappingSet;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.record.VectorAccessible;
+
+import com.sun.codemodel.JConditional;
+import com.sun.codemodel.JExpr;
+
+/**
+ * Base wrapper for algorithms that use sort comparisons.
+ */
+
+public abstract class BaseSortWrapper extends BaseWrapper {
+
+  protected static final MappingSet MAIN_MAPPING = new MappingSet((String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
+  protected static final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
+  protected static final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
+
+  public BaseSortWrapper(OperExecContext opContext) {
+    super(opContext);
+  }
+
+  protected void generateComparisons(ClassGenerator<?> g, VectorAccessible batch, org.slf4j.Logger logger) {
+    g.setMappingSet(MAIN_MAPPING);
+
+    Sort popConfig = context.getOperatorDefn();
+    for (Ordering od : popConfig.getOrderings()) {
+      // first, we rewrite the evaluation stack for each side of the comparison.
+      ErrorCollector collector = new ErrorCollectorImpl();
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
+      if (collector.hasErrors()) {
+        throw UserException.unsupportedError()
+              .message("Failure while materializing expression. " + collector.toErrorString())
+              .build(logger);
+      }
+      g.setMappingSet(LEFT_MAPPING);
+      HoldingContainer left = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
+      g.setMappingSet(RIGHT_MAPPING);
+      HoldingContainer right = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
+      g.setMappingSet(MAIN_MAPPING);
+
+      // next we wrap the two comparison sides and add the expression block for the comparison.
+      LogicalExpression fh =
+          FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right,
+                                                         context.getFunctionRegistry());
+      HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
+      JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
+
+      if (od.getDirection() == Direction.ASCENDING) {
+        jc._then()._return(out.getValue());
+      } else {
+        jc._then()._return(out.getValue().minus());
+      }
+      g.rotateBlock();
+    }
+
+    g.rotateBlock();
+    g.getEvalBlock()._return(JExpr.lit(0));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java
new file mode 100644
index 0000000..e607f40
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.ops.OperExecContext;
+
+/**
+ * Base class for code-generation-based tasks.
+ */
+
+public abstract class BaseWrapper {
+
+  protected OperExecContext context;
+
+  public BaseWrapper(OperExecContext context) {
+    this.context = context;
+  }
+
+  protected <T> T getInstance(CodeGenerator<T> cg, org.slf4j.Logger logger) {
+    try {
+      return context.getImplementationClass(cg);
+    } catch (ClassTransformationException e) {
+      throw UserException.unsupportedError(e)
+            .message("Code generation error - likely code error.")
+            .build(logger);
+    } catch (IOException e) {
+      throw UserException.resourceError(e)
+            .message("IO Error during code generation.")
+            .build(logger);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
index 2e5d5b2..a74183c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
@@ -20,14 +20,14 @@ package org.apache.drill.exec.physical.impl.xsort.managed;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.SchemaUtil;
@@ -36,7 +36,6 @@ import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
@@ -78,15 +77,13 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
     private final SelectionVector2 sv2;
     private final int dataSize;
 
-    public InputBatch(VectorContainer container, SelectionVector2 sv2, OperatorContext context, int dataSize) {
-      super(container, context);
+    public InputBatch(VectorContainer container, SelectionVector2 sv2, BufferAllocator allocator, int dataSize) {
+      super(container, allocator);
       this.sv2 = sv2;
       this.dataSize = dataSize;
     }
 
-    public SelectionVector2 getSv2() {
-      return sv2;
-    }
+    public SelectionVector2 getSv2() { return sv2; }
 
     public int getDataSize() { return dataSize; }
 
@@ -110,14 +107,10 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
 
     @Override
     public void close() throws IOException {
-      try {
-        super.close();
-      }
-      finally {
-        if (sv2 != null) {
-          sv2.clear();
-        }
+      if (sv2 != null) {
+        sv2.clear();
       }
+      super.close();
     }
   }
 
@@ -152,24 +145,23 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
     private BufferAllocator allocator;
     private int spilledBatches;
     private long batchSize;
+    private VectorSerializer.Writer writer;
+    private VectorSerializer.Reader reader;
 
-    public SpilledRun(SpillSet spillSet, String path, OperatorContext context) throws IOException {
-      super(null, context);
+    public SpilledRun(SpillSet spillSet, String path, BufferAllocator allocator) throws IOException {
+      super(null, allocator);
       this.spillSet = spillSet;
       this.path = path;
-      this.allocator = context.getAllocator();
+      this.allocator = allocator;
       outputStream = spillSet.openForOutput(path);
+      writer = VectorSerializer.writer(allocator, outputStream);
     }
 
     public void addBatch(VectorContainer newContainer) throws IOException {
       int recordCount = newContainer.getRecordCount();
-      @SuppressWarnings("resource")
-      WritableBatch batch = WritableBatch.getBatchNoHVWrap(recordCount, newContainer, false);
-      VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator);
       Stopwatch watch = Stopwatch.createStarted();
-      outputBatch.writeToStream(outputStream);
+      writer.write(newContainer);
       newContainer.zeroVectors();
       logger.trace("Wrote {} records in {} us", recordCount, watch.elapsed(TimeUnit.MICROSECONDS));
       spilledBatches++;
 
       // Hold onto the husk of the last added container so that we have a
@@ -184,6 +176,7 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
     }
 
     public long getBatchSize() { return batchSize; }
+    public String getPath() { return path; }
 
     @Override
     public int getNextIndex() {
@@ -216,19 +209,18 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
     private VectorContainer getBatch() throws IOException {
       if (inputStream == null) {
         inputStream = spillSet.openForInput(path);
+        reader = VectorSerializer.reader(allocator, inputStream);
       }
-      VectorAccessibleSerializable vas = new VectorAccessibleSerializable(allocator);
       Stopwatch watch = Stopwatch.createStarted();
-      vas.readFromStream(inputStream);
-      VectorContainer c =  vas.get();
+      VectorContainer c =  reader.read();
       if (schema != null) {
-        c = SchemaUtil.coerceContainer(c, schema, context);
+        c = SchemaUtil.coerceContainer(c, schema, allocator);
       }
       logger.trace("Read {} records in {} us", c.getRecordCount(), watch.elapsed(TimeUnit.MICROSECONDS));
       spilledBatches--;
       currentContainer.zeroVectors();
       Iterator<VectorWrapper<?>> wrapperIterator = c.iterator();
-      for (@SuppressWarnings("rawtypes") VectorWrapper w : currentContainer) {
+      for (VectorWrapper<?> w : currentContainer) {
         TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
         pair.transfer();
       }
@@ -279,6 +271,7 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
       spillSet.tallyReadBytes(readLength);
       inputStream.close();
       inputStream = null;
+      reader = null;
       logger.trace("Summary: Read {} bytes from {}", readLength, path);
     }
 
@@ -290,6 +283,7 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
       spillSet.tallyWriteBytes(writeSize);
       outputStream.close();
       outputStream = null;
+      writer = null;
       logger.trace("Summary: Wrote {} bytes to {}", writeSize, path);
       return writeSize;
     }
@@ -297,12 +291,12 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
 
   protected VectorContainer currentContainer;
   protected int pointer = 0;
-  protected final OperatorContext context;
+  protected final BufferAllocator allocator;
   protected BatchSchema schema;
 
-  public BatchGroup(VectorContainer container, OperatorContext context) {
+  public BatchGroup(VectorContainer container, BufferAllocator allocator) {
     this.currentContainer = container;
-    this.context = context;
+    this.allocator = allocator;
   }
 
   /**
@@ -311,7 +305,7 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
    * @param schema
    */
   public void setSchema(BatchSchema schema) {
-    currentContainer = SchemaUtil.coerceContainer(currentContainer, schema, context);
+    currentContainer = SchemaUtil.coerceContainer(currentContainer, schema, allocator);
     this.schema = schema;
   }
 
@@ -371,4 +365,20 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
   public SelectionVector4 getSelectionVector4() {
     throw new UnsupportedOperationException();
   }
+
+  public static void closeAll(Collection<? extends BatchGroup> groups) {
+    Exception ex = null;
+    for (BatchGroup group: groups) {
+      try {
+        group.close();
+      } catch (Exception e) {
+        ex = (ex == null) ? e : ex;
+      }
+    }
+    if (ex != null) {
+      throw UserException.dataWriteError(ex)
+          .message("Failure while flushing spilled data")
+          .build(logger);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java
new file mode 100644
index 0000000..8f1b5e5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.SchemaUtil;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Represents the set of in-memory batches accumulated by
+ * the external sort.
+ */
+
+public class BufferedBatches {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BufferedBatches.class);
+
+  /**
+   * Incoming batches buffered in memory prior to spilling
+   * or an in-memory merge.
+   */
+
+  private LinkedList<BatchGroup.InputBatch> bufferedBatches = Lists.newLinkedList();
+
+  private final SorterWrapper sorterWrapper;
+
+  private BatchSchema schema;
+
+  private final OperExecContext context;
+
+  public BufferedBatches(OperExecContext opContext) {
+    context = opContext;
+    sorterWrapper = new SorterWrapper(opContext);
+  }
+
+  public void setSchema(BatchSchema schema) {
+    this.schema = schema;
+
+    // New schema: must generate a new sorter and copier.
+
+    sorterWrapper.close();
+
+    // Coerce all existing batches to the new schema.
+
+    for (BatchGroup b : bufferedBatches) {
+      b.setSchema(schema);
+    }
+  }
+
+  public int size() { return bufferedBatches.size(); }
+
+  @SuppressWarnings("resource")
+  public void add(VectorAccessible incoming, int batchSize) {
+    // Convert the incoming batch to the agreed-upon schema.
+    // No converted batch means we got an empty input batch.
+    // Converting the batch transfers memory ownership to our
+    // allocator. This gives a round-about way to learn the batch
+    // size: check the before and after memory levels, then use
+    // the difference as the batch size, in bytes.
+
+    VectorContainer convertedBatch = convertBatch(incoming);
+    if (convertedBatch == null) {
+      return;
+    }
+
+    SelectionVector2 sv2;
+    try {
+      sv2 = makeSelectionVector(incoming);
+    } catch (Exception e) {
+      convertedBatch.clear();
+      throw e;
+    }
+
+    // Sort the incoming batch using either the original selection vector,
+    // or a new one created here.
+
+    sorterWrapper.sortBatch(convertedBatch, sv2);
+    bufferBatch(convertedBatch, sv2, batchSize);
+  }
+
+  /**
+   * Convert an incoming batch into the agreed-upon format.
+   * @param incoming
+   *
+   * @return the converted batch, or null if the incoming batch is empty
+   */
+
+  @SuppressWarnings("resource")
+  private VectorContainer convertBatch(VectorAccessible incoming) {
+
+    // Must accept the batch even if no records. Then clear
+    // the vectors to release memory since we won't do any
+    // further processing with the empty batch.
+
+    VectorContainer convertedBatch = SchemaUtil.coerceContainer(incoming, schema, context.getAllocator());
+    if (incoming.getRecordCount() == 0) {
+      for (VectorWrapper<?> w : convertedBatch) {
+        w.clear();
+      }
+      SelectionVector2 sv2 = incoming.getSelectionVector2();
+      if (sv2 != null) {
+        sv2.clear();
+      }
+      return null;
+    }
+    return convertedBatch;
+  }
+
+  private SelectionVector2 makeSelectionVector(VectorAccessible incoming) {
+    if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
+      return incoming.getSelectionVector2().clone();
+    } else {
+      return newSV2(incoming);
+    }
+  }
+
+  /**
+   * Allocate and initialize the selection vector used as the sort index.
+   * Assumes that memory is available for the vector since memory management
+   * ensured space is available.
+   *
+   * @return a new, populated selection vector 2
+   */
+
+  private SelectionVector2 newSV2(VectorAccessible incoming) {
+    SelectionVector2 sv2 = new SelectionVector2(context.getAllocator());
+    if (!sv2.allocateNewSafe(incoming.getRecordCount())) {
+      throw UserException.resourceError(new OutOfMemoryException("Unable to allocate sv2 buffer"))
+            .build(logger);
+    }
+    for (int i = 0; i < incoming.getRecordCount(); i++) {
+      sv2.setIndex(i, (char) i);
+    }
+    sv2.setRecordCount(incoming.getRecordCount());
+    return sv2;
+  }
+
+  @SuppressWarnings("resource")
+  private void bufferBatch(VectorContainer convertedBatch, SelectionVector2 sv2, int netSize) {
+    BufferAllocator allocator = context.getAllocator();
+    RecordBatchData rbd = new RecordBatchData(convertedBatch, allocator);
+    try {
+      rbd.setSv2(sv2);
+      bufferedBatches.add(new BatchGroup.InputBatch(rbd.getContainer(), rbd.getSv2(), allocator, netSize));
+
+    } catch (Throwable t) {
+      rbd.clear();
+      throw t;
+    }
+  }
+
+  public List<BatchGroup> prepareSpill(long targetSpillSize) {
+
+    // Determine the number of batches to spill to create a spill file
+    // of the desired size. The actual file size might be a bit larger
+    // or smaller than the target, which is expected.
+
+    int spillCount = 0;
+    long spillSize = 0;
+    for (InputBatch batch : bufferedBatches) {
+      long batchSize = batch.getDataSize();
+      spillSize += batchSize;
+      spillCount++;
+      if (spillSize + batchSize / 2 > targetSpillSize) {
+        break;
+      }
+    }
+
+    // Must always spill at least 2, even if this creates an over-size
+    // spill file. But, if this is a final consolidation, we may have only
+    // a single batch.
+
+    spillCount = Math.max(spillCount, 2);
+    spillCount = Math.min(spillCount, bufferedBatches.size());
+    return SpilledRuns.prepareSpillBatches(bufferedBatches, spillCount);
+  }
+
+  public List<BatchGroup.InputBatch> removeAll() {
+    List<BatchGroup.InputBatch> batches = new ArrayList<>();
+    batches.addAll(bufferedBatches);
+    bufferedBatches.clear();
+    return batches;
+  }
+
+  public void close() {
+    // Use the spilled runs version. In-memory batches won't throw
+    // an error, but the API is generic.
+
+    RuntimeException ex = null;
+    try {
+      BatchGroup.closeAll(bufferedBatches);
+      bufferedBatches.clear();
+    } catch (RuntimeException e) {
+      ex = e;
+    }
+    try {
+      sorterWrapper.close();
+    } catch (RuntimeException e) {
+      ex = (ex == null) ? e : ex;
+    }
+    if (ex != null) {
+      throw ex;
+    }
+  }
+}
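
To see how the pieces above fit together, a rough sketch of the
calling pattern (the driver names 'opContext', 'incoming',
'batchSizeBytes', and 'targetSpillSize' are placeholders, not code
from this commit):

    BufferedBatches buffered = new BufferedBatches(opContext);
    buffered.setSchema(incoming.getSchema());

    // Each incoming batch is coerced to the schema, sorted, and buffered.
    buffered.add(incoming, batchSizeBytes);

    // Under memory pressure, pull off enough batches for one spill file.
    List<BatchGroup> toSpill = buffered.prepareSpill(targetSpillSize);

    // At end of input, take whatever remains for the in-memory merge.
    List<BatchGroup.InputBatch> rest = buffered.removeAll();
    buffered.close();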

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
deleted file mode 100644
index c6b2dd9..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.xsort.managed;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.SortResults;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.ValueVector;
-
-import com.google.common.base.Stopwatch;
-
-/**
- * Manages a {@link PriorityQueueCopier} instance produced from code generation.
- * Provides a wrapper around a copier "session" to simplify reading batches
- * from the copier.
- */
-
-public class CopierHolder {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierHolder.class);
-
-  private PriorityQueueCopier copier;
-
-  private final FragmentContext context;
-  private final BufferAllocator allocator;
-  private OperatorCodeGenerator opCodeGen;
-
-  public CopierHolder(FragmentContext context, BufferAllocator allocator, OperatorCodeGenerator opCodeGen) {
-    this.context = context;
-    this.allocator = allocator;
-    this.opCodeGen = opCodeGen;
-  }
-
-  /**
-   * Start a merge operation using a temporary vector container. Used for
-   * intermediate merges.
-   *
-   * @param schema schema for the input and output batches
-   * @param batchGroupList the input batches to merge
-   * @param targetRecordCount number of records for each output batch
-   * @return a merger that returns one merged batch on each call to next()
-   */
-
-  public CopierHolder.BatchMerger startMerge(BatchSchema schema, List<? extends BatchGroup> batchGroupList, int targetRecordCount) {
-    return new BatchMerger(this, schema, batchGroupList, targetRecordCount);
-  }
-
-  /**
-   * Start a merge operation using the specified vector container. Used for
-   * the final merge operation.
-   *
-   * @param schema schema for the input and output batches
-   * @param batchGroupList the input batches to merge
-   * @param outputContainer the container into which the copier merges
-   * its output batches
-   * @param targetRecordCount number of records for each output batch
-   * @return a merger that returns one merged batch on each call to next()
-   */
-  public CopierHolder.BatchMerger startFinalMerge(BatchSchema schema, List<? extends BatchGroup> batchGroupList, VectorContainer outputContainer, int targetRecordCount) {
-    return new BatchMerger(this, schema, batchGroupList, outputContainer, targetRecordCount);
-  }
-
-  /**
-   * Prepare a copier which will write a collection of vectors to disk. The copier
-   * uses generated code to do the actual writes. If the copier has not yet been
-   * created, generate code and create it. If it has been created, close it and
-   * prepare it for a new collection of batches.
-   *
-   * @param batch the (hyper) batch of vectors to be copied
-   * @param batchGroupList same batches as above, but represented as a list
-   * of individual batches
-   * @param outputContainer the container into which to copy the batches
-   */
-
-  @SuppressWarnings("unchecked")
-  private void createCopier(VectorAccessible batch, List<? extends BatchGroup> batchGroupList, VectorContainer outputContainer) {
-    if (copier != null) {
-      opCodeGen.closeCopier();
-    } else {
-      copier = opCodeGen.getCopier(batch);
-    }
-
-    // Initialize the value vectors for the output container
-
-    for (VectorWrapper<?> i : batch) {
-      @SuppressWarnings("resource")
-      ValueVector v = TypeHelper.getNewVector(i.getField(), allocator);
-      outputContainer.add(v);
-    }
-    try {
-      copier.setup(context, allocator, batch, (List<BatchGroup>) batchGroupList, outputContainer);
-    } catch (SchemaChangeException e) {
-      throw UserException.unsupportedError(e)
-            .message("Unexpected schema change - likely code error.")
-            .build(logger);
-    }
-  }
-
-  public BufferAllocator getAllocator() { return allocator; }
-
-  public void close() {
-    opCodeGen.closeCopier();
-    copier = null;
-  }
-
-  /**
-   * We've gathered a set of batches, each of which has been sorted. The batches
-   * may have passed through a filter and thus may have "holes" where rows have
-   * been filtered out. We will spill records in blocks of targetRecordCount.
-   * To prepare, copy that many records into an outputContainer as a set of
-   * contiguous values in new vectors. The result is a single batch with
-   * vectors that combine a collection of input batches up to the
-   * given threshold.
-   * <p>
-   * Input. Here the top line is a selection vector of indexes.
-   * The second line is a set of batch groups (separated by underscores)
-   * with letters indicating individual records:<pre>
-   * [3 7 4 8 0 6 1] [5 3 6 8 2 0]
-   * [eh_ad_ibf]     [r_qm_kn_p]</pre>
-   * <p>
-   * Output, assuming blocks of 5 records. The brackets represent
-   * batches, the line represents the set of batches copied to the
-   * spill file.<pre>
-   * [abdef] [hikmn] [pqr]</pre>
-   * <p>
-   * The copying operation does a merge as well: copying
-   * values from the sources in ordered fashion. Consider a different example
-   * in which we want to merge two input batches to produce a single output batch:
-   * <pre>
-   * Input:  [aceg] [bdfh]
-   * Output: [abcdefgh]</pre>
-   * <p>
-   * In the above, the input consists of two sorted batches. (In reality,
-   * the input batches have an associated selection vector, but that is omitted
-   * here; only the sorted values are shown.) The output is a single batch
-   * with the merged records (indicated by letters) from the two input batches.
-   * <p>
-   * Here we bind the copier to the batchGroupList of sorted, buffered batches
-   * to be merged. We bind the copier output to outputContainer: the copier will write its
-   * merged "batches" of records to that container.
-   * <p>
-   * Calls to the {@link #next()} method sequentially return merged batches
-   * of the desired row count.
-    */
-
-  public static class BatchMerger implements SortResults, AutoCloseable {
-
-    private CopierHolder holder;
-    private VectorContainer hyperBatch;
-    private VectorContainer outputContainer;
-    private int targetRecordCount;
-    private int copyCount;
-    private int batchCount;
-    private long estBatchSize;
-
-    /**
-     * Creates a merger with a temporary output container.
-     *
-     * @param holder the copier that does the work
-     * @param schema schema for the input and output batches
-     * @param batchGroupList the input batches
-     * @param targetRecordCount number of records for each output batch
-     */
-    private BatchMerger(CopierHolder holder, BatchSchema schema, List<? extends BatchGroup> batchGroupList,
-                        int targetRecordCount) {
-      this(holder, schema, batchGroupList, new VectorContainer(), targetRecordCount);
-    }
-
-    /**
-     * Creates a merger with the specified output container
-     *
-     * @param holder the copier that does the work
-     * @param schema schema for the input and output batches
-     * @param batchGroupList the input batches
-     * @param outputContainer merges output batch into the given output container
-     * @param targetRecordCount number of records for each output batch
-     */
-    private BatchMerger(CopierHolder holder, BatchSchema schema, List<? extends BatchGroup> batchGroupList,
-                        VectorContainer outputContainer, int targetRecordCount) {
-      this.holder = holder;
-      hyperBatch = constructHyperBatch(schema, batchGroupList);
-      copyCount = 0;
-      this.targetRecordCount = targetRecordCount;
-      this.outputContainer = outputContainer;
-      holder.createCopier(hyperBatch, batchGroupList, outputContainer);
-    }
-
-    /**
-     * Return the output container.
-     *
-     * @return the output container
-     */
-    public VectorContainer getOutput() {
-      return outputContainer;
-    }
-
-    /**
-     * Read the next merged batch. The batch holds the specified row count, but
-     * may hold fewer rows if this is the last batch.
-     *
-     * @return true if a batch was copied, false if no more batches
-     * are available
-     */
-
-    @Override
-    public boolean next() {
-      Stopwatch w = Stopwatch.createStarted();
-      long start = holder.allocator.getAllocatedMemory();
-      int count = holder.copier.next(targetRecordCount);
-      copyCount += count;
-      if (count > 0) {
-        long t = w.elapsed(TimeUnit.MICROSECONDS);
-        batchCount++;
-        logger.trace("Took {} us to merge {} records", t, count);
-        long size = holder.allocator.getAllocatedMemory() - start;
-        estBatchSize = Math.max(estBatchSize, size);
-      } else {
-        logger.trace("copier returned 0 records");
-      }
-
-      // Identify the schema to be used in the output container. (Since
-      // all merged batches have the same schema, the schema we identify
-      // here should be the same as the one we already had.)
-
-      outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-
-      // The copier does not set the record count in the output
-      // container, so do that here.
-
-      outputContainer.setRecordCount(count);
-
-      return count > 0;
-    }
-
-    /**
-     * Construct a vector container that holds a list of batches, each represented as an
-     * array of vectors. The entire collection of vectors has a common schema.
-     * <p>
-     * To build the collection, we go through the current schema (which has been
-     * devised to be common to all batches). For each field in the schema, we create
-     * an array of vectors. To create the elements, we iterate over all the incoming
-     * batches and search for the vector that matches the current column.
-     * <p>
-     * Finally, we build a new schema for the combined container. That new schema must,
-     * because of the way the container was created, match the current schema.
-     *
-     * @param schema schema for the hyper batch
-     * @param batchGroupList list of batches to combine
-     * @return a container where each column is represented as an array of vectors
-     * (hence the "hyper" in the method name)
-     */
-
-    private VectorContainer constructHyperBatch(BatchSchema schema, List<? extends BatchGroup> batchGroupList) {
-      VectorContainer cont = new VectorContainer();
-      for (MaterializedField field : schema) {
-        ValueVector[] vectors = new ValueVector[batchGroupList.size()];
-        int i = 0;
-        for (BatchGroup group : batchGroupList) {
-          vectors[i++] = group.getValueAccessorById(
-              field.getValueClass(),
-              group.getValueVectorId(SchemaPath.getSimplePath(field.getPath())).getFieldIds())
-              .getValueVector();
-        }
-        cont.add(vectors);
-      }
-      cont.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
-      return cont;
-    }
-
-    @Override
-    public void close() {
-      hyperBatch.clear();
-      holder.close();
-    }
-
-    @Override
-    public int getRecordCount() {
-      return copyCount;
-    }
-
-    @Override
-    public int getBatchCount() {
-      return batchCount;
-    }
-
-    /**
-     * Gets the estimated batch size, in bytes. Use for estimating the memory
-     * needed to process the batches that this operator created.
-     * @return the size of the largest batch created by this operation,
-     * in bytes
-     */
-
-    public long getEstBatchSize() {
-      return estBatchSize;
-    }
-  }
-}
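
The CopierHolder javadoc above describes the copier's merge: values are
copied from several sorted inputs into output batches of a target record
count. A minimal, self-contained illustration of that k-way merge using a
priority queue (plain Java over int arrays; illustrative only, not the
generated copier code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeSketch {

  public static void main(String[] args) {
    // Mirrors the javadoc example: Input:  [aceg] [bdfh]
    //                              Output: [abcdefgh] (here in blocks of 5)
    List<int[]> inputs = Arrays.asList(
        new int[] {1, 3, 5, 7},
        new int[] {2, 4, 6, 8});
    for (int[] batch : merge(inputs, 5)) {
      System.out.println(Arrays.toString(batch));
    }
    // Prints [1, 2, 3, 4, 5] then [6, 7, 8]: full blocks of the target
    // record count, with a possibly short final block.
  }

  static List<int[]> merge(List<int[]> inputs, int targetRecordCount) {
    // Heap entries are {value, inputIndex, offsetWithinInput}, ordered
    // by value so the smallest head element is copied next.
    PriorityQueue<int[]> heap =
        new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
    for (int i = 0; i < inputs.size(); i++) {
      if (inputs.get(i).length > 0) {
        heap.add(new int[] {inputs.get(i)[0], i, 0});
      }
    }
    List<int[]> output = new ArrayList<>();
    int[] current = new int[targetRecordCount];
    int count = 0;
    while (!heap.isEmpty()) {
      int[] top = heap.poll();
      current[count++] = top[0];
      if (count == targetRecordCount) {   // emit a full output batch
        output.add(current);
        current = new int[targetRecordCount];
        count = 0;
      }
      int next = top[2] + 1;              // advance the source input
      if (next < inputs.get(top[1]).length) {
        heap.add(new int[] {inputs.get(top[1])[next], top[1], next});
      }
    }
    if (count > 0) {                      // final, possibly short batch
      output.add(Arrays.copyOf(current, count));
    }
    return output;
  }
}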