You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/06/21 18:29:10 UTC

[2/5] drill git commit: DRILL-5325: Unit tests for the managed sort

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
new file mode 100644
index 0000000..363c08c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.test.DrillTest;
+import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
+import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestBatchSerialization extends DrillTest {
+
+  public static OperatorFixture fixture;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    fixture = OperatorFixture.builder().build();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    fixture.close();
+  }
+
+  public SingleRowSet makeRowSet(BatchSchema schema, int rowCount) {
+    ExtendableRowSet rowSet = fixture.rowSet(schema);
+    RowSetWriter writer = rowSet.writer(rowCount);
+    for (int i = 0; i < rowCount; i++) {
+      RowSetUtilities.setFromInt(writer, 0, i);
+      writer.save();
+    }
+    writer.done();
+    return rowSet;
+  }
+
+  public SingleRowSet makeNullableRowSet(BatchSchema schema, int rowCount) {
+    ExtendableRowSet rowSet = fixture.rowSet(schema);
+    RowSetWriter writer = rowSet.writer(rowCount);
+    for (int i = 0; i < rowCount; i++) {
+      if (i % 2 == 0) {
+        RowSetUtilities.setFromInt(writer, 0, i);
+      } else {
+        writer.column(0).setNull();
+      }
+      writer.save();
+    }
+    writer.done();
+    return rowSet;
+  }
+
+  public void testType(MinorType type) throws IOException {
+    testNonNullType(type);
+    testNullableType(type);
+  }
+
+  public void testNonNullType(MinorType type) throws IOException {
+    BatchSchema schema = new SchemaBuilder( )
+        .add("col", type)
+        .build();
+    int rowCount = 20;
+    verifySerialize(makeRowSet(schema, rowCount),
+                    makeRowSet(schema, rowCount));
+  }
+
+  public void testNullableType(MinorType type) throws IOException {
+    BatchSchema schema = new SchemaBuilder( )
+        .addNullable("col", type)
+        .build();
+    int rowCount = 20;
+    verifySerialize(makeNullableRowSet(schema, rowCount),
+                    makeNullableRowSet(schema, rowCount));
+  }
+
+  /**
+   * Verify serialize and deserialize. Need to pass both the
+   * input and expected (even though the expected should be the same
+   * data as the input) because the act of serializing clears the
+   * input for obscure historical reasons.
+   *
+   * @param rowSet
+   * @param expected
+   * @throws IOException
+   */
+  private void verifySerialize(SingleRowSet rowSet, SingleRowSet expected) throws IOException {
+
+    long origSize = rowSet.size();
+
+    File dir = OperatorFixture.getTempDir("serial");
+    File outFile = new File(dir, "serialze.dat");
+    try (OutputStream out = new BufferedOutputStream(new FileOutputStream(outFile))) {
+      VectorSerializer.writer(fixture.allocator(), out)
+        .write(rowSet.container(), rowSet.getSv2());
+    }
+
+    RowSet result;
+    try (InputStream in = new BufferedInputStream(new FileInputStream(outFile))) {
+      result = fixture.wrap(
+        VectorSerializer.reader(fixture.allocator(), in)
+          .read());
+    }
+
+    assertTrue(origSize >= result.size());
+    new RowSetComparison(expected)
+      .verifyAndClear(result);
+    outFile.delete();
+  }
+
+  @Test
+  public void testTypes() throws IOException {
+    testType(MinorType.TINYINT);
+    testType(MinorType.UINT1);
+    testType(MinorType.SMALLINT);
+    testType(MinorType.UINT2);
+    testType(MinorType.INT);
+    testType(MinorType.UINT4);
+    testType(MinorType.BIGINT);
+    testType(MinorType.UINT8);
+    testType(MinorType.FLOAT4);
+    testType(MinorType.FLOAT8);
+    testType(MinorType.DECIMAL9);
+    testType(MinorType.DECIMAL18);
+    testType(MinorType.DECIMAL28SPARSE);
+    testType(MinorType.DECIMAL38SPARSE);
+//  testType(MinorType.DECIMAL28DENSE); No writer
+//  testType(MinorType.DECIMAL38DENSE); No writer
+    testType(MinorType.DATE);
+    testType(MinorType.TIME);
+    testType(MinorType.TIMESTAMP);
+    testType(MinorType.INTERVAL);
+    testType(MinorType.INTERVALYEAR);
+    testType(MinorType.INTERVALDAY);
+  }
+
+  private SingleRowSet buildMapSet(BatchSchema schema) {
+    return fixture.rowSetBuilder(schema)
+        .add(1, 100, "first")
+        .add(2, 200, "second")
+        .add(3, 300, "third")
+        .build();
+  }
+
+  private SingleRowSet buildArraySet(BatchSchema schema) {
+    return fixture.rowSetBuilder(schema)
+        .add(1, new String[] { "first, second, third" } )
+        .add(2, null)
+        .add(3, new String[] { "third, fourth, fifth" } )
+        .build();
+  }
+
+  /**
+   * Tests a map type and an SV2.
+   *
+   * @throws IOException
+   */
+
+  @Test
+  public void testMap() throws IOException {
+    BatchSchema schema = new SchemaBuilder()
+        .add("top", MinorType.INT)
+        .addMap("map")
+          .add("key", MinorType.INT)
+          .add("value", MinorType.VARCHAR)
+          .buildMap()
+        .build();
+
+    verifySerialize(buildMapSet(schema).toIndirect(),
+                    buildMapSet(schema));
+  }
+
+  @Test
+  public void testArray() throws IOException {
+    BatchSchema schema = new SchemaBuilder()
+        .add("top", MinorType.INT)
+        .addArray("arr", MinorType.VARCHAR)
+        .build();
+
+    verifySerialize(buildArraySet(schema).toIndirect(),
+                    buildArraySet(schema));
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
index 96dae6a..2473dc5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
@@ -91,6 +91,7 @@ public class TestWriteToDisk extends ExecTest {
         VectorContainer container = new VectorContainer();
         container.addCollection(vectorList);
         container.setRecordCount(4);
+        @SuppressWarnings("resource")
         WritableBatch batch = WritableBatch.getBatchNoHVWrap(
             container.getRecordCount(), container, false);
         VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(
@@ -107,7 +108,6 @@ public class TestWriteToDisk extends ExecTest {
           final Path path = new Path(tempDir.getAbsolutePath(), "drillSerializable");
           try (final FSDataOutputStream out = fs.create(path)) {
             wrap.writeToStream(out);
-            out.close();
           }
 
           try (final FSDataInputStream in = fs.open(path)) {

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
index 52ebd57..6ead748 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
@@ -20,13 +20,16 @@ package org.apache.drill.exec.physical.impl.xsort;
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.TestBuilder;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.SecondaryTest;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 
+@Category(SecondaryTest.class)
 public class TestExternalSort extends BaseTestQuery {
 
   @Test

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
index 50bf710..f643d5f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -35,7 +35,6 @@ import org.apache.drill.test.ClientFixture;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.DrillTest;
 import org.apache.drill.test.FixtureBuilder;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestRule;
@@ -46,6 +45,11 @@ public class TestSimpleExternalSort extends DrillTest {
   @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(80000);
 
   @Test
+  public void mergeSortWithSv2Managed() throws Exception {
+    mergeSortWithSv2(false);
+  }
+
+  @Test
   public void mergeSortWithSv2Legacy() throws Exception {
     mergeSortWithSv2(true);
   }
@@ -62,26 +66,37 @@ public class TestSimpleExternalSort extends DrillTest {
    */
 
   private void mergeSortWithSv2(boolean testLegacy) throws Exception {
-    try (ClusterFixture cluster = ClusterFixture.standardCluster( );
+    FixtureBuilder builder = ClusterFixture.builder()
+        .configProperty(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED, false)
+         ;
+    try (ClusterFixture cluster = builder.build();
          ClientFixture client = cluster.clientFixture()) {
       chooseImpl(client, testLegacy);
       List<QueryDataBatch> results = client.queryBuilder().physicalResource("xsort/one_key_sort_descending_sv2.json").results();
-      assertEquals(500000, client.countResults( results ));
+      assertEquals(500_000, client.countResults(results));
       validateResults(client.allocator(), results);
     }
   }
 
   private void chooseImpl(ClientFixture client, boolean testLegacy) throws Exception {
+    client.alterSession(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED_OPTION.getOptionName(), testLegacy);
+  }
+
+  @Test
+  public void sortOneKeyDescendingMergeSortManaged() throws Throwable {
+    sortOneKeyDescendingMergeSort(false);
   }
 
   @Test
-  @Ignore
   public void sortOneKeyDescendingMergeSortLegacy() throws Throwable {
     sortOneKeyDescendingMergeSort(true);
   }
 
   private void sortOneKeyDescendingMergeSort(boolean testLegacy) throws Throwable {
-    try (ClusterFixture cluster = ClusterFixture.standardCluster( );
+    FixtureBuilder builder = ClusterFixture.builder()
+        .configProperty(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED, false)
+         ;
+    try (ClusterFixture cluster = builder.build();
          ClientFixture client = cluster.clientFixture()) {
       chooseImpl(client, testLegacy);
       List<QueryDataBatch> results = client.queryBuilder().physicalResource("xsort/one_key_sort_descending.json").results();
@@ -101,7 +116,7 @@ public class TestSimpleExternalSort extends DrillTest {
       if (b.getHeader().getRowCount() > 0) {
         batchCount++;
         loader.load(b.getHeader().getDef(),b.getData());
-        @SuppressWarnings("resource")
+        @SuppressWarnings({ "deprecation", "resource" })
         BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(BigIntVector.class, loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
         BigIntVector.Accessor a1 = c1.getAccessor();
 
@@ -118,44 +133,56 @@ public class TestSimpleExternalSort extends DrillTest {
     System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
   }
 
+  @Test
+  public void sortOneKeyDescendingExternalSortManaged() throws Throwable {
+    sortOneKeyDescendingExternalSort(false);
+  }
 
   @Test
-  @Ignore
   public void sortOneKeyDescendingExternalSortLegacy() throws Throwable {
     sortOneKeyDescendingExternalSort(true);
   }
 
   private void sortOneKeyDescendingExternalSort(boolean testLegacy) throws Throwable {
-    FixtureBuilder builder = ClusterFixture.builder( )
-        .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD, 4 )
-        .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE, 4);
+    FixtureBuilder builder = ClusterFixture.builder()
+        .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD, 4)
+        .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE, 4)
+        .configProperty(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 4)
+        .configProperty(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED, false)
+        ;
     try (ClusterFixture cluster = builder.build();
-        ClientFixture client = cluster.clientFixture()) {
+         ClientFixture client = cluster.clientFixture()) {
       chooseImpl(client,testLegacy);
       List<QueryDataBatch> results = client.queryBuilder().physicalResource("/xsort/one_key_sort_descending.json").results();
-      assertEquals(1000000, client.countResults( results ));
+      assertEquals(1_000_000, client.countResults(results));
       validateResults(client.allocator(), results);
     }
   }
 
-  @Ignore
+  @Test
+  public void outOfMemoryExternalSortManaged() throws Throwable{
+    outOfMemoryExternalSort(false);
+  }
+
   @Test
   public void outOfMemoryExternalSortLegacy() throws Throwable{
     outOfMemoryExternalSort(true);
   }
 
   private void outOfMemoryExternalSort(boolean testLegacy) throws Throwable{
-    FixtureBuilder builder = ClusterFixture.builder( )
+    FixtureBuilder builder = ClusterFixture.builder()
         // Probably do nothing in modern Drill
-        .configProperty( "drill.memory.fragment.max", 50000000 )
-        .configProperty( "drill.memory.fragment.initial", 2000000 )
-        .configProperty( "drill.memory.operator.max", 30000000 )
-        .configProperty( "drill.memory.operator.initial", 2000000 );
+        .configProperty("drill.memory.fragment.max", 50_000_000)
+        .configProperty("drill.memory.fragment.initial", 2_000_000)
+        .configProperty("drill.memory.operator.max", 30_000_000)
+        .configProperty("drill.memory.operator.initial", 2_000_000)
+        .configProperty(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED, testLegacy)
+        ;
     try (ClusterFixture cluster = builder.build();
-        ClientFixture client = cluster.clientFixture()) {
+         ClientFixture client = cluster.clientFixture()) {
       chooseImpl(client,testLegacy);
       List<QueryDataBatch> results = client.queryBuilder().physicalResource("/xsort/oom_sort_test.json").results();
-      assertEquals(10000000, client.countResults( results ));
+      assertEquals(10_000_000, client.countResults(results));
 
       long previousBigInt = Long.MAX_VALUE;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
index 788caf7..5a1bf6d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,22 +17,23 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
-import org.apache.drill.BaseTestQuery;
-import org.apache.drill.common.config.DrillConfig;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
 import org.apache.drill.exec.testing.Controls;
 import org.apache.drill.exec.testing.ControlsInjectionUtil;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.FixtureBuilder;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Properties;
 
 /**
  * Testing External Sort's spilling to disk.
@@ -42,30 +43,33 @@ import java.util.Properties;
  * <br>
  * {@link ExecConstants#EXTERNAL_SORT_SPILL_GROUP_SIZE} = 1
  */
-public class TestSortSpillWithException extends BaseTestQuery {
+public class TestSortSpillWithException extends ClusterTest {
   private static final String TEST_RES_PATH = TestTools.getWorkingPath() + "/src/test/resources";
 
   @BeforeClass
-  public static void initCluster() {
-    // make sure memory sorter outputs 20 rows per batch
-    final Properties props = cloneDefaultTestConfigProperties();
-    props.put(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD, "1");
-    props.put(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE, "1");
-
-    updateTestCluster(1, DrillConfig.create(props));
+  public static void setup() throws Exception {
+    FixtureBuilder builder = ClusterFixture.builder()
+        .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD, 1) // Unmanaged
+        .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE, 1) // Unmanaged
+        .sessionOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY, 60 * 1024 * 1024) // Spill early
+        .configProperty(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED, false)
+        .maxParallelization(1)
+        ;
+    startCluster(builder);
   }
 
   @Test
-  public void testSpilLeak() throws Exception {
+  public void testSpillLeakLegacy() throws Exception {
+    client.alterSession(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED_OPTION.getOptionName(), true);
     // inject exception in sort while spilling
     final String controls = Controls.newBuilder()
       .addExceptionOnBit(
-          ExternalSortBatch.class,
-          ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
+          org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.class,
+          org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
           IOException.class,
-          bits[0].getContext().getEndpoint())
+          cluster.drillbit().getContext().getEndpoint())
       .build();
-    ControlsInjectionUtil.setControls(client, controls);
+    ControlsInjectionUtil.setControls(cluster.client(), controls);
     // run a simple order by query
     try {
       test("select employee_id from dfs_test.`%s/xsort/2batches` order by employee_id", TEST_RES_PATH);
@@ -76,4 +80,28 @@ public class TestSortSpillWithException extends BaseTestQuery {
         e.getMessage().contains("External Sort encountered an error while spilling to disk"));
     }
   }
+
+  @Test
+  public void testSpillLeakManaged() throws Exception {
+    client.alterSession(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED_OPTION.getOptionName(), false);
+    // inject exception in sort while spilling
+    final String controls = Controls.newBuilder()
+      .addExceptionOnBit(
+          org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.class,
+          org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
+          IOException.class,
+          cluster.drillbit().getContext().getEndpoint())
+      .build();
+    ControlsInjectionUtil.setControls(cluster.client(), controls);
+    // run a simple order by query
+    try {
+      test("SELECT id_i, name_s250 FROM `mock`.`employee_500K` ORDER BY id_i");
+//      test("select employee_id from dfs_test.`%s/xsort/2batches` order by employee_id", TEST_RES_PATH);
+      fail("Query should have failed!");
+    } catch (UserRemoteException e) {
+      assertEquals(ErrorType.RESOURCE, e.getErrorType());
+      assertTrue("Incorrect error message",
+        e.getMessage().contains("External Sort encountered an error while spilling to disk"));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
new file mode 100644
index 0000000..034da2c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.impl.xsort.managed.PriorityQueueCopierWrapper.BatchMerger;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetSchema;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+
+import com.google.common.collect.Lists;
+
+public class SortTestUtilities {
+
+  private SortTestUtilities() { }
+
+  public static BatchSchema makeSchema(MinorType type, boolean nullable) {
+    return new SchemaBuilder()
+        .add("key", type, nullable ? DataMode.OPTIONAL : DataMode.REQUIRED)
+        .add("value", MinorType.VARCHAR)
+        .build();
+  }
+
+  public static BatchSchema nonNullSchema() {
+    return makeSchema(MinorType.INT, false);
+  }
+
+  public static BatchSchema nullableSchema() {
+    return makeSchema(MinorType.INT, true);
+  }
+
+  public static PriorityQueueCopierWrapper makeCopier(OperatorFixture fixture, String sortOrder, String nullOrder) {
+    FieldReference expr = FieldReference.getWithQuotedRef("key");
+    Ordering ordering = new Ordering(sortOrder, expr, nullOrder);
+    Sort popConfig = new Sort(null, Lists.newArrayList(ordering), false);
+    OperExecContext opContext = fixture.newOperExecContext(popConfig);
+    return new PriorityQueueCopierWrapper(opContext);
+  }
+
+  public static class CopierTester {
+    List<SingleRowSet> rowSets = new ArrayList<>();
+    List<SingleRowSet> expected = new ArrayList<>();
+    String sortOrder = Ordering.ORDER_ASC;
+    String nullOrder = Ordering.NULLS_UNSPECIFIED;
+    private OperatorFixture fixture;
+
+    public CopierTester(OperatorFixture fixture) {
+      this.fixture = fixture;
+    }
+
+    public void addInput(SingleRowSet input) {
+      rowSets.add(input);
+    }
+
+    public void addOutput(SingleRowSet output) {
+      expected.add(output);
+    }
+
+    public void run() throws Exception {
+      PriorityQueueCopierWrapper copier = makeCopier(fixture, sortOrder, nullOrder);
+      List<BatchGroup> batches = new ArrayList<>();
+      RowSetSchema schema = null;
+      for (SingleRowSet rowSet : rowSets) {
+        batches.add(new BatchGroup.InputBatch(rowSet.container(), rowSet.getSv2(),
+                    fixture.allocator(), rowSet.size()));
+        if (schema == null) {
+          schema = rowSet.schema();
+        }
+      }
+      int rowCount = outputRowCount();
+      VectorContainer dest = new VectorContainer();
+      @SuppressWarnings("resource")
+      BatchMerger merger = copier.startMerge(schema.toBatchSchema(SelectionVectorMode.NONE),
+                                             batches, dest, rowCount);
+
+      verifyResults(merger, dest);
+      dest.clear();
+      merger.close();
+    }
+
+    public int outputRowCount() {
+      if (! expected.isEmpty()) {
+        return expected.get(0).rowCount();
+      }
+      return 10;
+    }
+
+    protected void verifyResults(BatchMerger merger, VectorContainer dest) {
+      for (RowSet expectedSet : expected) {
+        assertTrue(merger.next());
+        RowSet rowSet = new DirectRowSet(fixture.allocator(), dest);
+        new RowSetComparison(expectedSet)
+              .verifyAndClear(rowSet);
+      }
+      assertFalse(merger.next());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
new file mode 100644
index 0000000..0050747
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.xsort.managed.PriorityQueueCopierWrapper.BatchMerger;
+import org.apache.drill.exec.physical.impl.xsort.managed.SortTestUtilities.CopierTester;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.test.DrillTest;
+import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
+import org.apache.drill.test.rowSet.RowSet.RowSetWriter;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.apache.drill.test.rowSet.SchemaBuilder;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Light-weight sanity test of the copier class. The implementation has
+ * been used in production, so the tests here just check for the obvious
+ * cases.
+ * <p>
+ * Note, however, that if significant changes are made to the copier,
+ * then additional tests should be added to re-validate the code.
+ */
+
+public class TestCopier extends DrillTest {
+
+  public static OperatorFixture fixture;
+
+  @BeforeClass
+  public static void setup() {
+    fixture = OperatorFixture.builder().build();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    fixture.close();
+  }
+
+  @Test
+  public void testEmptyInput() throws Exception {
+    BatchSchema schema = SortTestUtilities.nonNullSchema();
+    List<BatchGroup> batches = new ArrayList<>();
+    PriorityQueueCopierWrapper copier = SortTestUtilities.makeCopier(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED);
+    VectorContainer dest = new VectorContainer();
+    try {
+      @SuppressWarnings({ "resource", "unused" })
+      BatchMerger merger = copier.startMerge(schema, batches, dest, 10);
+      fail();
+    } catch (AssertionError e) {
+      // Expected
+    }
+  }
+
+  @Test
+  public void testEmptyBatch() throws Exception {
+    BatchSchema schema = SortTestUtilities.nonNullSchema();
+    CopierTester tester = new CopierTester(fixture);
+    tester.addInput(fixture.rowSetBuilder(schema)
+          .withSv2()
+          .build());
+
+    tester.run();
+  }
+
+  @Test
+  public void testSingleRow() throws Exception {
+    BatchSchema schema = SortTestUtilities.nonNullSchema();
+    CopierTester tester = new CopierTester(fixture);
+    tester.addInput(fixture.rowSetBuilder(schema)
+          .add(10, "10")
+          .withSv2()
+          .build());
+
+    tester.addOutput(fixture.rowSetBuilder(schema)
+          .add(10, "10")
+          .build());
+    tester.run();
+  }
+
+  @Test
+  public void testTwoBatchesSingleRow() throws Exception {
+    BatchSchema schema = SortTestUtilities.nonNullSchema();
+    CopierTester tester = new CopierTester(fixture);
+    tester.addInput(fixture.rowSetBuilder(schema)
+          .add(10, "10")
+          .withSv2()
+          .build());
+    tester.addInput(fixture.rowSetBuilder(schema)
+          .add(20, "20")
+          .withSv2()
+          .build());
+
+    tester.addOutput(fixture.rowSetBuilder(schema)
+          .add(10, "10")
+          .add(20, "20")
+          .build());
+    tester.run();
+  }
+
+  public static SingleRowSet makeDataSet(BatchSchema schema, int first, int step, int count) {
+    ExtendableRowSet rowSet = fixture.rowSet(schema);
+    RowSetWriter writer = rowSet.writer(count);
+    int value = first;
+    for (int i = 0; i < count; i++, value += step) {
+      RowSetUtilities.setFromInt(writer, 0, value);
+      writer.column(1).setString(Integer.toString(value));
+      writer.save();
+    }
+    writer.done();
+    return rowSet;
+  }
+
+  @Test
+  public void testMultipleOutput() throws Exception {
+    BatchSchema schema = SortTestUtilities.nonNullSchema();
+
+    CopierTester tester = new CopierTester(fixture);
+    tester.addInput(makeDataSet(schema, 0, 2, 10).toIndirect());
+    tester.addInput(makeDataSet(schema, 1, 2, 10).toIndirect());
+
+    tester.addOutput(makeDataSet(schema, 0, 1, 10));
+    tester.addOutput(makeDataSet(schema, 10, 1, 10));
+    tester.run();
+  }
+
+  // Also verifies that SV2s work
+
+  @Test
+  public void testMultipleOutputDesc() throws Exception {
+    BatchSchema schema = SortTestUtilities.nonNullSchema();
+
+    CopierTester tester = new CopierTester(fixture);
+    tester.sortOrder = Ordering.ORDER_DESC;
+    tester.nullOrder = Ordering.NULLS_UNSPECIFIED;
+    SingleRowSet input = makeDataSet(schema, 0, 2, 10).toIndirect();
+    RowSetUtilities.reverse(input.getSv2());
+    tester.addInput(input);
+
+    input = makeDataSet(schema, 1, 2, 10).toIndirect();
+    RowSetUtilities.reverse(input.getSv2());
+    tester.addInput(input);
+
+    tester.addOutput(makeDataSet(schema, 19, -1, 10));
+    tester.addOutput(makeDataSet(schema, 9, -1, 10));
+
+    tester.run();
+  }
+
+  @Test
+  public void testAscNullsLast() throws Exception {
+    BatchSchema schema = SortTestUtilities.nullableSchema();
+
+    CopierTester tester = new CopierTester(fixture);
+    tester.sortOrder = Ordering.ORDER_ASC;
+    tester.nullOrder = Ordering.NULLS_LAST;
+    tester.addInput(fixture.rowSetBuilder(schema)
+        .add(1, "1")
+        .add(4, "4")
+        .add(null, "null")
+        .withSv2()
+        .build());
+    tester.addInput(fixture.rowSetBuilder(schema)
+        .add(2, "2")
+        .add(3, "3")
+        .add(null, "null")
+        .withSv2()
+        .build());
+
+    tester.addOutput(fixture.rowSetBuilder(schema)
+        .add(1, "1")
+        .add(2, "2")
+        .add(3, "3")
+        .add(4, "4")
+        .add(null, "null")
+        .add(null, "null")
+        .build());
+
+    tester.run();
+  }
+
+  @Test
+  public void testAscNullsFirst() throws Exception {
+    BatchSchema schema = SortTestUtilities.nullableSchema();
+
+    CopierTester tester = new CopierTester(fixture);
+    tester.sortOrder = Ordering.ORDER_ASC;
+    tester.nullOrder = Ordering.NULLS_FIRST;
+    tester.addInput(fixture.rowSetBuilder(schema)
+        .add(null, "null")
+        .add(1, "1")
+        .add(4, "4")
+        .withSv2()
+        .build());
+    tester.addInput(fixture.rowSetBuilder(schema)
+        .add(null, "null")
+        .add(2, "2")
+        .add(3, "3")
+        .withSv2()
+        .build());
+
+    tester.addOutput(fixture.rowSetBuilder(schema)
+        .add(null, "null")
+        .add(null, "null")
+        .add(1, "1")
+        .add(2, "2")
+        .add(3, "3")
+        .add(4, "4")
+        .build());
+
+    tester.run();
+  }
+
+  @Test
+  public void testDescNullsLast() throws Exception {
+    BatchSchema schema = SortTestUtilities.nullableSchema();
+
+    CopierTester tester = new CopierTester(fixture);
+    tester.sortOrder = Ordering.ORDER_DESC;
+    tester.nullOrder = Ordering.NULLS_LAST;
+    tester.addInput(fixture.rowSetBuilder(schema)
+        .add(4, "4")
+        .add(1, "1")
+        .add(null, "null")
+        .withSv2()
+        .build());
+    tester.addInput(fixture.rowSetBuilder(schema)
+        .add(3, "3")
+        .add(2, "2")
+        .add(null, "null")
+        .withSv2()
+        .build());
+
+    tester.addOutput(fixture.rowSetBuilder(schema)
+        .add(4, "4")
+        .add(3, "3")
+        .add(2, "2")
+        .add(1, "1")
+        .add(null, "null")
+        .add(null, "null")
+        .build());
+
+    tester.run();
+  }
+
+  @Test
+  public void testDescNullsFirst() throws Exception {
+    BatchSchema schema = SortTestUtilities.nullableSchema();
+
+    CopierTester tester = new CopierTester(fixture);
+    tester.sortOrder = Ordering.ORDER_DESC;
+    tester.nullOrder = Ordering.NULLS_FIRST;
+    tester.addInput(fixture.rowSetBuilder(schema)
+        .add(null, "null")
+        .add(4, "4")
+        .add(1, "1")
+        .withSv2()
+        .build());
+    tester.addInput(fixture.rowSetBuilder(schema)
+        .add(null, "null")
+        .add(3, "3")
+        .add(2, "2")
+        .withSv2()
+        .build());
+
+    tester.addOutput(fixture.rowSetBuilder(schema)
+        .add(null, "null")
+        .add(null, "null")
+        .add(4, "4")
+        .add(3, "3")
+        .add(2, "2")
+        .add(1, "1")
+        .build());
+
+    tester.run();
+  }
+
+  public static void runTypeTest(OperatorFixture fixture, MinorType type) throws Exception {
+    BatchSchema schema = SortTestUtilities.makeSchema(type, false);
+
+    CopierTester tester = new CopierTester(fixture);
+    tester.addInput(makeDataSet(schema, 0, 2, 5).toIndirect());
+    tester.addInput(makeDataSet(schema, 1, 2, 5).toIndirect());
+
+    tester.addOutput(makeDataSet(schema, 0, 1, 10));
+
+    tester.run();
+  }
+
+  @Test
+  public void testTypes() throws Exception {
+    testAllTypes(fixture);
+  }
+
+  public static void testAllTypes(OperatorFixture fixture) throws Exception {
+    runTypeTest(fixture, MinorType.INT);
+    runTypeTest(fixture, MinorType.BIGINT);
+    runTypeTest(fixture, MinorType.FLOAT4);
+    runTypeTest(fixture, MinorType.FLOAT8);
+    runTypeTest(fixture, MinorType.DECIMAL9);
+    runTypeTest(fixture, MinorType.DECIMAL18);
+    runTypeTest(fixture, MinorType.VARCHAR);
+    runTypeTest(fixture, MinorType.VARBINARY);
+    runTypeTest(fixture, MinorType.DATE);
+    runTypeTest(fixture, MinorType.TIME);
+    runTypeTest(fixture, MinorType.TIMESTAMP);
+    runTypeTest(fixture, MinorType.INTERVAL);
+    runTypeTest(fixture, MinorType.INTERVALDAY);
+    runTypeTest(fixture, MinorType.INTERVALYEAR);
+
+    // Others not tested. See DRILL-5329
+  }
+
+  @Test
+  public void testMapType() throws Exception {
+    testMapType(fixture);
+  }
+
+  public void testMapType(OperatorFixture fixture) throws Exception {
+    BatchSchema schema = new SchemaBuilder()
+        .add("key", MinorType.INT)
+        .addMap("m1")
+          .add("b", MinorType.INT)
+          .addMap("m2")
+            .add("c", MinorType.INT)
+            .buildMap()
+          .buildMap()
+        .build();
+
+    CopierTester tester = new CopierTester(fixture);
+    tester.addInput(fixture.rowSetBuilder(schema)
+        .add(1, 10, 100)
+        .add(5, 50, 500)
+        .withSv2()
+        .build());
+
+    tester.addInput(fixture.rowSetBuilder(schema)
+        .add(2, 20, 200)
+        .add(6, 60, 600)
+        .withSv2()
+        .build());
+
+    tester.addOutput(fixture.rowSetBuilder(schema)
+        .add(1, 10, 100)
+        .add(2, 20, 200)
+        .add(5, 50, 500)
+        .add(6, 60, 600)
+        .build());
+
+    tester.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortExec.java
new file mode 100644
index 0000000..f5858a3
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortExec.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.config.ExternalSort;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.test.DrillTest;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestExternalSortExec extends DrillTest {
+
+  @Test
+  public void testFieldReference() {
+    // Misnomer: the reference must be unquoted.
+    FieldReference expr = FieldReference.getWithQuotedRef("foo");
+    assertEquals(Types.LATE_BIND_TYPE, expr.getMajorType());
+    assertTrue(expr.isSimplePath());
+    assertEquals("foo", expr.getRootSegment().getPath());
+    assertEquals("`foo`", expr.toExpr());
+  }
+
+  @Test
+  public void testOrdering() {
+    assertEquals(Direction.ASCENDING, Ordering.getOrderingSpecFromString(null));
+    assertEquals(Direction.ASCENDING, Ordering.getOrderingSpecFromString(Ordering.ORDER_ASC));
+    assertEquals(Direction.DESCENDING, Ordering.getOrderingSpecFromString(Ordering.ORDER_DESC));
+    assertEquals(Direction.ASCENDING, Ordering.getOrderingSpecFromString(Ordering.ORDER_ASCENDING));
+    assertEquals(Direction.DESCENDING, Ordering.getOrderingSpecFromString(Ordering.ORDER_DESCENDING));
+    assertEquals(Direction.ASCENDING, Ordering.getOrderingSpecFromString(Ordering.ORDER_ASC.toLowerCase()));
+    assertEquals(Direction.DESCENDING, Ordering.getOrderingSpecFromString(Ordering.ORDER_DESC.toLowerCase()));
+    assertEquals(Direction.ASCENDING, Ordering.getOrderingSpecFromString(Ordering.ORDER_ASCENDING.toLowerCase()));
+    assertEquals(Direction.DESCENDING, Ordering.getOrderingSpecFromString(Ordering.ORDER_DESCENDING.toLowerCase()));
+    try {
+      Ordering.getOrderingSpecFromString("");
+      fail();
+    } catch(DrillRuntimeException e) { }
+    try {
+      Ordering.getOrderingSpecFromString("foo");
+      fail();
+    } catch(DrillRuntimeException e) { }
+
+    assertEquals(NullDirection.UNSPECIFIED, Ordering.getNullOrderingFromString(null));
+    assertEquals(NullDirection.FIRST, Ordering.getNullOrderingFromString(Ordering.NULLS_FIRST));
+    assertEquals(NullDirection.LAST, Ordering.getNullOrderingFromString(Ordering.NULLS_LAST));
+    assertEquals(NullDirection.UNSPECIFIED, Ordering.getNullOrderingFromString(Ordering.NULLS_UNSPECIFIED));
+    assertEquals(NullDirection.FIRST, Ordering.getNullOrderingFromString(Ordering.NULLS_FIRST.toLowerCase()));
+    assertEquals(NullDirection.LAST, Ordering.getNullOrderingFromString(Ordering.NULLS_LAST.toLowerCase()));
+    assertEquals(NullDirection.UNSPECIFIED, Ordering.getNullOrderingFromString(Ordering.NULLS_UNSPECIFIED.toLowerCase()));
+    try {
+      Ordering.getNullOrderingFromString("");
+      fail();
+    } catch(DrillRuntimeException e) { }
+    try {
+      Ordering.getNullOrderingFromString("foo");
+      fail();
+    } catch(DrillRuntimeException e) { }
+
+    FieldReference expr = FieldReference.getWithQuotedRef("foo");
+
+    // Test all getters
+
+    Ordering ordering = new Ordering((String) null, expr, (String) null);
+    assertEquals(Direction.ASCENDING, ordering.getDirection());
+    assertEquals(NullDirection.UNSPECIFIED, ordering.getNullDirection());
+    assertSame(expr, ordering.getExpr());
+    assertTrue(ordering.nullsSortHigh());
+
+    // Test all ordering strings
+
+    ordering = new Ordering((String) Ordering.ORDER_ASC, expr, (String) null);
+    assertEquals(Direction.ASCENDING, ordering.getDirection());
+    assertEquals(NullDirection.UNSPECIFIED, ordering.getNullDirection());
+
+    ordering = new Ordering((String) Ordering.ORDER_ASC.toLowerCase(), expr, (String) null);
+    assertEquals(Direction.ASCENDING, ordering.getDirection());
+    assertEquals(NullDirection.UNSPECIFIED, ordering.getNullDirection());
+
+    ordering = new Ordering((String) Ordering.ORDER_ASCENDING, expr, (String) null);
+    assertEquals(Direction.ASCENDING, ordering.getDirection());
+    assertEquals(NullDirection.UNSPECIFIED, ordering.getNullDirection());
+
+    ordering = new Ordering((String) Ordering.ORDER_DESC, expr, (String) null);
+    assertEquals(Direction.DESCENDING, ordering.getDirection());
+    assertEquals(NullDirection.UNSPECIFIED, ordering.getNullDirection());
+
+    ordering = new Ordering((String) Ordering.ORDER_DESCENDING, expr, (String) null);
+    assertEquals(Direction.DESCENDING, ordering.getDirection());
+    assertEquals(NullDirection.UNSPECIFIED, ordering.getNullDirection());
+
+    // Test all null ordering strings
+
+    ordering = new Ordering((String) null, expr, Ordering.NULLS_FIRST);
+    assertEquals(Direction.ASCENDING, ordering.getDirection());
+    assertEquals(NullDirection.FIRST, ordering.getNullDirection());
+    assertFalse(ordering.nullsSortHigh());
+
+    ordering = new Ordering((String) null, expr, Ordering.NULLS_FIRST);
+    assertEquals(Direction.ASCENDING, ordering.getDirection());
+    assertEquals(NullDirection.FIRST, ordering.getNullDirection());
+    assertFalse(ordering.nullsSortHigh());
+
+    ordering = new Ordering((String) null, expr, Ordering.NULLS_LAST);
+    assertEquals(Direction.ASCENDING, ordering.getDirection());
+    assertEquals(NullDirection.LAST, ordering.getNullDirection());
+    assertTrue(ordering.nullsSortHigh());
+
+    ordering = new Ordering((String) null, expr, Ordering.NULLS_UNSPECIFIED);
+    assertEquals(Direction.ASCENDING, ordering.getDirection());
+    assertEquals(NullDirection.UNSPECIFIED, ordering.getNullDirection());
+    assertTrue(ordering.nullsSortHigh());
+
+    // Unspecified order is always nulls high
+
+    ordering = new Ordering(Ordering.ORDER_DESC, expr, Ordering.NULLS_UNSPECIFIED);
+    assertEquals(Direction.DESCENDING, ordering.getDirection());
+    assertEquals(NullDirection.UNSPECIFIED, ordering.getNullDirection());
+    assertTrue(ordering.nullsSortHigh());
+
+    // Null sort direction reverses with a Desc sort.
+
+    ordering = new Ordering(Ordering.ORDER_DESC, expr, Ordering.NULLS_FIRST);
+    assertEquals(Direction.DESCENDING, ordering.getDirection());
+    assertEquals(NullDirection.FIRST, ordering.getNullDirection());
+    assertTrue(ordering.nullsSortHigh());
+
+    ordering = new Ordering(Ordering.ORDER_DESC, expr, Ordering.NULLS_LAST);
+    assertEquals(Direction.DESCENDING, ordering.getDirection());
+    assertEquals(NullDirection.LAST, ordering.getNullDirection());
+    assertFalse(ordering.nullsSortHigh());
+  }
+
+  @Test
+  public void testSortSpec() {
+    FieldReference expr = FieldReference.getWithQuotedRef("foo");
+    Ordering ordering = new Ordering(Ordering.ORDER_ASC, expr, Ordering.NULLS_FIRST);
+
+    // Basics
+
+    ExternalSort popConfig = new ExternalSort(null, Lists.newArrayList(ordering), false);
+    assertSame(ordering, popConfig.getOrderings().get(0));
+    assertFalse(popConfig.getReverse());
+    assertEquals(SelectionVectorMode.FOUR_BYTE, popConfig.getSVMode());
+    assertEquals(CoreOperatorType.EXTERNAL_SORT_VALUE, popConfig.getOperatorType());
+    assertEquals(ExternalSort.DEFAULT_SORT_ALLOCATION, popConfig.getInitialAllocation());
+    assertEquals(AbstractBase.MAX_ALLOCATION, popConfig.getMaxAllocation());
+    assertTrue(popConfig.isExecutable());
+
+    // Non-default settings
+
+    popConfig = new ExternalSort(null, Lists.newArrayList(ordering), true);
+    assertTrue(popConfig.getReverse());
+    long maxAlloc = 50_000_000;
+    popConfig.setMaxAllocation(maxAlloc);
+    assertEquals(ExternalSort.DEFAULT_SORT_ALLOCATION, popConfig.getInitialAllocation());
+    assertEquals(maxAlloc, popConfig.getMaxAllocation());
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
new file mode 100644
index 0000000..6bff088
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeAction;
+import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeTask;
+import org.apache.drill.test.ConfigBuilder;
+import org.apache.drill.test.DrillTest;
+import org.apache.drill.test.OperatorFixture;
+import org.junit.Test;
+
+public class TestExternalSortInternals extends DrillTest {
+
+  private static final int ONE_MEG = 1024 * 1024;
+
+  /**
+   * Verify defaults configured in drill-override.conf.
+   */
+  @Test
+  public void testConfigDefaults() {
+    DrillConfig drillConfig = DrillConfig.create();
+    SortConfig sortConfig = new SortConfig(drillConfig);
+    // Zero means no artificial limit
+    assertEquals(0, sortConfig.maxMemory());
+    // Zero mapped to large number
+    assertEquals(Integer.MAX_VALUE, sortConfig.mergeLimit());
+    // Default size: 256 MiB
+    assertEquals(256 * ONE_MEG, sortConfig.spillFileSize());
+    // Default size: 8 MiB
+    assertEquals(8 * ONE_MEG, sortConfig.spillBatchSize());
+    // Default size: 16 MiB
+    assertEquals(16 * ONE_MEG, sortConfig.mergeBatchSize());
+    // Default: unlimited
+    assertEquals(Integer.MAX_VALUE, sortConfig.getBufferedBatchLimit());
+  }
+
+  /**
+   * Verify that the various constants do, in fact, map to the
+   * expected properties, and that the properties are overridden.
+   */
+  @Test
+  public void testConfigOverride() {
+    // Verify the various HOCON ways of setting memory
+    DrillConfig drillConfig = new ConfigBuilder()
+        .put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, "2000K")
+        .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, 10)
+        .put(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE, "10M")
+        .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, 500_000)
+        .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, 600_000)
+        .put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 50)
+        .build();
+    SortConfig sortConfig = new SortConfig(drillConfig);
+    assertEquals(2000 * 1024, sortConfig.maxMemory());
+    assertEquals(10, sortConfig.mergeLimit());
+    assertEquals(10 * ONE_MEG, sortConfig.spillFileSize());
+    assertEquals(500_000, sortConfig.spillBatchSize());
+    assertEquals(600_000, sortConfig.mergeBatchSize());
+    assertEquals(50, sortConfig.getBufferedBatchLimit());
+  }
+
+  /**
+   * Some properties have hard-coded limits. Verify these limits.
+   */
+  @Test
+  public void testConfigLimits() {
+    DrillConfig drillConfig = new ConfigBuilder()
+        .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, SortConfig.MIN_MERGE_LIMIT - 1)
+        .put(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE, SortConfig.MIN_SPILL_FILE_SIZE - 1)
+        .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, SortConfig.MIN_SPILL_BATCH_SIZE - 1)
+        .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, SortConfig.MIN_MERGE_BATCH_SIZE - 1)
+        .put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 1)
+        .build();
+    SortConfig sortConfig = new SortConfig(drillConfig);
+    assertEquals(SortConfig.MIN_MERGE_LIMIT, sortConfig.mergeLimit());
+    assertEquals(SortConfig.MIN_SPILL_FILE_SIZE, sortConfig.spillFileSize());
+    assertEquals(SortConfig.MIN_SPILL_BATCH_SIZE, sortConfig.spillBatchSize());
+    assertEquals(SortConfig.MIN_MERGE_BATCH_SIZE, sortConfig.mergeBatchSize());
+    assertEquals(2, sortConfig.getBufferedBatchLimit());
+  }
+
+  @Test
+  public void testMemoryManagerBasics() {
+    DrillConfig drillConfig = DrillConfig.create();
+    SortConfig sortConfig = new SortConfig(drillConfig);
+    long memoryLimit = 50 * ONE_MEG;
+    SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
+
+    // Basic setup
+
+    assertEquals(sortConfig.spillBatchSize(), memManager.getPreferredSpillBatchSize());
+    assertEquals(sortConfig.mergeBatchSize(), memManager.getPreferredMergeBatchSize());
+    assertEquals(memoryLimit, memManager.getMemoryLimit());
+
+    // Nice simple batch: 6 MB in size, 300 byte rows, vectors half full
+    // so 10000 rows. Sizes chosen so that spill and merge batch record
+    // stay below the limit of 64K.
+
+    int rowWidth = 300;
+    int rowCount = 10000;
+    int batchSize = rowWidth * rowCount * 2;
+
+    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);
+
+    // Zero rows - no update
+
+    memManager.updateEstimates(batchSize, rowWidth, 0);
+    assertEquals(rowWidth, memManager.getRowWidth());
+    assertEquals(batchSize, memManager.getInputBatchSize());
+
+    // Larger batch size, update batch size
+
+    rowCount = 20000;
+    batchSize = rowWidth * rowCount * 2;
+    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);
+
+    // Smaller batch size: no change
+
+    rowCount = 5000;
+    int lowBatchSize = rowWidth * rowCount * 2;
+    memManager.updateEstimates(lowBatchSize, rowWidth, rowCount);
+    assertEquals(rowWidth, memManager.getRowWidth());
+    assertEquals(batchSize, memManager.getInputBatchSize());
+
+    // Different batch density, update batch size
+
+    rowCount = 10000;
+    batchSize = rowWidth * rowCount * 5;
+    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);
+
+    // Smaller row size, no update
+
+    int lowRowWidth = 200;
+    rowCount = 10000;
+    lowBatchSize = rowWidth * rowCount * 2;
+    memManager.updateEstimates(lowBatchSize, lowRowWidth, rowCount);
+    assertEquals(rowWidth, memManager.getRowWidth());
+    assertEquals(batchSize, memManager.getInputBatchSize());
+
+    // Larger row size, updates calcs
+
+    rowWidth = 400;
+    rowCount = 10000;
+    lowBatchSize = rowWidth * rowCount * 2;
+    memManager.updateEstimates(lowBatchSize, rowWidth, rowCount);
+    verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);
+
+    // EOF: very low density
+
+    memManager.updateEstimates(lowBatchSize, rowWidth, 5);
+    assertEquals(rowWidth, memManager.getRowWidth());
+    assertEquals(batchSize, memManager.getInputBatchSize());
+  }
+
+  private void verifyCalcs(SortConfig sortConfig, long memoryLimit, SortMemoryManager memManager, int batchSize,
+      int rowWidth, int rowCount) {
+
+    assertFalse(memManager.mayOverflow());
+
+    // Row and batch sizes should be exact
+
+    assertEquals(rowWidth, memManager.getRowWidth());
+    assertEquals(batchSize, memManager.getInputBatchSize());
+
+    // Spill sizes will be rounded, but within reason.
+
+    int count = sortConfig.spillBatchSize() / rowWidth;
+    assertTrue(count >= memManager.getSpillBatchRowCount());
+    assertTrue(count/2 <= memManager.getSpillBatchRowCount());
+    int spillSize = memManager.getSpillBatchRowCount() * rowWidth;
+    assertTrue(spillSize <= memManager.getSpillBatchSize());
+    assertTrue(spillSize >= memManager.getSpillBatchSize()/2);
+    assertEquals(memoryLimit - memManager.getSpillBatchSize(), memManager.getBufferMemoryLimit());
+
+    // Merge sizes will also be rounded, within reason.
+
+    count = sortConfig.mergeBatchSize() / rowWidth;
+    assertTrue(count >= memManager.getMergeBatchRowCount());
+    assertTrue(count/2 <= memManager.getMergeBatchRowCount());
+    int mergeSize = memManager.getMergeBatchRowCount() * rowWidth;
+    assertTrue(mergeSize <= memManager.getMergeBatchSize());
+    assertTrue(mergeSize >= memManager.getMergeBatchSize()/2);
+    assertEquals(memoryLimit - memManager.getMergeBatchSize(), memManager.getMergeMemoryLimit());
+  }
+
+  @Test
+  public void testSmallRows() {
+    DrillConfig drillConfig = DrillConfig.create();
+    SortConfig sortConfig = new SortConfig(drillConfig);
+    long memoryLimit = 100 * ONE_MEG;
+    SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
+
+    // Zero-length row, round to 10
+
+    int rowWidth = 0;
+    int rowCount = 10000;
+    int batchSize = rowCount * 2;
+    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    assertEquals(10, memManager.getRowWidth());
+    assertEquals(batchSize, memManager.getInputBatchSize());
+
+    // Truncate spill, merge batch row count
+
+    assertEquals(Character.MAX_VALUE, memManager.getSpillBatchRowCount());
+    assertEquals(Character.MAX_VALUE, memManager.getMergeBatchRowCount());
+
+    // But leave batch sizes at their defaults
+
+    assertEquals(sortConfig.spillBatchSize(), memManager.getPreferredSpillBatchSize());
+    assertEquals(sortConfig.mergeBatchSize(), memManager.getPreferredMergeBatchSize());
+
+    // Small, but non-zero, row
+
+    rowWidth = 20;
+    rowCount = 10000;
+    batchSize = rowWidth * rowCount;
+    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    assertEquals(rowWidth, memManager.getRowWidth());
+    assertEquals(batchSize, memManager.getInputBatchSize());
+
+    // Truncate spill, merge batch row count
+
+    assertEquals(Character.MAX_VALUE, memManager.getSpillBatchRowCount());
+    assertEquals(Character.MAX_VALUE, memManager.getMergeBatchRowCount());
+
+    // But leave batch sizes at their defaults
+
+    assertEquals(sortConfig.spillBatchSize(), memManager.getPreferredSpillBatchSize());
+    assertEquals(sortConfig.mergeBatchSize(), memManager.getPreferredMergeBatchSize());
+  }
+
+  @Test
+  public void testLowMemory() {
+    DrillConfig drillConfig = DrillConfig.create();
+    SortConfig sortConfig = new SortConfig(drillConfig);
+    long memoryLimit = 10 * ONE_MEG;
+    SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
+
+    // Tight squeeze, but can be made to work.
+    // Input batches are a quarter of memory.
+
+    int rowWidth = 1000;
+    int rowCount = (int) (memoryLimit / 4 / rowWidth);
+    int batchSize = rowCount * rowWidth;
+    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    assertEquals(rowWidth, memManager.getRowWidth());
+    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertFalse(memManager.mayOverflow());
+
+    // Spill, merge batches should be constrained
+
+    int spillBatchSize = memManager.getSpillBatchSize();
+    assertTrue(spillBatchSize < memManager.getPreferredSpillBatchSize());
+    assertTrue(spillBatchSize >= rowWidth);
+    assertTrue(spillBatchSize <= memoryLimit / 3);
+    assertTrue(spillBatchSize + 2 * batchSize <= memoryLimit);
+    assertTrue(spillBatchSize / rowWidth >= memManager.getSpillBatchRowCount());
+
+    int mergeBatchSize = memManager.getMergeBatchSize();
+    assertTrue(mergeBatchSize < memManager.getPreferredMergeBatchSize());
+    assertTrue(mergeBatchSize >= rowWidth);
+    assertTrue(mergeBatchSize + 2 * spillBatchSize <= memoryLimit);
+    assertTrue(mergeBatchSize / rowWidth >= memManager.getMergeBatchRowCount());
+
+    // Should spill after just two batches
+
+    assertFalse(memManager.isSpillNeeded(0, batchSize));
+    assertFalse(memManager.isSpillNeeded(batchSize, batchSize));
+    assertTrue(memManager.isSpillNeeded(2 * batchSize, batchSize));
+
+    // Tighter squeeze, but can be made to work.
+    // Input batches are 3/8 of memory; two fill 3/4,
+    // but small spill and merge batches allow progress.
+
+    rowWidth = 1000;
+    rowCount = (int) (memoryLimit * 3 / 8 / rowWidth);
+    batchSize = rowCount * rowWidth;
+    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    assertEquals(rowWidth, memManager.getRowWidth());
+    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertFalse(memManager.mayOverflow());
+
+    // Spill, merge batches should be constrained
+
+    spillBatchSize = memManager.getSpillBatchSize();
+    assertTrue(spillBatchSize < memManager.getPreferredSpillBatchSize());
+    assertTrue(spillBatchSize >= rowWidth);
+    assertTrue(spillBatchSize <= memoryLimit / 3);
+    assertTrue(spillBatchSize + 2 * batchSize <= memoryLimit);
+    assertTrue(memManager.getSpillBatchRowCount() > 1);
+    assertTrue(spillBatchSize / rowWidth >= memManager.getSpillBatchRowCount());
+
+    mergeBatchSize = memManager.getMergeBatchSize();
+    assertTrue(mergeBatchSize < memManager.getPreferredMergeBatchSize());
+    assertTrue(mergeBatchSize >= rowWidth);
+    assertTrue(mergeBatchSize + 2 * spillBatchSize <= memoryLimit);
+    assertTrue(memManager.getMergeBatchRowCount() > 1);
+    assertTrue(mergeBatchSize / rowWidth >= memManager.getMergeBatchRowCount());
+  }
+
+  @Test
+  public void testExtremeLowMemory() {
+    DrillConfig drillConfig = DrillConfig.create();
+    SortConfig sortConfig = new SortConfig(drillConfig);
+    long memoryLimit = 10 * ONE_MEG;
+    SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
+
+    // Jumbo row size, works with one row per batch. Minimum is to have two
+    // input rows and a spill row, or two spill rows and a merge row.
+    // Have to back off the exact size a bit to allow for internal fragmentation
+    // in the merge and output batches.
+
+    int rowWidth = (int) (memoryLimit / 3 * 0.75);
+    int rowCount = 1;
+    int batchSize = rowWidth;
+    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    assertEquals(rowWidth, memManager.getRowWidth());
+    assertEquals(batchSize, memManager.getInputBatchSize());
+    assertFalse(memManager.mayOverflow());
+
+    int spillBatchSize = memManager.getSpillBatchSize();
+    assertTrue(spillBatchSize >= rowWidth);
+    assertTrue(spillBatchSize <= memoryLimit / 3);
+    assertTrue(spillBatchSize + 2 * batchSize <= memoryLimit);
+    assertEquals(1, memManager.getSpillBatchRowCount());
+
+    int mergeBatchSize = memManager.getMergeBatchSize();
+    assertTrue(mergeBatchSize >= rowWidth);
+    assertTrue(mergeBatchSize + 2 * spillBatchSize <= memoryLimit);
+    assertEquals(1, memManager.getMergeBatchRowCount());
+
+    // Should spill after just two rows
+
+    assertFalse(memManager.isSpillNeeded(0, batchSize));
+    assertFalse(memManager.isSpillNeeded(batchSize, batchSize));
+    assertTrue(memManager.isSpillNeeded(2 * batchSize, batchSize));
+
+    // In trouble now, can't fit even three rows.
+
+    rowWidth = (int) (memoryLimit / 2);
+    rowCount = 1;
+    batchSize = rowWidth;
+    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    assertTrue(memManager.mayOverflow());
+  }
+
+  @Test
+  public void testConfigConstraints() {
+    int memConstaint = 40 * ONE_MEG;
+    int batchSizeConstaint = ONE_MEG / 2;
+    int mergeSizeConstaint = ONE_MEG;
+    DrillConfig drillConfig = new ConfigBuilder()
+        .put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, memConstaint)
+        .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, batchSizeConstaint)
+        .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, mergeSizeConstaint)
+        .build();
+    SortConfig sortConfig = new SortConfig(drillConfig);
+    long memoryLimit = 50 * ONE_MEG;
+    SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
+
+    assertEquals(batchSizeConstaint, memManager.getPreferredSpillBatchSize());
+    assertEquals(mergeSizeConstaint, memManager.getPreferredMergeBatchSize());
+    assertEquals(memConstaint, memManager.getMemoryLimit());
+
+    int rowWidth = 300;
+    int rowCount = 10000;
+    int batchSize = rowWidth * rowCount * 2;
+
+    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    verifyCalcs(sortConfig, memConstaint, memManager, batchSize, rowWidth, rowCount);
+  }
+
+  @Test
+  public void testMemoryDynamics() {
+    DrillConfig drillConfig = DrillConfig.create();
+    SortConfig sortConfig = new SortConfig(drillConfig);
+    long memoryLimit = 50 * ONE_MEG;
+    SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
+
+    int rowWidth = 300;
+    int rowCount = 10000;
+    int batchSize = rowWidth * rowCount * 2;
+
+    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+
+    int spillBatchSize = memManager.getSpillBatchSize();
+
+    // Test various memory fill levels
+
+    assertFalse(memManager.isSpillNeeded(0, batchSize));
+    assertFalse(memManager.isSpillNeeded(2 * batchSize, batchSize));
+    assertTrue(memManager.isSpillNeeded(memoryLimit - spillBatchSize + 1, batchSize));
+
+    // Similar, but for an in-memory merge
+
+    assertTrue(memManager.hasMemoryMergeCapacity(memoryLimit - ONE_MEG, ONE_MEG - 1));
+    assertTrue(memManager.hasMemoryMergeCapacity(memoryLimit - ONE_MEG, ONE_MEG));
+    assertFalse(memManager.hasMemoryMergeCapacity(memoryLimit - ONE_MEG, ONE_MEG + 1));
+  }
+
+  @Test
+  public void testMergeCalcs() {
+
+    // No artificial merge limit
+
+    int mergeLimitConstraint = 100;
+    DrillConfig drillConfig = new ConfigBuilder()
+        .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, mergeLimitConstraint)
+        .build();
+    SortConfig sortConfig = new SortConfig(drillConfig);
+    // Allow four spill batches, 8 MB each, plus one output of 16
+    long memoryLimit = 50 * ONE_MEG;
+    SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
+
+    // Prime the estimates
+
+    int rowWidth = 300;
+    int rowCount = 10000;
+    int batchSize = rowWidth * rowCount * 2;
+
+    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+    int spillBatchSize = memManager.getSpillBatchSize();
+    int mergeBatchSize = memManager.getMergeBatchSize();
+
+    // One in-mem batch, no merging.
+
+    long allocMemory = memoryLimit - mergeBatchSize;
+    MergeTask task = memManager.consolidateBatches(allocMemory, 1, 0);
+    assertEquals(MergeAction.NONE, task.action);
+
+    // Many in-mem batches, just enough to merge
+
+    allocMemory = memoryLimit - mergeBatchSize;
+    int memBatches = (int) (allocMemory / batchSize);
+    allocMemory = memBatches * batchSize;
+    task = memManager.consolidateBatches(allocMemory, memBatches, 0);
+    assertEquals(MergeAction.NONE, task.action);
+
+    // Spills if no room for spill and in-memory batches
+
+    task = memManager.consolidateBatches(allocMemory, memBatches, 1);
+    assertEquals(MergeAction.SPILL, task.action);
+
+    // One more in-mem batch: now needs to spill
+
+    memBatches++;
+    allocMemory = memBatches * batchSize;
+    task = memManager.consolidateBatches(allocMemory, memBatches, 0);
+    assertEquals(MergeAction.SPILL, task.action);
+
+    // No spill for various in-mem/spill run combinations
+
+    allocMemory = memoryLimit - spillBatchSize - mergeBatchSize;
+    memBatches = (int) (allocMemory / batchSize);
+    allocMemory = memBatches * batchSize;
+    task = memManager.consolidateBatches(allocMemory, memBatches, 1);
+    assertEquals(MergeAction.NONE, task.action);
+
+    allocMemory = memoryLimit - 2 * spillBatchSize - mergeBatchSize;
+    memBatches = (int) (allocMemory / batchSize);
+    allocMemory = memBatches * batchSize;
+    task = memManager.consolidateBatches(allocMemory, memBatches, 2);
+    assertEquals(MergeAction.NONE, task.action);
+
+    // No spill if no in-memory, only spill, and spill fits
+
+    long freeMem = memoryLimit - mergeBatchSize;
+    int spillBatches = (int) (freeMem / spillBatchSize);
+    task = memManager.consolidateBatches(0, 0, spillBatches);
+    assertEquals(MergeAction.NONE, task.action);
+
+    // One more and must merge
+
+    task = memManager.consolidateBatches(0, 0, spillBatches + 1);
+    assertEquals(MergeAction.MERGE, task.action);
+    assertEquals(2, task.count);
+
+    // Two more and will merge more
+
+    task = memManager.consolidateBatches(0, 0, spillBatches + 2);
+    assertEquals(MergeAction.MERGE, task.action);
+    assertEquals(3, task.count);
+  }
+
+  @Test
+  public void testMergeLimit() {
+    // Constrain merge width
+    int mergeLimitConstraint = 5;
+    DrillConfig drillConfig = new ConfigBuilder()
+        .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, mergeLimitConstraint)
+        .build();
+    SortConfig sortConfig = new SortConfig(drillConfig);
+    // Plenty of memory, memory will not be a limit
+    long memoryLimit = 400 * ONE_MEG;
+    SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
+
+    // Prime the estimates
+
+    int rowWidth = 300;
+    int rowCount = 10000;
+    int batchSize = rowWidth * rowCount * 2;
+
+    memManager.updateEstimates(batchSize, rowWidth, rowCount);
+
+    // Pretend merge limit runs, additional in-memory batches
+
+    int memBatchCount = 10;
+    int spillRunCount = mergeLimitConstraint;
+    long allocMemory = batchSize * memBatchCount;
+    MergeTask task = memManager.consolidateBatches(allocMemory, memBatchCount, spillRunCount);
+    assertEquals(MergeAction.NONE, task.action);
+
+    // One more run than can merge in one go. But, we have plenty of
+    // memory to merge and hold the in-memory batches. So, just merge.
+
+    task = memManager.consolidateBatches(allocMemory, memBatchCount, spillRunCount + 1);
+    assertEquals(MergeAction.MERGE, task.action);
+    assertEquals(2, task.count);
+
+    // One more runs than can merge in one go, intermediate merge
+
+    task = memManager.consolidateBatches(0, 0, spillRunCount + 1);
+    assertEquals(MergeAction.MERGE, task.action);
+    assertEquals(2, task.count);
+
+    // Two more spill runs, merge three
+
+    task = memManager.consolidateBatches(0, 0, spillRunCount + 2);
+    assertEquals(MergeAction.MERGE, task.action);
+    assertEquals(3, task.count);
+
+    // Way more than can merge, limit to the constraint
+
+    task = memManager.consolidateBatches(0, 0, spillRunCount * 3);
+    assertEquals(MergeAction.MERGE, task.action);
+    assertEquals(mergeLimitConstraint, task.count);
+  }
+
+  @Test
+  public void testMetrics() {
+    OperatorFixture.MockStats stats = new OperatorFixture.MockStats();
+    SortMetrics metrics = new SortMetrics(stats);
+
+    // Input stats
+
+    metrics.updateInputMetrics(100, 10_000);
+    assertEquals(1, metrics.getInputBatchCount());
+    assertEquals(100, metrics.getInputRowCount());
+    assertEquals(10_000, metrics.getInputBytes());
+
+    metrics.updateInputMetrics(200, 20_000);
+    assertEquals(2, metrics.getInputBatchCount());
+    assertEquals(300, metrics.getInputRowCount());
+    assertEquals(30_000, metrics.getInputBytes());
+
+    // Buffer memory
+
+    assertEquals(0D, stats.getStat(ExternalSortBatch.Metric.MIN_BUFFER), 0.01);
+
+    metrics.updateMemory(1_000_000);
+    assertEquals(1_000_000D, stats.getStat(ExternalSortBatch.Metric.MIN_BUFFER), 0.01);
+
+    metrics.updateMemory(2_000_000);
+    assertEquals(1_000_000D, stats.getStat(ExternalSortBatch.Metric.MIN_BUFFER), 0.01);
+
+    metrics.updateMemory(100_000);
+    assertEquals(100_000D, stats.getStat(ExternalSortBatch.Metric.MIN_BUFFER), 0.01);
+
+    // Peak batches
+
+    assertEquals(0D, stats.getStat(ExternalSortBatch.Metric.PEAK_BATCHES_IN_MEMORY), 0.01);
+
+    metrics.updatePeakBatches(10);
+    assertEquals(10D, stats.getStat(ExternalSortBatch.Metric.PEAK_BATCHES_IN_MEMORY), 0.01);
+
+    metrics.updatePeakBatches(1);
+    assertEquals(10D, stats.getStat(ExternalSortBatch.Metric.PEAK_BATCHES_IN_MEMORY), 0.01);
+
+    metrics.updatePeakBatches(20);
+    assertEquals(20D, stats.getStat(ExternalSortBatch.Metric.PEAK_BATCHES_IN_MEMORY), 0.01);
+
+    // Merge count
+
+    assertEquals(0D, stats.getStat(ExternalSortBatch.Metric.MERGE_COUNT), 0.01);
+
+    metrics.incrMergeCount();
+    assertEquals(1D, stats.getStat(ExternalSortBatch.Metric.MERGE_COUNT), 0.01);
+
+    metrics.incrMergeCount();
+    assertEquals(2D, stats.getStat(ExternalSortBatch.Metric.MERGE_COUNT), 0.01);
+
+    // Spill count
+
+    assertEquals(0D, stats.getStat(ExternalSortBatch.Metric.SPILL_COUNT), 0.01);
+
+    metrics.incrSpillCount();
+    assertEquals(1D, stats.getStat(ExternalSortBatch.Metric.SPILL_COUNT), 0.01);
+
+    metrics.incrSpillCount();
+    assertEquals(2D, stats.getStat(ExternalSortBatch.Metric.SPILL_COUNT), 0.01);
+
+    // Write bytes
+
+    assertEquals(0D, stats.getStat(ExternalSortBatch.Metric.SPILL_MB), 0.01);
+
+    metrics.updateWriteBytes(17 * ONE_MEG + ONE_MEG * 3 / 4);
+    assertEquals(17.75D, stats.getStat(ExternalSortBatch.Metric.SPILL_MB), 0.001);
+  }
+}