You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2018/01/24 18:35:23 UTC

[09/11] drill git commit: DRILL-6049: Misc. hygiene and code cleanup changes

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java
index c99f0a7..8b84995 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,66 +17,56 @@
  */
 package org.apache.drill.exec;
 
-import static org.junit.Assert.*;
-import io.netty.buffer.DrillBuf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 import java.nio.file.Paths;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
 
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetReader;
 import org.apache.drill.categories.PlannerTest;
 import org.apache.drill.categories.SlowTest;
-import org.apache.drill.common.DrillAutoCloseables;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.client.PrintingResultsListener;
-import org.apache.drill.exec.client.QuerySubmitter.Format;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.UserBitShared.QueryData;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
-import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.ConnectionThrottle;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.user.AwaitableUserResultsListener;
-import org.apache.drill.exec.rpc.user.QueryDataBatch;
-import org.apache.drill.exec.rpc.user.UserResultsListener;
-import org.apache.drill.exec.util.VectorUtil;
-import org.apache.drill.exec.vector.ValueVector;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.junit.experimental.categories.Category;
 
 /**
- * Class to test different planning use cases (separate form query execution)
+ * Class to test different planning use cases (separate from query execution)
  *
  */
 @Category({SlowTest.class, PlannerTest.class})
-public class DrillSeparatePlanningTest extends BaseTestQuery {
+public class DrillSeparatePlanningTest extends ClusterTest {
   @BeforeClass
   public static void setupFiles() {
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel", "json"));
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel", "csv"));
   }
 
-  @Test(timeout=60000)
+  @Before
+  public void testSetup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .clusterSize(2);
+    startCluster(builder);
+  }
+
+  @Test(timeout=60_000)
   public void testSingleFragmentQuery() throws Exception {
-    final String query = "SELECT * FROM cp.`employee.json` where  employee_id > 1 and  employee_id < 1000";
+    final String query = "SELECT * FROM cp.`employee.json` where employee_id > 1 and employee_id < 1000";
 
     QueryPlanFragments planFragments = getFragmentsHelper(query);
 
@@ -85,251 +75,134 @@ public class DrillSeparatePlanningTest extends BaseTestQuery {
     assertEquals(1, planFragments.getFragmentsCount());
     assertTrue(planFragments.getFragments(0).getLeafFragment());
 
-    getResultsHelper(planFragments);
+    QuerySummary summary = client.queryBuilder().plan(planFragments.getFragmentsList()).run();
+    assertEquals(997, summary.recordCount());
   }
 
-  @Test(timeout=60000)
+  @Test(timeout=60_000)
   public void testMultiMinorFragmentSimpleQuery() throws Exception {
     final String query = "SELECT o_orderkey FROM dfs.`multilevel/json`";
 
     QueryPlanFragments planFragments = getFragmentsHelper(query);
 
     assertNotNull(planFragments);
-
     assertTrue((planFragments.getFragmentsCount() > 1));
 
-    for ( PlanFragment planFragment : planFragments.getFragmentsList()) {
+    for (PlanFragment planFragment : planFragments.getFragmentsList()) {
       assertTrue(planFragment.getLeafFragment());
     }
 
-    getResultsHelper(planFragments);
+    int rowCount = getResultsHelper(planFragments);
+    assertEquals(120, rowCount);
   }
 
-  @Test(timeout=60000)
+  @Test(timeout=60_000)
   public void testMultiMinorFragmentComplexQuery() throws Exception {
     final String query = "SELECT dir0, sum(o_totalprice) FROM dfs.`multilevel/json` group by dir0 order by dir0";
 
     QueryPlanFragments planFragments = getFragmentsHelper(query);
 
     assertNotNull(planFragments);
-
     assertTrue((planFragments.getFragmentsCount() > 1));
 
     for ( PlanFragment planFragment : planFragments.getFragmentsList()) {
       assertTrue(planFragment.getLeafFragment());
     }
 
-    getResultsHelper(planFragments);
+    int rowCount = getResultsHelper(planFragments);
+    assertEquals(8, rowCount);
   }
 
-  @Test(timeout=60000)
+  @Test(timeout=60_000)
   public void testPlanningNoSplit() throws Exception {
     final String query = "SELECT dir0, sum(o_totalprice) FROM dfs.`multilevel/json` group by dir0 order by dir0";
 
-    updateTestCluster(2, config);
-
-    List<QueryDataBatch> results = client.runQuery(QueryType.SQL, "alter session set `planner.slice_target`=1");
-    for(QueryDataBatch batch : results) {
-      batch.release();
-    }
-
-    DrillRpcFuture<QueryPlanFragments> queryFragmentsFutures = client.planQuery(QueryType.SQL, query, false);
-
-    final QueryPlanFragments planFragments = queryFragmentsFutures.get();
-
-    assertNotNull(planFragments);
+    client.alterSession("planner.slice_target", 1);
+    try {
+      final QueryPlanFragments planFragments = client.planQuery(query);
 
-    assertTrue((planFragments.getFragmentsCount() > 1));
+      assertNotNull(planFragments);
+      assertTrue((planFragments.getFragmentsCount() > 1));
 
-    PlanFragment rootFragment = planFragments.getFragments(0);
-    assertFalse(rootFragment.getLeafFragment());
+      PlanFragment rootFragment = planFragments.getFragments(0);
+      assertFalse(rootFragment.getLeafFragment());
 
-    getCombinedResultsHelper(planFragments);
+      QuerySummary summary = client.queryBuilder().plan(planFragments.getFragmentsList()).run();
+      assertEquals(3, summary.recordCount());
+    }
+    finally {
+      client.resetSession("planner.slice_target");
+    }
   }
 
-  @Test(timeout=60000)
+  @Test(timeout=60_000)
   public void testPlanningNegative() throws Exception {
     final String query = "SELECT dir0, sum(o_totalprice) FROM dfs.`multilevel/json` group by dir0 order by dir0";
 
-    updateTestCluster(2, config);
     // LOGICAL is not supported
-    DrillRpcFuture<QueryPlanFragments> queryFragmentsFutures = client.planQuery(QueryType.LOGICAL, query, false);
-
-    final QueryPlanFragments planFragments = queryFragmentsFutures.get();
+    final QueryPlanFragments planFragments = client.planQuery(QueryType.LOGICAL, query, false);
 
     assertNotNull(planFragments);
-
     assertNotNull(planFragments.getError());
-
     assertTrue(planFragments.getFragmentsCount()==0);
-
   }
 
-  @Test(timeout=60000)
+  @Test(timeout=60_000)
   public void testPlanning() throws Exception {
     final String query = "SELECT dir0, columns[3] FROM dfs.`multilevel/csv` order by dir0";
 
-    updateTestCluster(2, config);
-
-    List<QueryDataBatch> results = client.runQuery(QueryType.SQL, "alter session set `planner.slice_target`=1");
-    for(QueryDataBatch batch : results) {
-      batch.release();
+    client.alterSession("planner.slice_target", 1);
+    try {
+      // Original version, but no reason to dump output to test results.
+//      long rows = client.queryBuilder().sql(query).print(Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH);
+      QuerySummary summary = client.queryBuilder().sql(query).run();
+      assertEquals(120, summary.recordCount());
     }
-    AwaitableUserResultsListener listener =
-        new AwaitableUserResultsListener(new PrintingResultsListener(client.getConfig(), Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH));
-    client.runQuery(QueryType.SQL, query, listener);
-    @SuppressWarnings("unused")
-    int rows = listener.await();
-  }
-
-  private QueryPlanFragments getFragmentsHelper(final String query) throws InterruptedException, ExecutionException, RpcException {
-    updateTestCluster(2, config);
-
-    List<QueryDataBatch> results = client.runQuery(QueryType.SQL, "alter session set `planner.slice_target`=1");
-    for(QueryDataBatch batch : results) {
-      batch.release();
+    finally {
+      client.resetSession("planner.slice_target");
     }
+  }
 
-    DrillRpcFuture<QueryPlanFragments> queryFragmentsFutures = client.planQuery(QueryType.SQL, query, true);
+  private QueryPlanFragments getFragmentsHelper(final String query) {
+    client.alterSession("planner.slice_target", 1);
+    try {
+      QueryPlanFragments planFragments = client.planQuery(QueryType.SQL, query, true);
 
-    final QueryPlanFragments planFragments = queryFragmentsFutures.get();
+      // Uncomment for debugging.
 
-    for (PlanFragment fragment : planFragments.getFragmentsList()) {
-      System.out.println(fragment.getFragmentJson());
+//      for (PlanFragment fragment : planFragments.getFragmentsList()) {
+//        System.out.println(fragment.getFragmentJson());
+//      }
+      return planFragments;
+    }
+    finally {
+      client.resetSession("planner.slice_target");
     }
-
-    return planFragments;
   }
 
-  private void getResultsHelper(final QueryPlanFragments planFragments) throws Exception {
+  private int getResultsHelper(final QueryPlanFragments planFragments) throws Exception {
+    int totalRows = 0;
     for (PlanFragment fragment : planFragments.getFragmentsList()) {
       DrillbitEndpoint assignedNode = fragment.getAssignment();
-      @SuppressWarnings("resource")
-      DrillClient fragmentClient = new DrillClient(true);
-      Properties props = new Properties();
-      props.setProperty("drillbit", assignedNode.getAddress() + ":" + assignedNode.getUserPort());
-      fragmentClient.connect(props);
-
-      ShowResultsUserResultsListener myListener = new ShowResultsUserResultsListener(getAllocator());
-      AwaitableUserResultsListener listenerBits =
-          new AwaitableUserResultsListener(myListener);
-      fragmentClient.runQuery(QueryType.SQL, "select hostname, user_port from sys.drillbits where `current`=true",
-          listenerBits);
-      int row = listenerBits.await();
-      assertEquals(1, row);
-      List<Map<String,String>> records = myListener.getRecords();
-      assertEquals(1, records.size());
-      Map<String,String> record = records.get(0);
-      assertEquals(2, record.size());
-      Iterator<Entry<String, String>> iter = record.entrySet().iterator();
-      Entry<String, String> entry;
-      String host = null;
-      String port = null;
-      for (int i = 0; i < 2; i++) {
-       entry = iter.next();
-       if (entry.getKey().equalsIgnoreCase("hostname")) {
-          host = entry.getValue();
-        } else if (entry.getKey().equalsIgnoreCase("user_port")) {
-          port = entry.getValue();
-        } else {
-          fail("Unknown field: " + entry.getKey());
-        }
-       }
-      assertTrue(props.getProperty("drillbit").equalsIgnoreCase(host+":" + port));
+      ClientFixture fragmentClient = cluster.client(assignedNode.getAddress(), assignedNode.getUserPort());
+
+      RowSet rowSet = fragmentClient.queryBuilder().sql("select hostname, user_port from sys.drillbits where `current`=true").rowSet();
+      assertEquals(1, rowSet.rowCount());
+      RowSetReader reader = rowSet.reader();
+      assertTrue(reader.next());
+      String host = reader.scalar("hostname").getString();
+      int port = reader.scalar("user_port").getInt();
+      rowSet.clear();
+
+      assertEquals(assignedNode.getAddress(), host);
+      assertEquals(assignedNode.getUserPort(), port);
 
       List<PlanFragment> fragmentList = Lists.newArrayList();
       fragmentList.add(fragment);
-      AwaitableUserResultsListener listener =
-          new AwaitableUserResultsListener(new SilentListener());
-      fragmentClient.runQuery(QueryType.EXECUTION, fragmentList, listener);
-      @SuppressWarnings("unused")
-      int rows = listener.await();
+      QuerySummary summary = fragmentClient.queryBuilder().plan(fragmentList).run();
+      totalRows += summary.recordCount();
       fragmentClient.close();
     }
-  }
-
-  private void getCombinedResultsHelper(final QueryPlanFragments planFragments) throws Exception {
-      ShowResultsUserResultsListener myListener = new ShowResultsUserResultsListener(getAllocator());
-      @SuppressWarnings("unused")
-      AwaitableUserResultsListener listenerBits =
-          new AwaitableUserResultsListener(myListener);
-      AwaitableUserResultsListener listener =
-          new AwaitableUserResultsListener(new SilentListener());
-      client.runQuery(QueryType.EXECUTION, planFragments.getFragmentsList(), listener);
-      @SuppressWarnings("unused")
-      int rows = listener.await();
-  }
-
-  /**
-   * Helper class to get results
-   *
-   */
-  static class ShowResultsUserResultsListener implements UserResultsListener {
-
-    private QueryId queryId;
-    private final RecordBatchLoader loader;
-    private final BufferAllocator allocator;
-    private UserException ex;
-    private List<Map<String,String>> records = Lists.newArrayList();
-
-    public ShowResultsUserResultsListener(BufferAllocator allocator) {
-      this.loader = new RecordBatchLoader(allocator);
-      this.allocator = allocator;
-    }
-
-    public QueryId getQueryId() {
-      return queryId;
-    }
-
-    public List<Map<String, String>> getRecords() {
-      return records;
-    }
-
-    public UserException getEx() {
-      return ex;
-    }
-
-    @Override
-    public void queryIdArrived(QueryId queryId) {
-     this.queryId = queryId;
-    }
-
-    @Override
-    public void submissionFailed(UserException ex) {
-      DrillAutoCloseables.closeNoChecked(allocator);
-      this.ex = ex;
-    }
-
-    @Override
-    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
-      QueryData queryHeader = result.getHeader();
-      int rows = queryHeader.getRowCount();
-      try {
-        if ( result.hasData() ) {
-          DrillBuf data = result.getData();
-          loader.load(queryHeader.getDef(), data);
-          for (int i = 0; i < rows; i++) {
-             Map<String,String> record = Maps.newHashMap();
-            for (VectorWrapper<?> vw : loader) {
-              final String field = vw.getValueVector().getMetadata().getNamePart().getName();
-              final ValueVector.Accessor accessor = vw.getValueVector().getAccessor();
-              final Object value = i < accessor.getValueCount() ? accessor.getObject(i) : null;
-              final String display = value == null ? null : value.toString();
-              record.put(field, display);
-            }
-            records.add(record);
-          }
-          loader.clear();
-        }
-        result.release();
-      } catch (SchemaChangeException e) {
-        fail(e.getMessage());
-      }
-
-    }
-
-    @Override
-    public void queryCompleted(QueryState state) {
-    }
+    return totalRows;
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
index 9ade940..e60533b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
 import org.apache.drill.exec.util.GuavaPatcher;
+import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.DrillTest;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,6 +46,8 @@ import org.joda.time.DateTimeUtils;
 import org.joda.time.DateTimeZone;
 import org.junit.After;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
+
 import java.io.IOException;
 import java.text.DateFormatSymbols;
 import java.util.Locale;
@@ -52,6 +55,9 @@ import java.util.Locale;
 
 public class ExecTest extends DrillTest {
 
+  @ClassRule
+  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
   protected static SystemOptionManager optionManager;
   static {
     GuavaPatcher.patch();

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestEvaluationVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestEvaluationVisitor.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestEvaluationVisitor.java
index 3c41c81..91ce653 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestEvaluationVisitor.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestEvaluationVisitor.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java
index 94a9f12..eaf5e02 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java
@@ -156,7 +156,6 @@ public class ExpressionInterpreterTest  extends PopUnitTestBase {
   protected void doTest(String expressionStr, String[] colNames, TypeProtos.MajorType[] colTypes, String[] expectFirstTwoValues, BitControl.PlanFragment planFragment) throws Exception {
     @SuppressWarnings("resource")
     final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
-    @SuppressWarnings("resource")
     final Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
 
     bit1.run();
@@ -173,7 +172,6 @@ public class ExpressionInterpreterTest  extends PopUnitTestBase {
     final MockTableDef.MockScanEntry entry = new MockTableDef.MockScanEntry(10, false, 0, 1, columns);
     final MockSubScanPOP scanPOP = new MockSubScanPOP("testTable", false, java.util.Collections.singletonList(entry));
 
-    @SuppressWarnings("resource")
     final ScanBatch batch = createMockScanBatch(bit1, scanPOP, planFragment);
 
     batch.next();
@@ -202,7 +200,7 @@ public class ExpressionInterpreterTest  extends PopUnitTestBase {
     try {
       final FragmentContext context =
           new FragmentContext(bit.getContext(), planFragment, null, bit.getContext().getFunctionImplementationRegistry());
-      return creator.getBatch(context,scanPOP, children);
+      return (ScanBatch) creator.getBatch(context, scanPOP, children);
     } catch (Exception ex) {
       throw new DrillRuntimeException("Error when setup fragment context" + ex);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java
index e7d0a97..4860869 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java
@@ -135,7 +135,7 @@ public class TopNBatchTest extends PopUnitTestBase {
         VectorContainer resultContainer = queue.getHyperBatch();
         resultContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
-        RowSet.HyperRowSet actualHyperSet = new HyperRowSetImpl(resultContainer, queue.getFinalSv4());
+        RowSet.HyperRowSet actualHyperSet = HyperRowSetImpl.fromContainer(resultContainer, queue.getFinalSv4());
         new RowSetComparison(expectedRowSet).verify(actualHyperSet);
       } finally {
         if (expectedRowSet != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
index 7be6195..f517b1d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
@@ -29,6 +29,7 @@ import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.ClientFixture;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.DrillTest;
 import org.apache.drill.test.LogFixture;
 import org.apache.drill.test.ProfileParser;
 import org.apache.drill.test.QueryBuilder;
@@ -47,7 +48,7 @@ import static org.junit.Assert.assertTrue;
  * Test spilling for the Hash Aggr operator (using the mock reader)
  */
 @Category({SlowTest.class, OperatorTest.class})
-public class TestHashAggrSpill {
+public class TestHashAggrSpill extends DrillTest {
 
   @Rule
   public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
index 90183d9..7a66f43 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
@@ -271,7 +271,6 @@ public class TestWindowFrame extends BaseTestQuery {
       .run();
   }
 
-
   @Test
   public void testLag() throws Exception {
     testBuilder()

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
index 563d97e..a79ecf5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java
@@ -41,14 +41,26 @@ public class TestExternalSort extends BaseTestQuery {
 
   @Test
   public void testNumericTypesManaged() throws Exception {
-    testNumericTypes( false );
+    testNumericTypes(false);
   }
 
   @Test
   public void testNumericTypesLegacy() throws Exception {
-    testNumericTypes( true );
+    testNumericTypes(true);
   }
 
+  /**
+   * Test union type support in sort using numeric types: BIGINT and FLOAT8.
+   * Drill does not fully support union types, but sort was adapted to handle
+   * them. This test simply verifies that the sort handles these types, even
+   * though the rest of Drill does not fully support them.
+   *
+   * @param testLegacy
+   *          true to test the old (pre-1.11) sort, false to test the new (1.11
+   *          and later) sort
+   * @throws Exception
+   */
+
   private void testNumericTypes(boolean testLegacy) throws Exception {
     final int record_count = 10000;
     final String tableDirName = "numericTypes";
@@ -103,8 +115,9 @@ public class TestExternalSort extends BaseTestQuery {
 
   private String getOptions(boolean testLegacy) {
     String options = "alter session set `exec.enable_union_type` = true";
-    options += ";alter session set `" + ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED_OPTION.getOptionName() + "` = " +
-        Boolean.toString(testLegacy);
+    options += ";alter session set `"
+        + ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED_OPTION.getOptionName()
+        + "` = " + Boolean.toString(testLegacy);
     return options;
   }
 
@@ -159,10 +172,10 @@ public class TestExternalSort extends BaseTestQuery {
     }
 
     TestBuilder builder = testBuilder()
-            .sqlQuery("select * from dfs.`%s` order by a desc", tableDirName)
-            .ordered()
-            .optionSettingQueriesForTestQuery(getOptions(testLegacy))
-            .baselineColumns("a");
+        .sqlQuery("select * from dfs.`%s` order by a desc", tableDirName)
+        .ordered()
+        .optionSettingQueriesForTestQuery(getOptions(testLegacy))
+        .baselineColumns("a");
     // Strings come first because order by is desc
     for (int i = record_count; i >= 0;) {
       i--;
@@ -225,12 +238,13 @@ public class TestExternalSort extends BaseTestQuery {
       rowSet.clear();
     }
 
-    // Test framework currently doesn't handle changing schema (i.e. new columns) on the client side
+    // Test framework currently doesn't handle changing schema (i.e. new
+    // columns) on the client side
     TestBuilder builder = testBuilder()
-            .sqlQuery("select a, b, c from dfs.`%s` order by a desc", tableDirName)
-            .ordered()
-            .optionSettingQueriesForTestQuery(getOptions(testLegacy))
-            .baselineColumns("a", "b", "c");
+        .sqlQuery("select a, b, c from dfs.`%s` order by a desc", tableDirName)
+        .ordered()
+        .optionSettingQueriesForTestQuery(getOptions(testLegacy))
+        .baselineColumns("a", "b", "c");
     for (int i = record_count; i >= 0;) {
       builder.baselineValues((long) i, (long) i--, null);
       if (i >= 0) {
@@ -238,6 +252,9 @@ public class TestExternalSort extends BaseTestQuery {
       }
     }
     builder.go();
+
+    // TODO: Useless test: just dumps to console
+
     test("select * from dfs.`%s` order by a desc", tableDirName);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
index 04a1df8..2cd1793 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.ClientFixture;
 import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.DrillTest;
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.categories.SlowTest;

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
index cd408cb..e106171 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
@@ -32,8 +32,8 @@ import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.impl.xsort.managed.PriorityQueueCopierWrapper.BatchMerger;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.TupleMetadata;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.rowSet.DirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
@@ -63,12 +63,10 @@ public class SortTestUtilities {
   }
 
   @SuppressWarnings("resource")
-  public static PriorityQueueCopierWrapper makeCopier(OperatorFixture fixture, String sortOrder, String nullOrder) {
+  public static Sort makeCopierConfig(String sortOrder, String nullOrder) {
     FieldReference expr = FieldReference.getWithQuotedRef("key");
     Ordering ordering = new Ordering(sortOrder, expr, nullOrder);
-    Sort popConfig = new Sort(null, Lists.newArrayList(ordering), false);
-    OperatorContext opContext = fixture.operatorContext(popConfig);
-    return new PriorityQueueCopierWrapper(opContext);
+    return new Sort(null, Lists.newArrayList(ordering), false);
   }
 
   public static class CopierTester {
@@ -91,24 +89,30 @@ public class SortTestUtilities {
     }
 
     public void run() throws Exception {
-      PriorityQueueCopierWrapper copier = makeCopier(fixture, sortOrder, nullOrder);
-      List<BatchGroup> batches = new ArrayList<>();
-      TupleMetadata schema = null;
-      for (SingleRowSet rowSet : rowSets) {
-        batches.add(new BatchGroup.InputBatch(rowSet.container(), rowSet.getSv2(),
-                    fixture.allocator(), rowSet.size()));
-        if (schema == null) {
-          schema = rowSet.schema();
+      Sort popConfig = SortTestUtilities.makeCopierConfig(sortOrder, nullOrder);
+      OperatorContext opContext = fixture.newOperatorContext(popConfig);
+      PriorityQueueCopierWrapper copier = new PriorityQueueCopierWrapper(opContext);
+      try {
+        List<BatchGroup> batches = new ArrayList<>();
+        TupleMetadata schema = null;
+        for (SingleRowSet rowSet : rowSets) {
+          batches.add(new BatchGroup.InputBatch(rowSet.container(), rowSet.getSv2(),
+                      fixture.allocator(), rowSet.size()));
+          if (schema == null) {
+            schema = rowSet.schema();
+          }
         }
+        int rowCount = outputRowCount();
+        VectorContainer dest = new VectorContainer();
+        BatchMerger merger = copier.startMerge(new BatchSchema(SelectionVectorMode.NONE, schema.toFieldList()),
+                                               batches, dest, rowCount, null);
+
+        verifyResults(merger, dest);
+        dest.clear();
+        merger.close();
+      } finally {
+        opContext.close();
       }
-      int rowCount = outputRowCount();
-      VectorContainer dest = new VectorContainer();
-      BatchMerger merger = copier.startMerge(new BatchSchema(SelectionVectorMode.NONE, schema.toFieldList()),
-                                             batches, dest, rowCount, null);
-
-      verifyResults(merger, dest);
-      dest.clear();
-      merger.close();
     }
 
     public int outputRowCount() {

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
index 5d438ee..66481a7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
@@ -25,6 +25,8 @@ import java.util.List;
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.impl.xsort.managed.PriorityQueueCopierWrapper.BatchMerger;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortTestUtilities.CopierTester;
 import org.apache.drill.exec.record.BatchSchema;
@@ -55,7 +57,9 @@ public class TestCopier extends SubOperatorTest {
   public void testEmptyInput() throws Exception {
     BatchSchema schema = SortTestUtilities.nonNullSchema();
     List<BatchGroup> batches = new ArrayList<>();
-    PriorityQueueCopierWrapper copier = SortTestUtilities.makeCopier(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED);
+    Sort popConfig = SortTestUtilities.makeCopierConfig(Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED);
+    OperatorContext opContext = fixture.newOperatorContext(popConfig);
+    PriorityQueueCopierWrapper copier = new PriorityQueueCopierWrapper(opContext);
     VectorContainer dest = new VectorContainer();
     try {
       // TODO: Create a vector allocator to pass as last parameter so
@@ -63,11 +67,13 @@ public class TestCopier extends SubOperatorTest {
       // code. Only nuisance is that we don't have the required metadata
       // readily at hand here...
 
-      @SuppressWarnings({ "resource", "unused" })
+      @SuppressWarnings({"resource", "unused"})
       BatchMerger merger = copier.startMerge(schema, batches, dest, 10, null);
       fail();
     } catch (AssertionError e) {
       // Expected
+    } finally {
+      opContext.close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
index e913c39..1315a86 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
@@ -24,16 +24,16 @@ import static org.junit.Assert.assertTrue;
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeAction;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeTask;
 import org.apache.drill.test.ConfigBuilder;
-import org.apache.drill.test.DrillTest;
-import org.apache.drill.test.OperatorFixture;
+import org.apache.drill.test.SubOperatorTest;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category(OperatorTest.class)
-public class TestExternalSortInternals extends DrillTest {
+public class TestExternalSortInternals extends SubOperatorTest {
 
   private static final int ONE_MEG = 1024 * 1024;
 
@@ -650,7 +650,7 @@ public class TestExternalSortInternals extends DrillTest {
 
   @Test
   public void testMetrics() {
-    OperatorFixture.MockStats stats = new OperatorFixture.MockStats();
+    OperatorStats stats = new OperatorStats(100, 101, 0, fixture.allocator());
     SortMetrics metrics = new SortMetrics(stats);
 
     // Input stats
@@ -667,55 +667,55 @@ public class TestExternalSortInternals extends DrillTest {
 
     // Buffer memory
 
-    assertEquals(0D, stats.getStat(ExternalSortBatch.Metric.MIN_BUFFER), 0.01);
+    assertEquals(0L, stats.getLongStat(ExternalSortBatch.Metric.MIN_BUFFER));
 
     metrics.updateMemory(1_000_000);
-    assertEquals(1_000_000D, stats.getStat(ExternalSortBatch.Metric.MIN_BUFFER), 0.01);
+    assertEquals(1_000_000L, stats.getLongStat(ExternalSortBatch.Metric.MIN_BUFFER));
 
     metrics.updateMemory(2_000_000);
-    assertEquals(1_000_000D, stats.getStat(ExternalSortBatch.Metric.MIN_BUFFER), 0.01);
+    assertEquals(1_000_000L, stats.getLongStat(ExternalSortBatch.Metric.MIN_BUFFER));
 
     metrics.updateMemory(100_000);
-    assertEquals(100_000D, stats.getStat(ExternalSortBatch.Metric.MIN_BUFFER), 0.01);
+    assertEquals(100_000L, stats.getLongStat(ExternalSortBatch.Metric.MIN_BUFFER));
 
     // Peak batches
 
-    assertEquals(0D, stats.getStat(ExternalSortBatch.Metric.PEAK_BATCHES_IN_MEMORY), 0.01);
+    assertEquals(0L, stats.getLongStat(ExternalSortBatch.Metric.PEAK_BATCHES_IN_MEMORY));
 
     metrics.updatePeakBatches(10);
-    assertEquals(10D, stats.getStat(ExternalSortBatch.Metric.PEAK_BATCHES_IN_MEMORY), 0.01);
+    assertEquals(10L, stats.getLongStat(ExternalSortBatch.Metric.PEAK_BATCHES_IN_MEMORY));
 
     metrics.updatePeakBatches(1);
-    assertEquals(10D, stats.getStat(ExternalSortBatch.Metric.PEAK_BATCHES_IN_MEMORY), 0.01);
+    assertEquals(10L, stats.getLongStat(ExternalSortBatch.Metric.PEAK_BATCHES_IN_MEMORY));
 
     metrics.updatePeakBatches(20);
-    assertEquals(20D, stats.getStat(ExternalSortBatch.Metric.PEAK_BATCHES_IN_MEMORY), 0.01);
+    assertEquals(20L, stats.getLongStat(ExternalSortBatch.Metric.PEAK_BATCHES_IN_MEMORY));
 
     // Merge count
 
-    assertEquals(0D, stats.getStat(ExternalSortBatch.Metric.MERGE_COUNT), 0.01);
+    assertEquals(0L, stats.getLongStat(ExternalSortBatch.Metric.MERGE_COUNT));
 
     metrics.incrMergeCount();
-    assertEquals(1D, stats.getStat(ExternalSortBatch.Metric.MERGE_COUNT), 0.01);
+    assertEquals(1L, stats.getLongStat(ExternalSortBatch.Metric.MERGE_COUNT));
 
     metrics.incrMergeCount();
-    assertEquals(2D, stats.getStat(ExternalSortBatch.Metric.MERGE_COUNT), 0.01);
+    assertEquals(2L, stats.getLongStat(ExternalSortBatch.Metric.MERGE_COUNT));
 
     // Spill count
 
-    assertEquals(0D, stats.getStat(ExternalSortBatch.Metric.SPILL_COUNT), 0.01);
+    assertEquals(0L, stats.getLongStat(ExternalSortBatch.Metric.SPILL_COUNT));
 
     metrics.incrSpillCount();
-    assertEquals(1D, stats.getStat(ExternalSortBatch.Metric.SPILL_COUNT), 0.01);
+    assertEquals(1L, stats.getLongStat(ExternalSortBatch.Metric.SPILL_COUNT));
 
     metrics.incrSpillCount();
-    assertEquals(2D, stats.getStat(ExternalSortBatch.Metric.SPILL_COUNT), 0.01);
+    assertEquals(2L, stats.getLongStat(ExternalSortBatch.Metric.SPILL_COUNT));
 
     // Write bytes
 
-    assertEquals(0D, stats.getStat(ExternalSortBatch.Metric.SPILL_MB), 0.01);
+    assertEquals(0L, stats.getLongStat(ExternalSortBatch.Metric.SPILL_MB));
 
     metrics.updateWriteBytes(17 * ONE_MEG + ONE_MEG * 3 / 4);
-    assertEquals(17.75D, stats.getStat(ExternalSortBatch.Metric.SPILL_MB), 0.001);
+    assertEquals(17.75D, stats.getDoubleStat(ExternalSortBatch.Metric.SPILL_MB), 0.01);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
index 93411d7..7c3c4cf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
@@ -82,7 +82,7 @@ public class TestSortImpl extends DrillTest {
     FieldReference expr = FieldReference.getWithQuotedRef("key");
     Ordering ordering = new Ordering(sortOrder, expr, nullOrder);
     Sort popConfig = new Sort(null, Lists.newArrayList(ordering), false);
-    OperatorContext opContext = fixture.operatorContext(popConfig);
+    OperatorContext opContext = fixture.newOperatorContext(popConfig);
     QueryId queryId = QueryId.newBuilder()
         .setPart1(1234)
         .setPart2(5678)
@@ -157,7 +157,7 @@ public class TestSortImpl extends DrillTest {
       }
       for (RowSet expectedSet : expected) {
         assertTrue(results.next());
-        RowSet rowSet = toRowSet(fixture, results, dest);
+        RowSet rowSet = toRowSet(results, dest);
         // Uncomment these for debugging. Leave them commented otherwise
         // to avoid polluting the Maven build output unnecessarily.
 //        System.out.println("Expected:");
@@ -173,6 +173,11 @@ public class TestSortImpl extends DrillTest {
       results.close();
       dest.clear();
       sort.close();
+
+      // Note: context closed separately because this is normally done by
+      // the external sort itself after closing the output container.
+
+      sort.opContext().close();
       validateFinalStats(sort);
     }
 
@@ -191,9 +196,9 @@ public class TestSortImpl extends DrillTest {
    * @return
    */
 
-  private static RowSet toRowSet(OperatorFixture fixture, SortResults results, VectorContainer dest) {
+  private static RowSet toRowSet(SortResults results, VectorContainer dest) {
     if (results.getSv4() != null) {
-      return new HyperRowSetImpl(dest, results.getSv4());
+      return HyperRowSetImpl.fromContainer(dest, results.getSv4());
     } else if (results.getSv2() != null) {
       return IndirectRowSet.fromSv2(dest, results.getSv2());
     } else {
@@ -447,7 +452,7 @@ public class TestSortImpl extends DrillTest {
     }
     while (results.next()) {
       timer.stop();
-      RowSet output = toRowSet(fixture, results, dest);
+      RowSet output = toRowSet(results, dest);
       validator.validate(output);
       timer.start();
     }
@@ -456,6 +461,7 @@ public class TestSortImpl extends DrillTest {
     results.close();
     dest.clear();
     sort.close();
+    sort.opContext().close();
   }
 
   /**
@@ -544,6 +550,7 @@ public class TestSortImpl extends DrillTest {
     results.close();
     dest.clear();
     sort.close();
+    sort.opContext().close();
     System.out.println(timer.elapsed(TimeUnit.MILLISECONDS));
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
index c24f1a6..d4cce28 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java
@@ -82,14 +82,18 @@ public class TestSorter extends DrillTest {
   }
 
   public void runSorterTest(Sort popConfig, SingleRowSet rowSet, SingleRowSet expected) throws Exception {
-    OperatorContext opContext = fixture.operatorContext(popConfig);
+    OperatorContext opContext = fixture.newOperatorContext(popConfig);
     SorterWrapper sorter = new SorterWrapper(opContext);
 
-    sorter.sortBatch(rowSet.container(), rowSet.getSv2());
+    try {
+      sorter.sortBatch(rowSet.container(), rowSet.getSv2());
 
-    new RowSetComparison(expected)
-        .verifyAndClearAll(rowSet);
-    sorter.close();
+      new RowSetComparison(expected)
+          .verifyAndClearAll(rowSet);
+      sorter.close();
+    } finally {
+      opContext.close();
+    }
   }
 
   // Test degenerate case: no rows
@@ -143,15 +147,20 @@ public class TestSorter extends DrillTest {
     protected final OperatorFixture fixture;
     protected final SorterWrapper sorter;
     protected final boolean nullable;
+    protected final OperatorContext opContext;
 
     public BaseSortTester(OperatorFixture fixture, String sortOrder, String nullOrder, boolean nullable) {
       this.fixture = fixture;
       Sort popConfig = makeSortConfig("key", sortOrder, nullOrder);
       this.nullable = nullable;
 
-      OperatorContext opContext = fixture.operatorContext(popConfig);
+      opContext = fixture.newOperatorContext(popConfig);
       sorter = new SorterWrapper(opContext);
     }
+
+    public void close() {
+      opContext.close();
+    }
   }
 
   private abstract static class SortTester extends BaseSortTester {
@@ -474,33 +483,41 @@ public class TestSorter extends DrillTest {
 
   @Test
   public void testNumericTypes() throws Exception {
-    TestSorterNumeric tester1 = new TestSorterNumeric(fixture, true);
+    TestSorterNumeric tester = new TestSorterNumeric(fixture, true);
+    try {
 //      tester1.test(MinorType.TINYINT); // DRILL-5329
 //      tester1.test(MinorType.UINT1); DRILL-5329
 //      tester1.test(MinorType.SMALLINT); DRILL-5329
 //      tester1.test(MinorType.UINT2); DRILL-5329
-    tester1.test(MinorType.INT);
+      tester.test(MinorType.INT);
 //      tester1.test(MinorType.UINT4); DRILL-5329
-    tester1.test(MinorType.BIGINT);
+      tester.test(MinorType.BIGINT);
 //      tester1.test(MinorType.UINT8); DRILL-5329
-    tester1.test(MinorType.FLOAT4);
-    tester1.test(MinorType.FLOAT8);
-    tester1.test(MinorType.DECIMAL9);
-    tester1.test(MinorType.DECIMAL18);
+      tester.test(MinorType.FLOAT4);
+      tester.test(MinorType.FLOAT8);
+      tester.test(MinorType.DECIMAL9);
+      tester.test(MinorType.DECIMAL18);
 //      tester1.test(MinorType.DECIMAL28SPARSE); DRILL-5329
 //      tester1.test(MinorType.DECIMAL38SPARSE); DRILL-5329
 //    tester1.test(MinorType.DECIMAL28DENSE); No writer
 //    tester1.test(MinorType.DECIMAL38DENSE); No writer
-    tester1.test(MinorType.DATE);
-    tester1.test(MinorType.TIME);
-    tester1.test(MinorType.TIMESTAMP);
+      tester.test(MinorType.DATE);
+      tester.test(MinorType.TIME);
+      tester.test(MinorType.TIMESTAMP);
+    } finally {
+      tester.close();
+    }
   }
 
   @Test
   public void testVarCharTypes() throws Exception {
     TestSorterStringAsc tester = new TestSorterStringAsc(fixture);
-    tester.test(MinorType.VARCHAR);
+    try {
+      tester.test(MinorType.VARCHAR);
 //      tester.test(MinorType.VAR16CHAR); DRILL-5329
+    } finally {
+      tester.close();
+    }
   }
 
   /**
@@ -512,7 +529,11 @@ public class TestSorter extends DrillTest {
   @Test
   public void testVarBinary() throws Exception {
     TestSorterBinaryAsc tester = new TestSorterBinaryAsc(fixture);
-    tester.test(MinorType.VARBINARY);
+    try {
+      tester.test(MinorType.VARBINARY);
+    } finally {
+      tester.close();
+    }
   }
 
   /**
@@ -524,7 +545,11 @@ public class TestSorter extends DrillTest {
   @Test
   public void testInterval() throws Exception {
     TestSorterIntervalAsc tester = new TestSorterIntervalAsc(fixture);
-    tester.test();
+    try {
+      tester.test();
+    } finally {
+      tester.close();
+    }
   }
 
   /**
@@ -536,7 +561,11 @@ public class TestSorter extends DrillTest {
   @Test
   public void testIntervalYear() throws Exception {
     TestSorterIntervalYearAsc tester = new TestSorterIntervalYearAsc(fixture);
-    tester.test();
+    try {
+      tester.test();
+    } finally {
+      tester.close();
+    }
   }
 
   /**
@@ -548,13 +577,21 @@ public class TestSorter extends DrillTest {
   @Test
   public void testIntervalDay() throws Exception {
     TestSorterIntervalDayAsc tester = new TestSorterIntervalDayAsc(fixture);
-    tester.test();
+    try {
+      tester.test();
+    } finally {
+      tester.close();
+    }
   }
 
   @Test
   public void testDesc() throws Exception {
     TestSorterNumeric tester = new TestSorterNumeric(fixture, false);
-    tester.test(MinorType.INT);
+    try {
+      tester.test(MinorType.INT);
+    } finally {
+      tester.close();
+    }
   }
 
   /**
@@ -566,13 +603,29 @@ public class TestSorter extends DrillTest {
   @Test
   public void testNullable() throws Exception {
     TestSorterNullableNumeric tester = new TestSorterNullableNumeric(fixture, true, true);
-    tester.test(MinorType.INT);
+    try {
+      tester.test(MinorType.INT);
+    } finally {
+      tester.close();
+    }
     tester = new TestSorterNullableNumeric(fixture, true, false);
-    tester.test(MinorType.INT);
+    try {
+      tester.test(MinorType.INT);
+    } finally {
+      tester.close();
+    }
     tester = new TestSorterNullableNumeric(fixture, false, true);
-    tester.test(MinorType.INT);
+    try {
+      tester.test(MinorType.INT);
+    } finally {
+      tester.close();
+    }
     tester = new TestSorterNullableNumeric(fixture, false, false);
-    tester.test(MinorType.INT);
+    try {
+      tester.test(MinorType.INT);
+    } finally {
+      tester.close();
+    }
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
index fda4442..088994f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
@@ -299,7 +299,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
 //        optManager.getOption(withAny(new TypeValidators.PositiveLongValidator("", 1l, 1l))); result = 10;
         drillbitContext.getCompiler(); result = new CodeCompiler(drillConf, optionManager);
         fragContext.getOptions(); result = optionManager;
-        fragContext.getOptionSet(); result = optionManager;
+        fragContext.getOptions(); result = optionManager;
         fragContext.getManagedBuffer(); result = bufManager.getManagedBuffer();
         fragContext.shouldContinue(); result = true;
         fragContext.getExecutionControls(); result = executionControls;
@@ -342,7 +342,6 @@ public class PhysicalOpUnitTestBase extends ExecTest {
     new NonStrictExpectations() {
       {
         opContext.getStats();result = opStats;
-        opContext.getStatsWriter(); result = opStats;
         opContext.getAllocator(); result = allocator;
         opContext.getFragmentContext(); result = fragContext;
         opContext.getOperatorDefn(); result = popConfig;

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
index 5ce8e3f..f0cc172 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
@@ -29,16 +29,13 @@ import java.io.PrintWriter;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
-import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.RowSetComparison;
 import org.apache.drill.test.rowSet.SchemaBuilder;
-import org.apache.drill.test.DirTestWatcher;
 import org.junit.BeforeClass;
-import org.junit.ClassRule;
 import org.junit.Test;
 
 /**
@@ -52,9 +49,6 @@ public class TestCsv extends ClusterTest {
 
   private static File testDir;
 
-  @ClassRule
-  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
-
   @BeforeClass
   public static void setup() throws Exception {
     startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1));

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java
index 47bb903..f7648d9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestHeaderBuilder.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.store.easy.text.compliant;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.drill.exec.store.easy.text.compliant.HeaderBuilder.HeaderError;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.test.DrillTest;
 import org.junit.Test;
 
@@ -34,7 +34,7 @@ public class TestHeaderBuilder extends DrillTest {
     hb.startBatch();
     try {
       hb.finishRecord();
-    } catch (HeaderError e) {
+    } catch (UserException e) {
       assertTrue(e.getMessage().contains("must define at least one header"));
     }
 
@@ -43,7 +43,7 @@ public class TestHeaderBuilder extends DrillTest {
     parse(hb,"");
     try {
       hb.finishRecord();
-    } catch (HeaderError e) {
+    } catch (UserException e) {
       assertTrue(e.getMessage().contains("must define at least one header"));
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
index 802ce1b..387caa7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
@@ -65,8 +65,6 @@ import org.apache.drill.exec.util.StoragePluginTestUtils;
 import org.apache.drill.exec.util.VectorUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.ClassRule;
-
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.io.Resources;
@@ -108,9 +106,6 @@ public class BaseTestQuery extends ExecTest {
 
   private static ScanResult classpathScan;
 
-  @ClassRule
-  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
-
   @BeforeClass
   public static void setupDefaultTestCluster() throws Exception {
     config = DrillConfig.create(cloneDefaultTestConfigProperties());

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
index 12be961..3873740 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
@@ -26,11 +26,16 @@ import java.io.Reader;
 import java.io.StringReader;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 
+import org.apache.drill.common.config.DrillProperties;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.testing.Controls;
@@ -96,6 +101,9 @@ public class ClientFixture implements AutoCloseable {
 
     if (cluster.usesZK()) {
       client = new DrillClient(cluster.config());
+    } else if (builder.clientProps != null  &&
+        builder.clientProps.containsKey(DrillProperties.DRILLBIT_CONNECTION)) {
+      client = new DrillClient(cluster.config(), cluster.serviceSet().getCoordinator(), true);
     } else {
       client = new DrillClient(cluster.config(), cluster.serviceSet().getCoordinator());
     }
@@ -191,6 +199,25 @@ public class ClientFixture implements AutoCloseable {
     }
   }
 
+  /**
+   * Plan a query without execution.
+   * @throws IllegalStateException if planning fails (wraps the
+   * ExecutionException) or the wait is interrupted (wraps the InterruptedException)
+   */
+
+  public QueryPlanFragments planQuery(QueryType type, String query, boolean isSplitPlan) {
+    DrillRpcFuture<QueryPlanFragments> queryFragmentsFutures = client.planQuery(type, query, isSplitPlan);
+    try {
+      return queryFragmentsFutures.get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  public QueryPlanFragments planQuery(String sql) {
+    return planQuery(QueryType.SQL, sql, false);
+  }
+
   @Override
   public void close() {
     if (client == null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index 6514ac8..8ee87c0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -320,6 +320,23 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
     return clients.get(0);
   }
 
+  /**
+   * Create a test client for a specific host and port.
+   *
+   * @param host host, must be one of those created by this
+   * fixture
+   * @param port port, must be one of those created by this
+   * fixture
+   * @return a test client. Client will be closed when this cluster
+   * fixture closes, or can be closed early
+   */
+
+  public ClientFixture client(String host, int port) {
+    return clientBuilder()
+      .property(DrillProperties.DRILLBIT_CONNECTION, String.format("%s:%d", host, port))
+      .build();
+  }
+
   public RestClientFixture restClientFixture() {
     if (restClientFixture == null) {
       restClientFixture = restClientBuilder().build();

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
index c85c591..1ae2a87 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
@@ -20,7 +20,9 @@ package org.apache.drill.test;
 import java.io.IOException;
 
 import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.test.rowSet.RowSet;
 import org.junit.AfterClass;
+import org.junit.ClassRule;
 
 /**
  * Base class for tests that use a single cluster fixture for a set of
@@ -72,6 +74,9 @@ import org.junit.AfterClass;
 
 public class ClusterTest extends DrillTest {
 
+  @ClassRule
+  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
   protected static ClusterFixture cluster;
   protected static ClientFixture client;
 
@@ -116,4 +121,34 @@ public class ClusterTest extends DrillTest {
   public QueryBuilder queryBuilder( ) {
     return client.queryBuilder();
   }
+
+  /**
+   * Handy development-time tool to run a query and print the results. Use this
+   * when first developing tests. Then, encode the expected results using
+   * the appropriate tool and verify them rather than just printing them to
+   * create the final test.
+   *
+   * @param sql the query to run
+   */
+
+  protected void runAndPrint(String sql) {
+    QueryResultSet results = client.queryBuilder().sql(sql).resultSet();
+    try {
+      for (;;) {
+        RowSet rowSet = results.next();
+        if (rowSet == null) {
+          break;
+        }
+        if (rowSet.rowCount() > 0) {
+          rowSet.print();
+        }
+        rowSet.clear();
+      }
+      System.out.println(results.recordCount());
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    } finally {
+      results.close();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
index 99bbacc..cd68bf3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
@@ -767,7 +767,7 @@ public class DrillTestWrapper {
           if (!expectedRecord.containsKey(s)) {
             throw new Exception("Unexpected column '" + s + "' returned by query.");
           }
-          if (!compareValues(expectedRecord.get(s), actualRecord.get(s), counter, s)) {
+          if (! compareValues(expectedRecord.get(s), actualRecord.get(s), counter, s)) {
             i++;
             continue findMatch;
           }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index a1b8af5..6135b1c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -18,9 +18,7 @@
 package org.apache.drill.test;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Callable;
 
 import org.apache.drill.common.config.DrillConfig;
@@ -28,6 +26,7 @@ import org.apache.drill.common.scanner.ClassPathScanner;
 import org.apache.drill.common.scanner.persistence.ScanResult;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.CodeCompiler;
+import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.RootAllocatorFactory;
@@ -36,9 +35,7 @@ import org.apache.drill.exec.ops.BaseOperatorContext;
 import org.apache.drill.exec.ops.BufferManager;
 import org.apache.drill.exec.ops.BufferManagerImpl;
 import org.apache.drill.exec.ops.FragmentContextInterface;
-import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.ops.OperatorStatReceiver;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.record.BatchSchema;
@@ -46,9 +43,9 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.TupleMetadata;
 import org.apache.drill.exec.record.TupleSchema;
 import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.server.options.OptionSet;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.testing.ExecutionControls;
 import org.apache.drill.test.ClusterFixtureBuilder.RuntimeOption;
@@ -131,21 +128,23 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
   public static class TestFragmentContext extends BaseFragmentContext {
 
     private final DrillConfig config;
-    private final OptionSet options;
+    private final OptionManager options;
     private final CodeCompiler compiler;
     private ExecutionControls controls;
     private final BufferManagerImpl bufferManager;
+    private final BufferAllocator allocator;
 
-    public TestFragmentContext(DrillConfig config, OptionSet options, BufferAllocator allocator) {
+    public TestFragmentContext(DrillConfig config, OptionManager options, BufferAllocator allocator) {
       super(newFunctionRegistry(config, options));
       this.config = config;
       this.options = options;
+      this.allocator = allocator;
       compiler = new CodeCompiler(config, options);
       bufferManager = new BufferManagerImpl(allocator);
     }
 
     private static FunctionImplementationRegistry newFunctionRegistry(
-        DrillConfig config, OptionSet options) {
+        DrillConfig config, OptionManager options) {
       ScanResult classpathScan = ClassPathScanner.fromPrescan(config);
       return new FunctionImplementationRegistry(config, classpathScan, options);
     }
@@ -155,7 +154,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
     }
 
     @Override
-    public OptionSet getOptionSet() {
+    public OptionManager getOptions() {
       return options;
     }
 
@@ -188,66 +187,33 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
     protected BufferManager getBufferManager() {
       return bufferManager;
     }
-  }
-
-  /**
-   * Implements a write-only version of the stats collector for use by operators,
-   * then provides simplified test-time accessors to get the stats values when
-   * validating code in tests.
-   */
-
-  public static class MockStats implements OperatorStatReceiver {
-
-    public Map<Integer, Double> stats = new HashMap<>();
-
-    @Override
-    public void addLongStat(MetricDef metric, long value) {
-      setStat(metric, getStat(metric) + value);
-    }
 
+    @SuppressWarnings("resource")
     @Override
-    public void addDoubleStat(MetricDef metric, double value) {
-      setStat(metric, getStat(metric) + value);
+    public OperatorContext newOperatorContext(PhysicalOperator popConfig,
+        OperatorStats stats) throws OutOfMemoryException {
+      BufferAllocator childAllocator = allocator.newChildAllocator(
+          "test:" + popConfig.getClass().getSimpleName(),
+          popConfig.getInitialAllocation(),
+          popConfig.getMaxAllocation()
+          );
+      return new TestOperatorContext(this, childAllocator, popConfig);
     }
 
     @Override
-    public void setLongStat(MetricDef metric, long value) {
-      setStat(metric, value);
+    public OperatorContext newOperatorContext(PhysicalOperator popConfig)
+        throws OutOfMemoryException {
+      return newOperatorContext(popConfig, null);
     }
 
     @Override
-    public void setDoubleStat(MetricDef metric, double value) {
-      setStat(metric, value);
+    public String getQueryUserName() {
+      return "fred";
     }
-
-    public double getStat(MetricDef metric) {
-      return getStat(metric.metricId());
-    }
-
-    private double getStat(int metricId) {
-      Double value = stats.get(metricId);
-      return value == null ? 0 : value;
-    }
-
-    private void setStat(MetricDef metric, double value) {
-      setStat(metric.metricId(), value);
-    }
-
-    private void setStat(int metricId, double value) {
-      stats.put(metricId, value);
-    }
-
-    // Timing stats not supported for test.
-    @Override
-    public void startWait() { }
-
-    @Override
-    public void stopWait() {  }
   }
 
   private final SystemOptionManager options;
   private final TestFragmentContext context;
-  private final OperatorStatReceiver stats;
 
   protected OperatorFixture(OperatorFixtureBuilder builder) {
     config = builder.configBuilder().build();
@@ -262,7 +228,6 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
       applySystemOptions(builder.systemOptions);
     }
     context = new TestFragmentContext(config, options, allocator);
-    stats = new MockStats();
    }
 
   private void applySystemOptions(List<RuntimeOption> systemOptions) {
@@ -272,7 +237,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
   }
 
   public SystemOptionManager options() { return options; }
-  public FragmentContextInterface fragmentExecContext() { return context; }
+  public FragmentContextInterface fragmentContext() { return context; }
 
   @Override
   public void close() throws Exception {
@@ -312,7 +277,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
   public RowSet wrap(VectorContainer container) {
     switch (container.getSchema().getSelectionVectorMode()) {
     case FOUR_BYTE:
-      return new HyperRowSetImpl(container, container.getSelectionVector4());
+      return HyperRowSetImpl.fromContainer(container, container.getSelectionVector4());
     case NONE:
       return DirectRowSet.fromContainer(container);
     case TWO_BYTE:
@@ -324,25 +289,17 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
 
   public static class TestOperatorContext extends BaseOperatorContext {
 
-    private final OperatorStatReceiver stats;
+    private final OperatorStats stats;
 
     public TestOperatorContext(FragmentContextInterface fragContext,
         BufferAllocator allocator,
-        PhysicalOperator config,
-        OperatorStatReceiver stats) {
+        PhysicalOperator config) {
       super(fragContext, allocator, config);
-      this.stats = stats;
-    }
-
-    @Override
-    public OperatorStatReceiver getStatsWriter() {
-      return stats;
+      stats = new OperatorStats(100, 101, 0, allocator);
     }
 
     @Override
-    public OperatorStats getStats() {
-      throw new UnsupportedOperationException("getStats() not supported for tests");
-    }
+    public OperatorStats getStats() { return stats; }
 
     @Override
     public <RESULT> ListenableFuture<RESULT> runCallableAs(
@@ -351,8 +308,14 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
     }
   }
 
-  public OperatorContext operatorContext(PhysicalOperator config) {
-    return new TestOperatorContext(context, allocator(), config, stats);
+  @SuppressWarnings("resource")
+  public OperatorContext newOperatorContext(PhysicalOperator popConfig) {
+    BufferAllocator childAllocator = allocator.newChildAllocator( // each operator context gets its own child allocator
+        "test:" + popConfig.getClass().getSimpleName(), // allocator name derived from the operator config class
+        popConfig.getInitialAllocation(),
+        popConfig.getMaxAllocation()
+        );
+    return new TestOperatorContext(context, childAllocator, popConfig); // NOTE(review): who closes childAllocator is not visible here - confirm
   }
 
   public RowSet wrap(VectorContainer container, SelectionVector2 sv2) {

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 2d1aa9b..2f735d9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -36,6 +36,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.client.PrintingResultsListener;
 import org.apache.drill.exec.client.QuerySubmitter.Format;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
@@ -216,6 +217,7 @@ public class QueryBuilder {
   private final ClientFixture client;
   private QueryType queryType;
   private String queryText;
+  private List<PlanFragment> planFragments;
 
   QueryBuilder(ClientFixture client) {
     this.client = client;
@@ -236,6 +238,19 @@ public class QueryBuilder {
   }
 
   /**
+   * Run a physical plan presented as a list of fragments.
+   *
+   * @param planFragments fragments that make up the plan
+   * @return this builder
+   */
+
+  public QueryBuilder plan(List<PlanFragment> planFragments) {
+    queryType = QueryType.EXECUTION; // fragment plans are submitted as EXECUTION queries
+    this.planFragments = planFragments; // when non-null, withListener() runs the fragments instead of query text
+    return this;
+  }
+
+  /**
    * Parse a single SQL statement (with optional ending semi-colon) from
    * the file provided.
    * @param file the file containing exactly one SQL statement, with
@@ -258,6 +273,13 @@ public class QueryBuilder {
     return query(QueryType.PHYSICAL, plan);
   }
 
+  /**
+   * Run a query contained in a resource file.
+   *
+   * @param resource Name of the resource
+   * @return this builder
+   */
+
   public QueryBuilder sqlResource(String resource) {
     sql(ClusterFixture.loadResource(resource));
     return this;
@@ -300,13 +322,14 @@ public class QueryBuilder {
   }
 
   /**
-   * Run the query and return the first result set as a
+   * Run the query and return the first non-empty batch as a
    * {@link DirectRowSet} object that can be inspected directly
    * by the code using a {@link RowSetReader}.
    * <p>
-   * An enhancement is to provide a way to read a series of result
+   *
+   * @see #rowSetIterator() for a version that reads a series of
    * batches as row sets.
-   * @return a row set that represents the first batch returned from
+   * @return a row set that represents the first non-empty batch returned from
    * the query
    * @throws RpcException if anything goes wrong
    */
@@ -425,8 +448,16 @@ public class QueryBuilder {
 
   public void withListener(UserResultsListener listener) {
     Preconditions.checkNotNull(queryType, "Query not provided.");
-    Preconditions.checkNotNull(queryText, "Query not provided.");
-    client.client().runQuery(queryType, queryText, listener);
+    if (planFragments != null) { // fragments supplied via plan() take precedence over query text
+      try {
+        client.client().runQuery(QueryType.EXECUTION, planFragments, listener);
+      } catch(RpcException e) {
+        throw new IllegalStateException(e); // rethrown unchecked: this method declares no checked exceptions
+      }
+    } else {
+      Preconditions.checkNotNull(queryText, "Query not provided."); // query text is required only on this path
+      client.client().runQuery(queryType, queryText, listener);
+    }
   }
 
   /**
@@ -481,7 +512,6 @@ public class QueryBuilder {
   public long print() throws Exception {
     DrillConfig config = client.cluster().config( );
 
-
     boolean verbose = ! config.getBoolean(QueryTestUtil.TEST_QUERY_PRINTING_SILENT) ||
                       DrillTest.verbose();
     if (verbose) {
@@ -560,6 +590,11 @@ public class QueryBuilder {
     return new QuerySummary(queryId, recordCount, batchCount, elapsed, state);
   }
 
+  public QueryResultSet resultSet() {
+    BufferingQueryEventListener listener = withEventListener(); // presumably submits the query; withEventListener() is not shown here - confirm
+    return new QueryResultSet(listener, client.allocator()); // result set has a close() that drains events and clears its loader
+  }
+
   /**
    * Submit an "EXPLAIN" statement, and return the column value which
    * contains the plan's string.

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
new file mode 100644
index 0000000..cf13e2b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.test.BufferingQueryEventListener.QueryEvent;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+
+/**
+ * Returns query results as an iterator over row sets. Provides
+ * a very easy way for tests to work with query data using the
+ * row set tools. Call next() until it returns null, then close().
+ */
+
+public class QueryResultSet {
+  private BufferingQueryEventListener listener; // source of the query events consumed by next()
+  private boolean eof; // set once an EOF or ERROR event has been seen
+  private int recordCount = 0; // running total of rows over all batches received
+  private int batchCount = 0; // number of BATCH events received
+  private QueryId queryId = null; // stays null until a QUERY_ID event arrives
+  @SuppressWarnings("unused")
+  private QueryState state = null; // recorded on EOF/ERROR; never read within this class
+  final RecordBatchLoader loader; // one loader reused for every batch; cleared in close()
+
+  public QueryResultSet(BufferingQueryEventListener listener, BufferAllocator allocator) {
+    this.listener = listener;
+    loader = new RecordBatchLoader(allocator);
+  }
+
+  /**
+   * Return the next batch of data as a row set. The first batch is usually
+   * empty as it carries only schema.
+   *
+   * @return the next batch as a row set, or null if EOF
+   * @throws Exception on a server error
+   */
+
+  public DirectRowSet next() throws Exception {
+    if (eof) {
+      return null;
+    }
+    for (;;) { // keep reading events until one yields a result; QUERY_ID events are skipped
+      QueryEvent event = listener.get();
+      switch (event.type)
+      {
+      case BATCH:
+        batchCount++;
+        recordCount += event.batch.getHeader().getRowCount();
+        loader.load(event.batch.getHeader().getDef(), event.batch.getData());
+        event.batch.release(); // runs only after a successful load(); skipped if load() throws
+        return DirectRowSet.fromVectorAccessible(loader.allocator(), loader);
+
+      case EOF:
+        state = event.state;
+        eof = true;
+        return null;
+
+      case ERROR:
+        state = event.state;
+        eof = true; // later next() calls return null instead of reading more events
+        throw event.error;
+
+      case QUERY_ID:
+        queryId = event.queryId;
+        continue;
+
+      default:
+        throw new IllegalStateException("Unexpected event: " + event.type);
+      }
+    }
+  }
+
+  public QueryId queryId() { return queryId; }
+  public int recordCount() { return recordCount; } // counts only batches read so far
+  public int batchCount() { return batchCount; }
+
+  public void close() {
+    try {
+      while (! eof) { // drain unread events, discarding any remaining batches
+        RowSet rowSet = next();
+        if (rowSet != null) {
+          rowSet.clear();
+        }
+      }
+    } catch (Exception e) {
+      throw new IllegalStateException(e); // an error hit while draining is rethrown unchecked
+    } finally {
+      loader.clear(); // always clear the loader, even if draining failed
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
index 8a3db9f..d0ca662 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/HyperRowSetImpl.java
@@ -61,6 +61,10 @@ public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
     this.sv4 = sv4;
   }
 
+  public static HyperRowSet fromContainer(VectorContainer container, SelectionVector4 sv4) {
+    return new HyperRowSetImpl(container, sv4); // static factory: plain delegation to the constructor, typed as the interface
+  }
+
   @Override
   public boolean isExtendable() { return false; }
 
@@ -80,4 +84,10 @@ public class HyperRowSetImpl extends AbstractRowSet implements HyperRowSet {
 
   @Override
   public int rowCount() { return sv4.getCount(); }
+
+  @Override
+  public void clear() {
+    super.clear();
+    sv4.clear(); // also clear the selection vector held by this row set
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/e791ed62/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
index 10e9032..e84f2d3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
@@ -30,13 +30,10 @@ import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.RepeatedIntVector;
 import org.apache.drill.exec.vector.accessor.ColumnAccessors.IntColumnWriter;
 import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
-import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter;
 import org.apache.drill.exec.vector.accessor.writer.NullableScalarWriter;
 import org.apache.drill.exec.vector.accessor.writer.ScalarArrayWriter;
 import org.apache.drill.test.OperatorFixture;
-import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
-import org.apache.drill.test.rowSet.RowSetWriter;
 import org.apache.drill.test.rowSet.SchemaBuilder;
 
 import com.google.common.base.Stopwatch;
@@ -278,19 +275,4 @@ public class PerformanceTool {
       e.printStackTrace();
     }
   }
-
-  @SuppressWarnings("unused")
-  private static void testWriter2(TupleMetadata rowSchema,
-      OperatorFixture fixture, Stopwatch timer) {
-    ExtendableRowSet rs = fixture.rowSet(rowSchema);
-    RowSetWriter writer = rs.writer(4096);
-    ScalarWriter colWriter = writer.scalar(0);
-    timer.start();
-    for (int i = 0; i < ROW_COUNT; i++) {
-      colWriter.setInt(i);
-      writer.save();
-    }
-    timer.stop();
-    writer.done().clear();
-  }
 }