You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by su...@apache.org on 2017/01/30 19:42:40 UTC

[3/4] drill git commit: DRILL-5126: Provide simplified, unified "cluster fixture" for test

DRILL-5126: Provide simplified, unified "cluster fixture" for test

Drill provides a robust selection of test frameworks that have evolved to satisfy the needs of a variety of test cases.
However, some do some of what a given test needs, while others to other parts. Also, the various frameworks make
assumptions (in the form of boot-time configuration) that differs from what some test may need, forcing the test
to start, then stop, then restart a Drillbit - an expensive operation.

Also, many ways exist to run queries, but they all do part of the job. Several ways exist to channge
runtime options.

This checkin shamelessly grabs the best parts from existing frameworks, adds a fluent builder facade
and provides a complete, versitie test framework for new tests. Old tests are unaffected by this
new code.

An adjustment was made to allow use of the existing TestBuilder mechanism. TestBuilder used to
depend on static members of BaseTestQuery. A "shim" allows the same code to work in the old
way for old tests, but with the new ClusterFixture for new tests.

Details are in the org.apache.drill.test.package-info.java file.

This commit modifies a single test case, TestSimpleExternalSort, to use the new framework.
More cases will follow once this framework itself is committed.

Also, the framework will eventually allow use of the extended mock data source
from SQL. However, that change must await checkin of the mock data source changes.

Includes a LogFixture that allows setting logger options per test to simplify debugging via tests.

Also includes a \u201csummary listener\u201d to run a query and return a summary of the
run. Handy to simply verify that a query runs and to time it.

Added an async query runner for tests that want to run multiple
concurrent queries.

closes #710


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5c3924c9
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5c3924c9
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5c3924c9

Branch: refs/heads/master
Commit: 5c3924c9844f7d25c0798c649bc032a0022b3a3e
Parents: 837722c
Author: Paul Rogers <pr...@maprtech.com>
Authored: Tue Dec 13 13:41:23 2016 -0800
Committer: Sudheesh Katkam <su...@apache.org>
Committed: Mon Jan 30 10:09:39 2017 -0800

----------------------------------------------------------------------
 exec/java-exec/pom.xml                          |   6 +
 .../org/apache/drill/exec/server/Drillbit.java  |   2 +-
 .../java/org/apache/drill/BaseTestQuery.java    |  23 +-
 .../java/org/apache/drill/DrillTestWrapper.java |  52 ++-
 .../test/java/org/apache/drill/TestBuilder.java |  88 ++--
 .../java/org/apache/drill/exec/ExecTest.java    |   2 +-
 .../impl/xsort/TestSimpleExternalSort.java      | 256 ++++-------
 .../drill/test/BufferingQueryEventListener.java | 112 +++++
 .../org/apache/drill/test/ClientFixture.java    | 195 ++++++++
 .../org/apache/drill/test/ClusterFixture.java   | 432 ++++++++++++++++++
 .../java/org/apache/drill/test/ClusterTest.java | 122 +++++
 .../java/org/apache/drill/test/FieldDef.java    |  82 ++++
 .../org/apache/drill/test/FixtureBuilder.java   | 260 +++++++++++
 .../java/org/apache/drill/test/LogFixture.java  | 255 +++++++++++
 .../org/apache/drill/test/ProfileParser.java    | 219 +++++++++
 .../org/apache/drill/test/QueryBuilder.java     | 455 +++++++++++++++++++
 .../org/apache/drill/test/package-info.java     |  90 ++++
 17 files changed, 2435 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 5992acb..000d447 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -458,6 +458,12 @@
       <artifactId>httpdlog-parser</artifactId>
       <version>2.4</version>
     </dependency>
+    <dependency>
+      <groupId>org.glassfish</groupId>
+      <artifactId>javax.json</artifactId>
+      <version>1.0.4</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 25776ad..77532e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -106,7 +106,7 @@ public class Drillbit implements AutoCloseable {
       storeProvider = new CachingPersistentStoreProvider(new LocalPersistentStoreProvider(config));
     } else {
       coord = new ZKClusterCoordinator(config);
-      storeProvider = new PersistentStoreRegistry<ClusterCoordinator>(this.coord, config).newPStoreProvider();
+      storeProvider = new PersistentStoreRegistry(this.coord, config).newPStoreProvider();
       isDistributedMode = true;
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index fb84088..42cfe08 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.drill.DrillTestWrapper.TestServices;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.scanner.ClassPathScanner;
@@ -249,8 +250,26 @@ public class BaseTestQuery extends ExecTest {
     return testBuilder();
   }
 
+
+  public static class ClassicTestServices implements TestServices {
+    @Override
+    public BufferAllocator allocator() {
+      return allocator;
+    }
+
+    @Override
+    public void test(String query) throws Exception {
+      BaseTestQuery.test(query);
+    }
+
+    @Override
+    public List<QueryDataBatch> testRunAndReturn(final QueryType type, final Object query) throws Exception {
+      return BaseTestQuery.testRunAndReturn(type, query);
+    }
+  }
+
   public static TestBuilder testBuilder() {
-    return new TestBuilder(allocator);
+    return new TestBuilder(new ClassicTestServices());
   }
 
   @AfterClass
@@ -308,7 +327,7 @@ public class BaseTestQuery extends ExecTest {
       Preconditions.checkArgument(query instanceof String, "Expected a string as input query");
       query = QueryTestUtil.normalizeQuery((String)query);
       return client.runQuery(type, (String)query);
-  }
+    }
   }
 
   public static List<QueryDataBatch> testPreparedStatement(PreparedStatementHandle handle) throws Exception {

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index 7033be6..054676d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -64,6 +64,14 @@ import org.apache.drill.exec.vector.ValueVector;
 public class DrillTestWrapper {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);
 
+  public interface TestServices {
+    BufferAllocator allocator();
+
+    void test(String query) throws Exception;
+
+    List<QueryDataBatch> testRunAndReturn(QueryType type, Object query) throws Exception;
+  }
+
   // TODO - when in JSON, read baseline in all text mode to avoid precision loss for decimal values
 
   // This flag will enable all of the values that are validated to be logged. For large validations this is time consuming
@@ -91,7 +99,7 @@ public class DrillTestWrapper {
   private UserBitShared.QueryType baselineQueryType;
   // should ordering be enforced in the baseline check
   private boolean ordered;
-  private BufferAllocator allocator;
+  private TestServices services;
   // queries to run before the baseline or test queries, can be used to set options
   private String baselineOptionSettingQueries;
   private String testOptionSettingQueries;
@@ -108,12 +116,12 @@ public class DrillTestWrapper {
 
   private int expectedNumBatches;
 
-  public DrillTestWrapper(TestBuilder testBuilder, BufferAllocator allocator, Object query, QueryType queryType,
+  public DrillTestWrapper(TestBuilder testBuilder, TestServices services, Object query, QueryType queryType,
                           String baselineOptionSettingQueries, String testOptionSettingQueries,
                           QueryType baselineQueryType, boolean ordered, boolean highPerformanceComparison,
                           List<Map<String, Object>> baselineRecords, int expectedNumBatches) {
     this.testBuilder = testBuilder;
-    this.allocator = allocator;
+    this.services = services;
     this.query = query;
     this.queryType = queryType;
     this.baselineQueryType = baselineQueryType;
@@ -138,7 +146,7 @@ public class DrillTestWrapper {
   }
 
   private BufferAllocator getAllocator() {
-    return allocator;
+    return services.allocator();
   }
 
   private void compareHyperVectors(Map<String, HyperVectorValueIterator> expectedRecords,
@@ -388,8 +396,8 @@ public class DrillTestWrapper {
     List<QueryDataBatch> actual;
     QueryDataBatch batch = null;
     try {
-      BaseTestQuery.test(testOptionSettingQueries);
-      actual = BaseTestQuery.testRunAndReturn(queryType, query);
+      test(testOptionSettingQueries);
+      actual = testRunAndReturn(queryType, query);
       batch = actual.get(0);
       loader.load(batch.getHeader().getDef(), batch.getData());
 
@@ -438,8 +446,8 @@ public class DrillTestWrapper {
     List<Map<String, Object>> actualRecords = new ArrayList<>();
 
     try {
-      BaseTestQuery.test(testOptionSettingQueries);
-      actual = BaseTestQuery.testRunAndReturn(queryType, query);
+      test(testOptionSettingQueries);
+      actual = testRunAndReturn(queryType, query);
 
       checkNumBatches(actual);
 
@@ -449,8 +457,8 @@ public class DrillTestWrapper {
       // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
       // the cases where the baseline is stored in a file.
       if (baselineRecords == null) {
-        BaseTestQuery.test(baselineOptionSettingQueries);
-        expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
+        test(baselineOptionSettingQueries);
+        expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
         addToMaterializedResults(expectedRecords, expected, loader);
       } else {
         expectedRecords = baselineRecords;
@@ -481,7 +489,6 @@ public class DrillTestWrapper {
 
   public void compareMergedOnHeapVectors() throws Exception {
     RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
-    BatchSchema schema = null;
 
     List<QueryDataBatch> actual = Collections.emptyList();
     List<QueryDataBatch> expected = Collections.emptyList();
@@ -489,8 +496,8 @@ public class DrillTestWrapper {
     Map<String, List<Object>> expectedSuperVectors;
 
     try {
-      BaseTestQuery.test(testOptionSettingQueries);
-      actual = BaseTestQuery.testRunAndReturn(queryType, query);
+      test(testOptionSettingQueries);
+      actual = testRunAndReturn(queryType, query);
 
       checkNumBatches(actual);
 
@@ -504,8 +511,8 @@ public class DrillTestWrapper {
       // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
       // the cases where the baseline is stored in a file.
       if (baselineRecords == null) {
-        BaseTestQuery.test(baselineOptionSettingQueries);
-        expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
+        test(baselineOptionSettingQueries);
+        expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
         BatchIterator exBatchIter = new BatchIterator(expected, loader);
         expectedSuperVectors = addToCombinedVectorResults(exBatchIter);
         exBatchIter.close();
@@ -539,8 +546,8 @@ public class DrillTestWrapper {
   public void compareResultsHyperVector() throws Exception {
     RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
 
-    BaseTestQuery.test(testOptionSettingQueries);
-    List<QueryDataBatch> results = BaseTestQuery.testRunAndReturn(queryType, query);
+    test(testOptionSettingQueries);
+    List<QueryDataBatch> results = testRunAndReturn(queryType, query);
 
     checkNumBatches(results);
 
@@ -549,8 +556,8 @@ public class DrillTestWrapper {
 
     Map<String, HyperVectorValueIterator> actualSuperVectors = addToHyperVectorMap(results, loader);
 
-    BaseTestQuery.test(baselineOptionSettingQueries);
-    List<QueryDataBatch> expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
+    test(baselineOptionSettingQueries);
+    List<QueryDataBatch> expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
 
     Map<String, HyperVectorValueIterator> expectedSuperVectors = addToHyperVectorMap(expected, loader);
 
@@ -761,4 +768,11 @@ public class DrillTestWrapper {
     return ret + "\n";
   }
 
+  private void test(String query) throws Exception {
+    services.test(query);
+  }
+
+  private List<QueryDataBatch> testRunAndReturn(QueryType type, Object query) throws Exception {
+    return services.testRunAndReturn(type, query);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
index a19b30e..bef7b3b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
@@ -29,6 +29,7 @@ import org.antlr.runtime.ANTLRStringStream;
 import org.antlr.runtime.CommonTokenStream;
 import org.antlr.runtime.RecognitionException;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.DrillTestWrapper.TestServices;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.parser.ExprLexer;
 import org.apache.drill.common.expression.parser.ExprParser;
@@ -56,7 +57,7 @@ public class TestBuilder {
   // should the validation enforce ordering
   private Boolean ordered;
   private boolean approximateEquality;
-  private BufferAllocator allocator;
+  private TestServices services;
   // Used to pass the type information associated with particular column names rather than relying on the
   // ordering of the columns in the CSV file, or the default type inferences when reading JSON, this is used for the
   // case where results of the test query are adding type casts to the baseline queries, this saves a little bit of
@@ -84,16 +85,16 @@ public class TestBuilder {
 
   private int expectedNumBatches = DrillTestWrapper.EXPECTED_BATCH_COUNT_NOT_SET;
 
-  public TestBuilder(BufferAllocator allocator) {
-    this.allocator = allocator;
+  public TestBuilder(TestServices services) {
+    this.services = services;
     reset();
   }
 
-  public TestBuilder(BufferAllocator allocator, Object query, UserBitShared.QueryType queryType, Boolean ordered,
+  public TestBuilder(TestServices services, Object query, UserBitShared.QueryType queryType, Boolean ordered,
                      boolean approximateEquality, Map<SchemaPath, TypeProtos.MajorType> baselineTypeMap,
                      String baselineOptionSettingQueries, String testOptionSettingQueries, boolean highPerformanceComparison,
                      int expectedNumBatches) {
-    this(allocator);
+    this(services);
     if (ordered == null) {
       throw new RuntimeException("Ordering not set, when using a baseline file or query you must explicitly call the ordered() or unOrdered() method on the " + this.getClass().getSimpleName());
     }
@@ -123,7 +124,7 @@ public class TestBuilder {
     if ( ! ordered && highPerformanceComparison ) {
       throw new Exception("High performance comparison only available for ordered checks, to enforce this restriction, ordered() must be called first.");
     }
-    return new DrillTestWrapper(this, allocator, query, queryType, baselineOptionSettingQueries, testOptionSettingQueries,
+    return new DrillTestWrapper(this, services, query, queryType, baselineOptionSettingQueries, testOptionSettingQueries,
         getValidationQueryType(), ordered, highPerformanceComparison, baselineRecords, expectedNumBatches);
   }
 
@@ -154,24 +155,24 @@ public class TestBuilder {
   public TestBuilder sqlQueryFromFile(String queryFile) throws IOException {
     String query = BaseTestQuery.getFile(queryFile);
     this.query = query;
-    this.queryType = UserBitShared.QueryType.SQL;
+    queryType = UserBitShared.QueryType.SQL;
     return this;
   }
 
   public TestBuilder physicalPlanFromFile(String queryFile) throws IOException {
     String query = BaseTestQuery.getFile(queryFile);
     this.query = query;
-    this.queryType = UserBitShared.QueryType.PHYSICAL;
+    queryType = UserBitShared.QueryType.PHYSICAL;
     return this;
   }
 
   public TestBuilder ordered() {
-    this.ordered = true;
+    ordered = true;
     return this;
   }
 
   public TestBuilder unOrdered() {
-    this.ordered = false;
+    ordered = false;
     return this;
   }
 
@@ -179,36 +180,41 @@ public class TestBuilder {
   // a little harder to debug as it iterates over a hyper batch rather than reading all of the values into
   // large on-heap lists
   public TestBuilder highPerformanceComparison() throws Exception {
-    this.highPerformanceComparison = true;
+    highPerformanceComparison = true;
     return this;
   }
 
   // list of queries to run before the baseline query, can be used to set several options
   // list takes the form of a semi-colon separated list
   public TestBuilder optionSettingQueriesForBaseline(String queries) {
-    this.baselineOptionSettingQueries = queries;
+    baselineOptionSettingQueries = queries;
     return this;
   }
 
   public TestBuilder optionSettingQueriesForBaseline(String queries, Object... args) {
-    this.baselineOptionSettingQueries = String.format(queries, args);
+    baselineOptionSettingQueries = String.format(queries, args);
     return this;
   }
 
-  // list of queries to run before the test query, can be used to set several options
-  // list takes the form of a semi-colon separated list
+  /**
+   *  list of queries to run before the test query, can be used to set several options
+   *  list takes the form of a semi-colon separated list.
+   * @param queries queries that set session and system options
+   * @return this test builder
+   */
+
   public TestBuilder optionSettingQueriesForTestQuery(String queries) {
-    this.testOptionSettingQueries = queries;
+    testOptionSettingQueries = queries;
     return this;
   }
 
   public TestBuilder optionSettingQueriesForTestQuery(String query, Object... args) throws Exception {
-    this.testOptionSettingQueries = String.format(query, args);
+    testOptionSettingQueries = String.format(query, args);
     return this;
   }
 
   public TestBuilder approximateEquality() {
-    this.approximateEquality = true;
+    approximateEquality = true;
     return this;
   }
 
@@ -243,13 +249,13 @@ public class TestBuilder {
   }
 
   public JSONTestBuilder jsonBaselineFile(String filePath) {
-    return new JSONTestBuilder(filePath, allocator, query, queryType, ordered, approximateEquality,
+    return new JSONTestBuilder(filePath, services, query, queryType, ordered, approximateEquality,
         baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries, highPerformanceComparison,
         expectedNumBatches);
   }
 
   public CSVTestBuilder csvBaselineFile(String filePath) {
-    return new CSVTestBuilder(filePath, allocator, query, queryType, ordered, approximateEquality,
+    return new CSVTestBuilder(filePath, services, query, queryType, ordered, approximateEquality,
         baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries, highPerformanceComparison,
         expectedNumBatches);
   }
@@ -259,7 +265,7 @@ public class TestBuilder {
     assert baselineColumns == null : "The column information should be captured in expected schema, not baselineColumns";
 
     return new SchemaTestBuilder(
-        allocator,
+        services,
         query,
         queryType,
         baselineOptionSettingQueries,
@@ -280,7 +286,10 @@ public class TestBuilder {
     }
   }
 
-  // indicate that the tests query should be checked for an empty result set
+  /**
+   * Indicate that the tests query should be checked for an empty result set.
+   * @return the test builder
+   */
   public TestBuilder expectsEmptyResultSet() {
     unOrdered();
     baselineRecords = new ArrayList<>();
@@ -298,6 +307,7 @@ public class TestBuilder {
     this.expectedNumBatches = expectedNumBatches;
     return this;
   }
+
   /**
    * This method is used to pass in a simple list of values for a single record verification without
    * the need to create a CSV or JSON file to store the baseline.
@@ -306,7 +316,7 @@ public class TestBuilder {
    * checks.
    *
    * @param baselineValues - the baseline values to validate
-   * @return
+   * @return the test builder
    */
   public TestBuilder baselineValues(Object ... baselineValues) {
     assert getExpectedSchema() == null : "The expected schema is not needed when baselineValues are provided ";
@@ -339,7 +349,7 @@ public class TestBuilder {
    * with an assumed stable code path and produce the same erroneous result.
    *
    * @param materializedRecords - a list of maps representing materialized results
-   * @return
+   * @return the test builder
    */
   public TestBuilder baselineRecords(List<Map<String, Object>> materializedRecords) {
     this.baselineRecords = materializedRecords;
@@ -374,20 +384,24 @@ public class TestBuilder {
     return baselineRecords != null;
   }
 
-  // provide a SQL query to validate against
+  /**
+   * Provide a SQL query to validate against.
+   * @param baselineQuery
+   * @return the test builder
+   */
   public BaselineQueryTestBuilder sqlBaselineQuery(Object baselineQuery) {
-    return new BaselineQueryTestBuilder(baselineQuery, UserBitShared.QueryType.SQL, allocator, query, queryType, ordered, approximateEquality,
+    return new BaselineQueryTestBuilder(baselineQuery, UserBitShared.QueryType.SQL, services, query, queryType, ordered, approximateEquality,
         baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries, highPerformanceComparison, expectedNumBatches);
   }
 
   public BaselineQueryTestBuilder sqlBaselineQuery(String query, String ...replacements) {
-    return sqlBaselineQuery(String.format(query, replacements));
+    return sqlBaselineQuery(String.format(query, (Object[]) replacements));
   }
 
   // provide a path to a file containing a SQL query to use as a baseline
   public BaselineQueryTestBuilder sqlBaselineQueryFromFile(String baselineQueryFilename) throws IOException {
     String baselineQuery = BaseTestQuery.getFile(baselineQueryFilename);
-    return new BaselineQueryTestBuilder(baselineQuery, UserBitShared.QueryType.SQL, allocator, query, queryType, ordered, approximateEquality,
+    return new BaselineQueryTestBuilder(baselineQuery, UserBitShared.QueryType.SQL, services, query, queryType, ordered, approximateEquality,
         baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries, highPerformanceComparison, expectedNumBatches);
   }
 
@@ -395,7 +409,7 @@ public class TestBuilder {
   // that physical plans, or any large JSON strings do not live in the Java source as literals
   public BaselineQueryTestBuilder physicalPlanBaselineQueryFromFile(String baselinePhysicalPlanPath) throws IOException {
     String baselineQuery = BaseTestQuery.getFile(baselinePhysicalPlanPath);
-    return new BaselineQueryTestBuilder(baselineQuery, UserBitShared.QueryType.PHYSICAL, allocator, query, queryType, ordered, approximateEquality,
+    return new BaselineQueryTestBuilder(baselineQuery, UserBitShared.QueryType.PHYSICAL, services, query, queryType, ordered, approximateEquality,
         baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries, highPerformanceComparison, expectedNumBatches);
   }
 
@@ -424,11 +438,11 @@ public class TestBuilder {
     // that come out of the test query drive interpretation of baseline
     private TypeProtos.MajorType[] baselineTypes;
 
-    CSVTestBuilder(String baselineFile, BufferAllocator allocator, Object query, UserBitShared.QueryType queryType, Boolean ordered,
+    CSVTestBuilder(String baselineFile, TestServices services, Object query, UserBitShared.QueryType queryType, Boolean ordered,
                    boolean approximateEquality, Map<SchemaPath, TypeProtos.MajorType> baselineTypeMap,
                    String baselineOptionSettingQueries, String testOptionSettingQueries, boolean highPerformanceComparison,
                    int expectedNumBatches) {
-      super(allocator, query, queryType, ordered, approximateEquality, baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries,
+      super(services, query, queryType, ordered, approximateEquality, baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries,
           highPerformanceComparison, expectedNumBatches);
       this.baselineFilePath = baselineFile;
     }
@@ -515,9 +529,9 @@ public class TestBuilder {
 
   public class SchemaTestBuilder extends TestBuilder {
     private List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema;
-    SchemaTestBuilder(BufferAllocator allocator, Object query, UserBitShared.QueryType queryType,
+    SchemaTestBuilder(TestServices services, Object query, UserBitShared.QueryType queryType,
         String baselineOptionSettingQueries, String testOptionSettingQueries, List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema) {
-      super(allocator, query, queryType, false, false, null, baselineOptionSettingQueries, testOptionSettingQueries, false, -1);
+      super(services, query, queryType, false, false, null, baselineOptionSettingQueries, testOptionSettingQueries, false, -1);
       expectsEmptyResultSet();
       this.expectedSchema = expectedSchema;
     }
@@ -556,11 +570,11 @@ public class TestBuilder {
     // path to the baseline file that will be inserted into the validation query
     private String baselineFilePath;
 
-    JSONTestBuilder(String baselineFile, BufferAllocator allocator, Object query, UserBitShared.QueryType queryType, Boolean ordered,
+    JSONTestBuilder(String baselineFile, TestServices services, Object query, UserBitShared.QueryType queryType, Boolean ordered,
                     boolean approximateEquality, Map<SchemaPath, TypeProtos.MajorType> baselineTypeMap,
                     String baselineOptionSettingQueries, String testOptionSettingQueries, boolean highPerformanceComparison,
                     int expectedNumBatches) {
-      super(allocator, query, queryType, ordered, approximateEquality, baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries,
+      super(services, query, queryType, ordered, approximateEquality, baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries,
           highPerformanceComparison, expectedNumBatches);
       this.baselineFilePath = baselineFile;
       this.baselineColumns = new String[] {"*"};
@@ -586,12 +600,12 @@ public class TestBuilder {
     private Object baselineQuery;
     private UserBitShared.QueryType baselineQueryType;
 
-    BaselineQueryTestBuilder(Object baselineQuery, UserBitShared.QueryType baselineQueryType, BufferAllocator allocator,
+    BaselineQueryTestBuilder(Object baselineQuery, UserBitShared.QueryType baselineQueryType, TestServices services,
                              Object query, UserBitShared.QueryType queryType, Boolean ordered,
                              boolean approximateEquality, Map<SchemaPath, TypeProtos.MajorType> baselineTypeMap,
                              String baselineOptionSettingQueries, String testOptionSettingQueries, boolean highPerformanceComparison,
                              int expectedNumBatches) {
-      super(allocator, query, queryType, ordered, approximateEquality, baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries,
+      super(services, query, queryType, ordered, approximateEquality, baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries,
           highPerformanceComparison, expectedNumBatches);
       this.baselineQuery = baselineQuery;
       this.baselineQueryType = baselineQueryType;

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
index bfecf52..4872909 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
index 85975cb..50bf710 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
@@ -22,192 +22,140 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.List;
 
-import org.apache.drill.BaseTestQuery;
-import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.common.util.TestTools;
-import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.DrillTest;
+import org.apache.drill.test.FixtureBuilder;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestRule;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-
-public class TestSimpleExternalSort extends BaseTestQuery {
+public class TestSimpleExternalSort extends DrillTest {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSimpleExternalSort.class);
-  DrillConfig c = DrillConfig.create();
-
 
   @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(80000);
 
-  @Ignore
   @Test
-  public void mergeSortWithSv2() throws Exception {
-    List<QueryDataBatch> results = testPhysicalFromFileWithResults("xsort/one_key_sort_descending_sv2.json");
-    int count = 0;
-    for(QueryDataBatch b : results) {
-      if (b.getHeader().getRowCount() != 0) {
-        count += b.getHeader().getRowCount();
-      }
-    }
-    assertEquals(500000, count);
-
-    long previousBigInt = Long.MAX_VALUE;
-
-    int recordCount = 0;
-    int batchCount = 0;
-
-    for (QueryDataBatch b : results) {
-      if (b.getHeader().getRowCount() == 0) {
-        break;
-      }
-      batchCount++;
-      RecordBatchLoader loader = new RecordBatchLoader(allocator);
-      loader.load(b.getHeader().getDef(),b.getData());
-      BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(BigIntVector.class,
-              loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
-
-
-      BigIntVector.Accessor a1 = c1.getAccessor();
+  public void mergeSortWithSv2Legacy() throws Exception {
+    mergeSortWithSv2(true);
+  }
 
-      for (int i =0; i < c1.getAccessor().getValueCount(); i++) {
-        recordCount++;
-        assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i));
-        previousBigInt = a1.get(i);
-      }
-      loader.clear();
-      b.release();
+  /**
+   * Tests the external sort using an in-memory sort. Relies on default memory
+   * settings to be large enough to do the in-memory sort (there is,
+   * unfortunately, no way to double-check that no spilling was done.)
+   * This must be checked manually by setting a breakpoint in the in-memory
+   * sort routine.
+   *
+   * @param testLegacy
+   * @throws Exception
+   */
+
+  private void mergeSortWithSv2(boolean testLegacy) throws Exception {
+    try (ClusterFixture cluster = ClusterFixture.standardCluster( );
+         ClientFixture client = cluster.clientFixture()) {
+      chooseImpl(client, testLegacy);
+      List<QueryDataBatch> results = client.queryBuilder().physicalResource("xsort/one_key_sort_descending_sv2.json").results();
+      assertEquals(500000, client.countResults( results ));
+      validateResults(client.allocator(), results);
     }
+  }
 
-    System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
+  private void chooseImpl(ClientFixture client, boolean testLegacy) throws Exception {
   }
 
   @Test
-  public void sortOneKeyDescendingMergeSort() throws Throwable{
-    List<QueryDataBatch> results = testPhysicalFromFileWithResults("xsort/one_key_sort_descending.json");
-    int count = 0;
-    for (QueryDataBatch b : results) {
-      if (b.getHeader().getRowCount() != 0) {
-        count += b.getHeader().getRowCount();
-      }
+  @Ignore
+  public void sortOneKeyDescendingMergeSortLegacy() throws Throwable {
+    sortOneKeyDescendingMergeSort(true);
+  }
+
+  private void sortOneKeyDescendingMergeSort(boolean testLegacy) throws Throwable {
+    try (ClusterFixture cluster = ClusterFixture.standardCluster( );
+         ClientFixture client = cluster.clientFixture()) {
+      chooseImpl(client, testLegacy);
+      List<QueryDataBatch> results = client.queryBuilder().physicalResource("xsort/one_key_sort_descending.json").results();
+      assertEquals(1000000, client.countResults(results));
+      validateResults(client.allocator(), results);
     }
-    assertEquals(1000000, count);
+  }
 
+  private void validateResults(BufferAllocator allocator, List<QueryDataBatch> results) throws SchemaChangeException {
     long previousBigInt = Long.MAX_VALUE;
 
     int recordCount = 0;
     int batchCount = 0;
 
     for (QueryDataBatch b : results) {
-      if (b.getHeader().getRowCount() == 0) {
-        continue;
-      }
-      batchCount++;
       RecordBatchLoader loader = new RecordBatchLoader(allocator);
-      loader.load(b.getHeader().getDef(),b.getData());
-      BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(BigIntVector.class, loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
-
-
-      BigIntVector.Accessor a1 = c1.getAccessor();
-
-      for (int i =0; i < c1.getAccessor().getValueCount(); i++) {
-        recordCount++;
-        assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i));
-        previousBigInt = a1.get(i);
-      }
-      loader.clear();
-      b.release();
-    }
-
-    System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
-  }
-
-  @Test
-  @Ignore
-  public void sortOneKeyDescendingExternalSort() throws Throwable{
-    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
-
-    DrillConfig config = DrillConfig.create("drill-external-sort.conf");
-
-    try (Drillbit bit1 = new Drillbit(config, serviceSet);
-        Drillbit bit2 = new Drillbit(config, serviceSet);
-        DrillClient client = new DrillClient(config, serviceSet.getCoordinator());) {
-
-      bit1.run();
-      bit2.run();
-      client.connect();
-      List<QueryDataBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL,
-              Files.toString(FileUtils.getResourceAsFile("/xsort/one_key_sort_descending.json"),
-                      Charsets.UTF_8));
-      int count = 0;
-      for (QueryDataBatch b : results) {
-        if (b.getHeader().getRowCount() != 0) {
-          count += b.getHeader().getRowCount();
-        }
-      }
-      assertEquals(1000000, count);
-
-      long previousBigInt = Long.MAX_VALUE;
-
-      int recordCount = 0;
-      int batchCount = 0;
-
-      for (QueryDataBatch b : results) {
-        if (b.getHeader().getRowCount() == 0) {
-          break;
-        }
+      if (b.getHeader().getRowCount() > 0) {
         batchCount++;
-        RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator());
         loader.load(b.getHeader().getDef(),b.getData());
+        @SuppressWarnings("resource")
         BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(BigIntVector.class, loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
-
-
         BigIntVector.Accessor a1 = c1.getAccessor();
 
-        for (int i =0; i < c1.getAccessor().getValueCount(); i++) {
+        for (int i = 0; i < c1.getAccessor().getValueCount(); i++) {
           recordCount++;
-          assertTrue(String.format("%d < %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i));
+          assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i));
           previousBigInt = a1.get(i);
         }
-        loader.clear();
-        b.release();
       }
-      System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
-
+      loader.clear();
+      b.release();
     }
+
+    System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
   }
 
+
   @Test
   @Ignore
-  public void outOfMemoryExternalSort() throws Throwable{
-    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+  public void sortOneKeyDescendingExternalSortLegacy() throws Throwable {
+    sortOneKeyDescendingExternalSort(true);
+  }
 
-    DrillConfig config = DrillConfig.create("drill-oom-xsort.conf");
+  private void sortOneKeyDescendingExternalSort(boolean testLegacy) throws Throwable {
+    FixtureBuilder builder = ClusterFixture.builder( )
+        .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD, 4 )
+        .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE, 4);
+    try (ClusterFixture cluster = builder.build();
+        ClientFixture client = cluster.clientFixture()) {
+      chooseImpl(client,testLegacy);
+      List<QueryDataBatch> results = client.queryBuilder().physicalResource("/xsort/one_key_sort_descending.json").results();
+      assertEquals(1000000, client.countResults( results ));
+      validateResults(client.allocator(), results);
+    }
+  }
 
-    try (Drillbit bit1 = new Drillbit(config, serviceSet);
-        DrillClient client = new DrillClient(config, serviceSet.getCoordinator());) {
+  @Ignore
+  @Test
+  public void outOfMemoryExternalSortLegacy() throws Throwable{
+    outOfMemoryExternalSort(true);
+  }
 
-      bit1.run();
-      client.connect();
-      List<QueryDataBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL,
-              Files.toString(FileUtils.getResourceAsFile("/xsort/oom_sort_test.json"),
-                      Charsets.UTF_8));
-      int count = 0;
-      for (QueryDataBatch b : results) {
-        if (b.getHeader().getRowCount() != 0) {
-          count += b.getHeader().getRowCount();
-        }
-      }
-      assertEquals(10000000, count);
+  private void outOfMemoryExternalSort(boolean testLegacy) throws Throwable{
+    FixtureBuilder builder = ClusterFixture.builder( )
+        // Probably do nothing in modern Drill
+        .configProperty( "drill.memory.fragment.max", 50000000 )
+        .configProperty( "drill.memory.fragment.initial", 2000000 )
+        .configProperty( "drill.memory.operator.max", 30000000 )
+        .configProperty( "drill.memory.operator.initial", 2000000 );
+    try (ClusterFixture cluster = builder.build();
+        ClientFixture client = cluster.clientFixture()) {
+      chooseImpl(client,testLegacy);
+      List<QueryDataBatch> results = client.queryBuilder().physicalResource("/xsort/oom_sort_test.json").results();
+      assertEquals(10000000, client.countResults( results ));
 
       long previousBigInt = Long.MAX_VALUE;
 
@@ -215,29 +163,25 @@ public class TestSimpleExternalSort extends BaseTestQuery {
       int batchCount = 0;
 
       for (QueryDataBatch b : results) {
-        if (b.getHeader().getRowCount() == 0) {
-          break;
+        RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
+        if (b.getHeader().getRowCount() > 0) {
+          batchCount++;
+          loader.load(b.getHeader().getDef(),b.getData());
+          @SuppressWarnings("resource")
+          BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(BigIntVector.class, loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
+          BigIntVector.Accessor a1 = c1.getAccessor();
+
+          for (int i = 0; i < c1.getAccessor().getValueCount(); i++) {
+            recordCount++;
+            assertTrue(String.format("%d < %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i));
+            previousBigInt = a1.get(i);
+          }
+          assertTrue(String.format("%d == %d", a1.get(0), a1.get(a1.getValueCount() - 1)), a1.get(0) != a1.get(a1.getValueCount() - 1));
         }
-        batchCount++;
-        RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator());
-        loader.load(b.getHeader().getDef(),b.getData());
-        BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(BigIntVector.class, loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
-
-
-        BigIntVector.Accessor a1 = c1.getAccessor();
-
-        for (int i =0; i < c1.getAccessor().getValueCount(); i++) {
-          recordCount++;
-          assertTrue(String.format("%d < %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i));
-          previousBigInt = a1.get(i);
-        }
-        assertTrue(String.format("%d == %d", a1.get(0), a1.get(a1.getValueCount() - 1)), a1.get(0) != a1.get(a1.getValueCount() - 1));
         loader.clear();
         b.release();
       }
       System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
-
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java b/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java
new file mode 100644
index 0000000..6d68757
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+
+import com.google.common.collect.Queues;
+
+/**
+ * Drill query event listener that buffers rows into a producer-consumer
+ * queue. Allows rows to be received asynchronously, but processed by
+ * a synchronous reader.
+ * <p>
+ * Query messages are transformed into events: query ID, batch,
+ * EOF or error.
+ */
+
+public class BufferingQueryEventListener implements UserResultsListener
+{
+  public static class QueryEvent
+  {
+    public enum Type { QUERY_ID, BATCH, EOF, ERROR }
+
+    public final Type type;
+    public QueryId queryId;
+    public QueryDataBatch batch;
+    public Exception error;
+    public QueryState state;
+
+    public QueryEvent(QueryId queryId) {
+      this.queryId = queryId;
+      this.type = Type.QUERY_ID;
+    }
+
+    public QueryEvent(Exception ex) {
+      error = ex;
+      type = Type.ERROR;
+    }
+
+    public QueryEvent(QueryDataBatch batch) {
+      this.batch = batch;
+      type = Type.BATCH;
+    }
+
+    public QueryEvent(QueryState state) {
+      this.type = Type.EOF;
+      this.state = state;
+    }
+  }
+
+  private BlockingQueue<QueryEvent> queue = Queues.newLinkedBlockingQueue();
+
+  @Override
+  public void queryIdArrived(QueryId queryId) {
+    silentPut(new QueryEvent(queryId));
+  }
+
+  @Override
+  public void submissionFailed(UserException ex) {
+    silentPut(new QueryEvent(ex));
+  }
+
+  @Override
+  public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+    silentPut(new QueryEvent(result));
+  }
+
+  @Override
+  public void queryCompleted(QueryState state) {
+    silentPut(new QueryEvent(state));
+  }
+
+  private void silentPut(QueryEvent event) {
+    try {
+      queue.put(event);
+    } catch (InterruptedException e) {
+      // What to do, what to do...
+      e.printStackTrace();
+    }
+  }
+
+  public QueryEvent get() {
+    try {
+      return queue.take();
+    } catch (InterruptedException e) {
+      // Should not occur, but if it does, just pass along the error.
+      return new QueryEvent(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
new file mode 100644
index 0000000..be36dd7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.drill.QueryTestUtil;
+import org.apache.drill.TestBuilder;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.test.ClusterFixture.FixtureTestServices;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
+
+public class ClientFixture implements AutoCloseable {
+
+  public static class ClientBuilder {
+
+    ClusterFixture cluster;
+    Properties clientProps;
+
+    protected ClientBuilder(ClusterFixture cluster) {
+      this.cluster = cluster;
+    }
+    /**
+     * Specify an optional client property.
+     * @param key property name
+     * @param value property value
+     * @return this builder
+     */
+    public ClientBuilder property( String key, Object value ) {
+      if ( clientProps == null ) {
+        clientProps = new Properties( );
+      }
+      clientProps.put(key, value);
+      return this;
+    }
+
+    ClientFixture build( ) {
+      try {
+        return new ClientFixture(this);
+      } catch (RpcException e) {
+
+        // When used in a test with an embedded Drillbit, the
+        // RPC exception should not occur.
+
+        throw new IllegalStateException(e);
+      }
+    }
+  }
+
+  private ClusterFixture cluster;
+  private DrillClient client;
+
+  public ClientFixture(ClientBuilder builder) throws RpcException {
+    this.cluster = builder.cluster;
+
+    // Create a client.
+
+    client = new DrillClient(cluster.config( ), cluster.serviceSet( ).getCoordinator());
+    client.connect(builder.clientProps);
+    cluster.clients.add(this);
+  }
+
+  public DrillClient client() { return client; }
+  public ClusterFixture cluster( ) { return cluster; }
+  public BufferAllocator allocator( ) { return cluster.allocator( ); }
+
+  /**
+   * Set a runtime option.
+   *
+   * @param key
+   * @param value
+   * @throws RpcException
+   */
+
+  public void alterSession(String key, Object value ) throws Exception {
+    String sql = "ALTER SESSION SET `" + key + "` = " + ClusterFixture.stringify( value );
+    runSqlSilently( sql );
+  }
+
+  public void alterSystem(String key, Object value ) throws Exception {
+    String sql = "ALTER SYSTEM SET `" + key + "` = " + ClusterFixture.stringify( value );
+    runSqlSilently( sql );
+  }
+
+  /**
+   * Run SQL silently (discard results.)
+   *
+   * @param sql
+   * @throws RpcException
+   */
+
+  public void runSqlSilently(String sql) throws Exception {
+    queryBuilder().sql(sql).run();
+  }
+
+  public QueryBuilder queryBuilder() {
+    return new QueryBuilder(this);
+  }
+
+  public int countResults(List<QueryDataBatch> results) {
+    int count = 0;
+    for(QueryDataBatch b : results) {
+      count += b.getHeader().getRowCount();
+    }
+    return count;
+  }
+
+  public TestBuilder testBuilder() {
+    return new TestBuilder(new FixtureTestServices(this));
+  }
+
+  /**
+   * Run zero or more queries and optionally print the output in TSV format.
+   * Similar to {@link QueryTestUtil#test}. Output is printed
+   * only if the tests are running as verbose.
+   *
+   * @return the number of rows returned
+   */
+
+  public void runQueries(final String queryString) throws Exception{
+    final String query = QueryTestUtil.normalizeQuery(queryString);
+    String[] queries = query.split(";");
+    for (String q : queries) {
+      final String trimmedQuery = q.trim();
+      if (trimmedQuery.isEmpty()) {
+        continue;
+      }
+      queryBuilder( ).sql(trimmedQuery).print();
+    }
+  }
+
+  @Override
+  public void close( ) {
+    if (client == null) {
+      return;
+    }
+    try {
+      client.close( );
+    } finally {
+      client = null;
+      cluster.clients.remove(this);
+    }
+  }
+
+  /**
+   * Return a parsed query profile for a query summary. Saving of profiles
+   * must be turned on.
+   *
+   * @param summary
+   * @return
+   * @throws IOException
+   */
+
+  public ProfileParser parseProfile(QuerySummary summary) throws IOException {
+    return parseProfile(summary.queryIdString());
+  }
+
+  /**
+   * Parse a query profile from the local storage location given the
+   * query ID. Saving of profiles must be turned on. This is a bit of
+   * a hack: the profile should be available directly from the server.
+   * @throws IOException
+   */
+
+  public ProfileParser parseProfile(String queryId) throws IOException {
+    String tmpDir = cluster().config().getString(ExecConstants.DRILL_TMP_DIR);
+    File drillTmp = new File(new File(tmpDir), "drill");
+    File profileDir = new File(drillTmp, "profiles" );
+    File file = new File( profileDir, queryId + ".sys.drill" );
+    return new ProfileParser(file);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
new file mode 100644
index 0000000..f89eb01
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.DrillTestWrapper.TestServices;
+import org.apache.drill.QueryTestUtil;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ZookeeperHelper;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistryImpl;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.drill.exec.store.mock.MockStorageEngine;
+import org.apache.drill.exec.store.mock.MockStorageEngineConfig;
+import org.apache.drill.exec.util.TestUtilities;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+import com.google.common.io.Resources;
+
+/**
+ * Test fixture to start a Drillbit with provide options, create a client, and
+ * execute queries. Can be used in JUnit tests, or in ad-hoc programs. Provides
+ * a builder to set the necessary embedded Drillbit and client options, then
+ * creates the requested Drillbit and client.
+ */
+
+public class ClusterFixture implements AutoCloseable {
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClientFixture.class);
+  public static final String ENABLE_FULL_CACHE = "drill.exec.test.use-full-cache";
+  public static final int MAX_WIDTH_PER_NODE = 2;
+
+  @SuppressWarnings("serial")
+  public static final Properties TEST_CONFIGURATIONS = new Properties() {
+    {
+      // Properties here mimic those in drill-root/pom.xml, Surefire plugin
+      // configuration. They allow tests to run successfully in Eclipse.
+
+      put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false);
+      put(ExecConstants.HTTP_ENABLE, false);
+      put(QueryTestUtil.TEST_QUERY_PRINTING_SILENT, true);
+      put("drill.catastrophic_to_standard_out", true);
+
+      // Verbose errors.
+
+      put(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY, true);
+
+      // See Drillbit.close. The Drillbit normally waits a specified amount
+      // of time for ZK registration to drop. But, embedded Drillbits normally
+      // don't use ZK, so no need to wait.
+
+      put(ExecConstants.ZK_REFRESH, 0);
+
+      // This is just a test, no need to be heavy-duty on threads.
+      // This is the number of server and client RPC threads. The
+      // production default is DEFAULT_SERVER_RPC_THREADS.
+
+      put(ExecConstants.BIT_SERVER_RPC_THREADS, 2);
+
+      // No need for many scanners except when explicitly testing that
+      // behavior. Production default is DEFAULT_SCAN_THREADS
+
+      put(ExecConstants.SCAN_THREADPOOL_SIZE, 4);
+    }
+  };
+
+  public static final String DEFAULT_BIT_NAME = "drillbit";
+
+  private DrillConfig config;
+  private Map<String,Drillbit> bits = new HashMap<>();
+  private Drillbit defaultDrillbit;
+  private BufferAllocator allocator;
+  private boolean ownsZK;
+  private ZookeeperHelper zkHelper;
+  private RemoteServiceSet serviceSet;
+  private String dfsTestTmpSchemaLocation;
+  protected List<ClientFixture> clients = new ArrayList<>();
+
+  protected ClusterFixture(FixtureBuilder  builder) throws Exception {
+
+    // Start ZK if requested.
+
+    if (builder.zkHelper != null) {
+      zkHelper = builder.zkHelper;
+      ownsZK = false;
+    } else if (builder.zkCount > 0) {
+      zkHelper = new ZookeeperHelper(true);
+      zkHelper.startZookeeper(builder.zkCount);
+      ownsZK = true;
+    }
+
+    // Create a config
+    // Because of the way DrillConfig works, we can set the ZK
+    // connection string only if a property set is provided.
+
+    if (builder.configResource != null) {
+      config = DrillConfig.create(builder.configResource);
+    } else if (builder.configProps != null) {
+      config = DrillConfig.create(configProperties(builder.configProps));
+    } else {
+      config = DrillConfig.create(configProperties(TEST_CONFIGURATIONS));
+    }
+
+    // Not quite sure what this is, but some tests seem to use it.
+
+    if (builder.enableFullCache ||
+        (config.hasPath(ENABLE_FULL_CACHE) && config.getBoolean(ENABLE_FULL_CACHE))) {
+      serviceSet = RemoteServiceSet.getServiceSetWithFullCache(config, allocator);
+    } else {
+      serviceSet = RemoteServiceSet.getLocalServiceSet();
+    }
+
+    dfsTestTmpSchemaLocation = TestUtilities.createTempDir();
+
+    Preconditions.checkArgument(builder.bitCount > 0);
+    int bitCount = builder.bitCount;
+    for (int i = 0; i < bitCount; i++) {
+      @SuppressWarnings("resource")
+      Drillbit bit = new Drillbit(config, serviceSet);
+      bit.run();
+
+      // Create the dfs_test name space
+
+      @SuppressWarnings("resource")
+      final StoragePluginRegistry pluginRegistry = bit.getContext().getStorage();
+      TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, dfsTestTmpSchemaLocation);
+      TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry);
+
+      // Create the mock data plugin
+      // (Disabled until DRILL-5152 is committed.)
+
+      MockStorageEngineConfig config = MockStorageEngineConfig.INSTANCE;
+      @SuppressWarnings("resource")
+      MockStorageEngine plugin = new MockStorageEngine(
+          MockStorageEngineConfig.INSTANCE, bit.getContext(),
+          MockStorageEngineConfig.NAME);
+      ((StoragePluginRegistryImpl) pluginRegistry)
+          .definePlugin(MockStorageEngineConfig.NAME, config, plugin);
+
+      // Bit name and registration.
+
+      String name;
+      if (builder.bitNames != null && i < builder.bitNames.length) {
+        name = builder.bitNames[i];
+      } else {
+
+        // Name the Drillbit by default. Most tests use one Drillbit,
+        // so make the name simple: "drillbit." Only add a numeric suffix
+        // when the test creates multiple bits.
+
+        if (bitCount == 1) {
+          name = DEFAULT_BIT_NAME;
+        } else {
+          name = DEFAULT_BIT_NAME + Integer.toString(i+1);
+        }
+      }
+      bits.put(name, bit);
+
+      // Remember the first Drillbit, this is the default one returned from
+      // drillbit().
+
+      if (i == 0) {
+        defaultDrillbit = bit;
+      }
+    }
+
+    // Some operations need an allocator.
+
+    allocator = RootAllocatorFactory.newRoot(config);
+
+    // Apply system options
+
+    if (builder.systemOptions != null) {
+      for (FixtureBuilder.RuntimeOption option : builder.systemOptions) {
+        clientFixture().alterSystem(option.key, option.value);
+      }
+    }
+
+    // Apply session options.
+
+    if (builder.sessionOptions != null) {
+      for (FixtureBuilder.RuntimeOption option : builder.sessionOptions) {
+        clientFixture().alterSession(option.key, option.value);
+      }
+    }
+  }
+
+  private Properties configProperties(Properties configProps) {
+    Properties effectiveProps = new Properties();
+    for (Entry<Object, Object> entry : configProps.entrySet()) {
+      effectiveProps.put(entry.getKey(), entry.getValue().toString());
+    }
+    if (zkHelper != null) {
+      effectiveProps.put(ExecConstants.ZK_CONNECTION, zkHelper.getConfig().getString(ExecConstants.ZK_CONNECTION));
+    }
+    return effectiveProps;
+  }
+
+  public Drillbit drillbit() { return defaultDrillbit; }
+  public Drillbit drillbit(String name) { return bits.get(name); }
+  public Collection<Drillbit> drillbits() { return bits.values(); }
+  public RemoteServiceSet serviceSet() { return serviceSet; }
+  public BufferAllocator allocator() { return allocator; }
+  public DrillConfig config() { return config; }
+
+  public ClientFixture.ClientBuilder clientBuilder() {
+    return new ClientFixture.ClientBuilder(this);
+  }
+
+  public ClientFixture clientFixture() {
+    if (clients.isEmpty()) {
+      clientBuilder().build();
+    }
+    return clients.get(0);
+  }
+
+  public DrillClient client() {
+    return clientFixture().client();
+  }
+
+  /**
+   * Close the clients, drillbits, allocator and
+   * Zookeeper. Checks for exceptions. If an exception occurs,
+   * continues closing, suppresses subsequent exceptions, and
+   * throws the first exception at completion of close. This allows
+   * the test code to detect any state corruption which only shows
+   * itself when shutting down resources (memory leaks, for example.)
+   */
+
+  @Override
+  public void close() throws Exception {
+    Exception ex = null;
+
+    // Close clients. Clients remove themselves from the client
+    // list.
+
+    while (!clients.isEmpty()) {
+      ex = safeClose(clients.get(0), ex);
+    }
+
+    for (Drillbit bit : drillbits()) {
+      ex = safeClose(bit, ex);
+    }
+    bits.clear();
+    ex = safeClose(serviceSet, ex);
+    serviceSet = null;
+    ex = safeClose(allocator, ex);
+    allocator = null;
+    if (zkHelper != null && ownsZK) {
+      try {
+        zkHelper.stopZookeeper();
+      } catch (Exception e) {
+        ex = ex == null ? e : ex;
+      }
+    }
+    zkHelper = null;
+    if (ex != null) {
+      throw ex;
+    }
+  }
+
+  private Exception safeClose(AutoCloseable item, Exception ex) {
+    try {
+      if (item != null) {
+        item.close();
+      }
+    } catch (Exception e) {
+      ex = ex == null ? e : ex;
+    }
+    return ex;
+  }
+
+  public void defineWorkspace(String pluginName, String schemaName, String path,
+      String defaultFormat) throws ExecutionSetupException {
+    for (Drillbit bit : drillbits()) {
+      defineWorkspace(bit, pluginName, schemaName, path, defaultFormat);
+    }
+  }
+
+  public static void defineWorkspace(Drillbit drillbit, String pluginName,
+      String schemaName, String path, String defaultFormat)
+      throws ExecutionSetupException {
+    @SuppressWarnings("resource")
+    final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
+    @SuppressWarnings("resource")
+    final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin(pluginName);
+    final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
+    final WorkspaceConfig newTmpWSConfig = new WorkspaceConfig(path, true, defaultFormat);
+
+    pluginConfig.workspaces.remove(schemaName);
+    pluginConfig.workspaces.put(schemaName, newTmpWSConfig);
+
+    pluginRegistry.createOrUpdate(pluginName, pluginConfig, true);
+  }
+
+  public static final String EXPLAIN_PLAN_TEXT = "text";
+  public static final String EXPLAIN_PLAN_JSON = "json";
+
+  public static FixtureBuilder builder() {
+     return new FixtureBuilder()
+         .configProps(FixtureBuilder.defaultProps())
+         .sessionOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY, MAX_WIDTH_PER_NODE)
+         ;
+  }
+
+  public static FixtureBuilder bareBuilder() {
+    return new FixtureBuilder();
+  }
+
+  public static class FixtureTestServices implements TestServices {
+
+    private ClientFixture client;
+
+    public FixtureTestServices(ClientFixture client) {
+      this.client = client;
+    }
+
+    @Override
+    public BufferAllocator allocator() {
+      return client.allocator();
+    }
+
+    @Override
+    public void test(String query) throws Exception {
+      client.runQueries(query);
+    }
+
+    @Override
+    public List<QueryDataBatch> testRunAndReturn(QueryType type, Object query)
+        throws Exception {
+      return client.queryBuilder().query(type, (String) query).results();
+    }
+  }
+
+  public static ClusterFixture standardCluster() throws Exception {
+    return builder().build();
+  }
+
+  static String stringify(Object value) {
+    if (value instanceof String) {
+      return "'" + (String) value + "'";
+    } else {
+      return value.toString();
+    }
+  }
+
+  public static String getResource(String resource) throws IOException {
+    // Unlike the Java routines, Guava does not like a leading slash.
+
+    final URL url = Resources.getResource(trimSlash(resource));
+    if (url == null) {
+      throw new IOException(String.format("Unable to find resource %s.", resource));
+    }
+    return Resources.toString(url, Charsets.UTF_8);
+  }
+
+  public static String loadResource(String resource) {
+    try {
+      return getResource(resource);
+    } catch (IOException e) {
+      throw new IllegalStateException("Resource not found: " + resource, e);
+    }
+  }
+
+  static String trimSlash(String path) {
+    if (path == null) {
+      return path;
+    } else if (path.startsWith("/")) {
+      return path.substring(1);
+    } else {
+      return path;
+    }
+  }
+
+  /**
+   * Create a temp directory to store the given <i>dirName</i>.
+   * Directory will be deleted on exit. Directory is created if it does
+   * not exist.
+   * @param dirName directory name
+   * @return Full path including temp parent directory and given directory name.
+   */
+  public static File getTempDir(final String dirName) {
+    final File dir = Files.createTempDir();
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        FileUtils.deleteQuietly(dir);
+      }
+    });
+    File tempDir = new File(dir, dirName);
+    tempDir.mkdirs();
+    return tempDir;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
new file mode 100644
index 0000000..62beedd
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.\u2030
+ */
+package org.apache.drill.test;
+
+import java.io.IOException;
+
+import org.apache.drill.TestBuilder;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.test.DrillTest;
+import org.junit.AfterClass;
+
+/**
+ * Base class for tests that use a single cluster fixture for a set of
+ * tests. Extend your test case directly from {@link DrillTest} if you
+ * need to start up and shut down a cluster multiple times.
+ * <p>
+ * To create a test with a single cluster config, do the following:
+ * <pre><code>
+ * public class YourTest extends ClusterTest {
+ *   {@literal @}BeforeClass
+ *   public static setup( ) throws Exception {
+ *     FixtureBuilder builder = ClusterFixture.builder()
+ *       // Set options, etc.
+ *       ;
+ *     startCluster(builder);
+ *   }
+ *
+ *   // Your tests
+ * }
+ * </code></pre>
+ * This class takes care of shutting down the cluster at the end of the test.
+ * <p>
+ * The simplest possible setup:
+ * <pre><code>
+ *   {@literal @}BeforeClass
+ *   public static setup( ) throws Exception {
+ *     startCluster(ClusterFixture.builder( ));
+ *   }
+ * </code></pre>
+ * <p>
+ * If you need to start the cluster with different (boot time) configurations,
+ * do the following instead:
+ * <pre><code>
+ * public class YourTest extends DrillTest {
+ *   {@literal @}Test
+ *   public someTest() throws Exception {
+ *     FixtureBuilder builder = ClusterFixture.builder()
+ *       // Set options, etc.
+ *       ;
+ *     try(ClusterFixture cluster = builder.build) {
+ *       // Tests here
+ *     }
+ *   }
+ * }
+ * </code></pre>
+ * The try-with-resources block ensures that the cluster is shut down at
+ * the end of each test method.
+ */
+
+public class ClusterTest extends DrillTest {
+
+  protected static ClusterFixture cluster;
+  protected static ClientFixture client;
+
+  protected static void startCluster(FixtureBuilder builder) throws Exception {
+    cluster = builder.build();
+    client = cluster.clientFixture();
+  }
+
+  @AfterClass
+  public static void shutdown() throws Exception {
+    AutoCloseables.close(client, cluster);
+  }
+
+  /**
+   * Convenience method when converting classic tests to use the
+   * cluster fixture.
+   * @return a test builder that works against the cluster fixture
+   */
+
+  public TestBuilder testBuilder() {
+    return client.testBuilder();
+  }
+
+  /**
+   * Convenience method when converting classic tests to use the
+   * cluster fixture.
+   * @return the contents of the resource text file
+   */
+
+  public String getFile(String resource) throws IOException {
+    return ClusterFixture.getResource(resource);
+  }
+
+  public void test(String sqlQuery) throws Exception {
+    client.runQueries(sqlQuery);
+  }
+
+  public static void test(String query, Object... args) throws Exception {
+    client.queryBuilder().sql(query, args).run( );
+  }
+
+  public QueryBuilder queryBuilder( ) {
+    return client.queryBuilder();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/test/FieldDef.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/FieldDef.java b/exec/java-exec/src/test/java/org/apache/drill/test/FieldDef.java
new file mode 100644
index 0000000..3812217
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/FieldDef.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.test;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Basic representation of a column parsed from a query profile.
+ * Idea is to use this to generate mock data that represents a
+ * query obtained from a user. This is a work in progress.
+ */
+
+public class FieldDef {
+  public enum Type { VARCHAR, DOUBLE };
+  public enum TypeHint { DATE, TIME };
+
+  public final String name;
+  public final String typeStr;
+  public final Type type;
+  public int length;
+  public TypeHint hint;
+
+  public FieldDef(String name, String typeStr) {
+    this.name = name;
+    this.typeStr = typeStr;
+
+    // Matches the type as provided in the query profile:
+    // name:type(length)
+    // Length is provided for VARCHAR fields. Examples:
+    // count: INTEGER
+    // customerName: VARCHAR(50)
+
+    Pattern p = Pattern.compile("(\\w+)(?:\\((\\d+)\\))?");
+    Matcher m = p.matcher(typeStr);
+    if (! m.matches()) { throw new IllegalStateException(); }
+    if (m.group(2) == null) {
+      length = 0;
+    } else {
+      length = Integer.parseInt(m.group(2));
+    }
+    switch (m.group(1).toUpperCase()) {
+    case "VARCHAR":
+      type = Type.VARCHAR;
+      break;
+    case "DOUBLE":
+      type = Type.DOUBLE;
+      break;
+    // TODO: Add other types over time.
+    default:
+      type = null;
+    }
+
+  }
+
+  @Override
+  public String toString() {
+    String str = name + ": " + typeStr;
+    if (type != null) {
+      str += " - " + type.name();
+      if (length != 0) {
+        str += "(" + length + ")";
+      }
+    }
+    return str;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/test/FixtureBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/FixtureBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/FixtureBuilder.java
new file mode 100644
index 0000000..e56f190
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/FixtureBuilder.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ZookeeperHelper;
+
+/**
+ * Build a Drillbit and client with the options provided. The simplest
+ * builder starts an embedded Drillbit, with the "dfs_test" name space,
+ * a max width (parallelization) of 2.
+ */
+
+public class FixtureBuilder {
+
+  public static class RuntimeOption {
+    public String key;
+    public Object value;
+
+    public RuntimeOption(String key, Object value) {
+      this.key = key;
+      this.value = value;
+    }
+  }
+
+  // Values in the drill-module.conf file for values that are customized
+  // in the defaults.
+
+  public static final int DEFAULT_ZK_REFRESH = 500; // ms
+  public static final int DEFAULT_SERVER_RPC_THREADS = 10;
+  public static final int DEFAULT_SCAN_THREADS = 8;
+
+  public static Properties defaultProps() {
+    Properties props = new Properties();
+    props.putAll(ClusterFixture.TEST_CONFIGURATIONS);
+    return props;
+  }
+
+  String configResource;
+  Properties configProps;
+  boolean enableFullCache;
+  List<RuntimeOption> sessionOptions;
+  List<RuntimeOption> systemOptions;
+  int bitCount = 1;
+  String bitNames[];
+  int zkCount;
+  ZookeeperHelper zkHelper;
+
+  /**
+   * Use the given configuration properties to start the embedded Drillbit.
+   * @param configProps a collection of config properties
+   * @return this builder
+   * @see {@link #configProperty(String, Object)}
+   */
+
+  public FixtureBuilder configProps(Properties configProps) {
+    this.configProps = configProps;
+    return this;
+  }
+
+  /**
+   * Use the given configuration file, stored as a resource, to start the
+   * embedded Drillbit. Note that the resource file should have the two
+   * following settings to work as a test:
+   * <pre><code>
+   * drill.exec.sys.store.provider.local.write : false,
+   * drill.exec.http.enabled : false
+   * </code></pre>
+   * It may be more convenient to add your settings to the default
+   * config settings with {@link #configProperty(String, Object)}.
+   * @param configResource path to the file that contains the
+   * config file to be read
+   * @return this builder
+   * @see {@link #configProperty(String, Object)}
+   */
+
+  public FixtureBuilder configResource(String configResource) {
+
+    // TypeSafe gets unhappy about a leading slash, but other functions
+    // require it. Silently discard the leading slash if given to
+    // preserve the test writer's sanity.
+
+    this.configResource = ClusterFixture.trimSlash(configResource);
+    return this;
+  }
+
+  /**
+   * Add an additional boot-time property for the embedded Drillbit.
+   * @param key config property name
+   * @param value property value
+   * @return this builder
+   */
+
+  public FixtureBuilder configProperty(String key, Object value) {
+    if (configProps == null) {
+      configProps = defaultProps();
+    }
+    configProps.put(key, value.toString());
+    return this;
+  }
+
+   /**
+   * Provide a session option to be set once the Drillbit
+   * is started.
+   *
+   * @param key the name of the session option
+   * @param value the value of the session option
+   * @return this builder
+   * @see {@link ClusterFixture#alterSession(String, Object)}
+   */
+
+  public FixtureBuilder sessionOption(String key, Object value) {
+    if (sessionOptions == null) {
+      sessionOptions = new ArrayList<>();
+    }
+    sessionOptions.add(new RuntimeOption(key, value));
+    return this;
+  }
+
+  /**
+   * Provide a system option to be set once the Drillbit
+   * is started.
+   *
+   * @param key the name of the system option
+   * @param value the value of the system option
+   * @return this builder
+   * @see {@link ClusterFixture#alterSystem(String, Object)}
+   */
+
+  public FixtureBuilder systemOption(String key, Object value) {
+    if (systemOptions == null) {
+      systemOptions = new ArrayList<>();
+    }
+    systemOptions.add(new RuntimeOption(key, value));
+    return this;
+  }
+
+  /**
+   * Set the maximum parallelization (max width per node). Defaults
+   * to 2.
+   *
+   * @param n the "max width per node" parallelization option.
+   * @return this builder
+   */
+  public FixtureBuilder maxParallelization(int n) {
+    return sessionOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY, n);
+  }
+
+  public FixtureBuilder enableFullCache() {
+    enableFullCache = true;
+    return this;
+  }
+
+  /**
+   * The number of Drillbits to start in the cluster.
+   *
+   * @param n the desired cluster size
+   * @return this builder
+   */
+  public FixtureBuilder clusterSize(int n) {
+    bitCount = n;
+    bitNames = null;
+    return this;
+  }
+
+  /**
+   * Define a cluster by providing names to the Drillbits.
+   * The cluster size is the same as the number of names provided.
+   *
+   * @param bitNames array of (unique) Drillbit names
+   * @return this builder
+   */
+  public FixtureBuilder withBits(String bitNames[]) {
+    this.bitNames = bitNames;
+    bitCount = bitNames.length;
+    return this;
+  }
+
+  /**
+   * By default the embedded Drillbits use an in-memory cluster coordinator.
+   * Use this option to start an in-memory ZK instance to coordinate the
+   * Drillbits.
+   * @return this builder
+   */
+  public FixtureBuilder withZk() {
+    return withZk(1);
+  }
+
+  public FixtureBuilder withZk(int count) {
+    zkCount = count;
+
+    // Using ZK. Turn refresh wait back on.
+
+    configProperty(ExecConstants.ZK_REFRESH, DEFAULT_ZK_REFRESH);
+    return this;
+  }
+
+  /**
+   * Run the cluster using a Zookeeper started externally. Use this if
+   * multiple tests start a cluster: allows ZK to be started once for
+   * the entire suite rather than once per test case.
+   *
+   * @param zk the global Zookeeper to use
+   * @return this builder
+   */
+  public FixtureBuilder withZk(ZookeeperHelper zk) {
+    zkHelper = zk;
+
+    // Using ZK. Turn refresh wait back on.
+
+    configProperty(ExecConstants.ZK_REFRESH, DEFAULT_ZK_REFRESH);
+    return this;
+  }
+
+  /**
+   * Create the embedded Drillbit and client, applying the options set
+   * in the builder. Best to use this in a try-with-resources block:
+   * <pre><code>
+   * FixtureBuilder builder = ClientFixture.newBuilder()
+   *   .property(...)
+   *   .sessionOption(...)
+   *   ;
+   * try (ClusterFixture cluster = builder.build();
+   *      ClientFixture client = cluster.clientFixture()) {
+   *   // Do the test
+   * }
+   * </code></pre>
+   * Note that you use a single cluster fixture to create any number of
+   * drillbits in your cluster. If you want multiple clients, create the
+   * first as above, the others (or even the first) using the
+   * {@link ClusterFixture#clientBuilder()}. Using the client builder
+   * also lets you set client-side options in the rare cases that you
+   * need them.
+   *
+   * @return
+   * @throws Exception
+   */
+  public ClusterFixture build() throws Exception {
+    return new ClusterFixture(this);
+  }
+}
\ No newline at end of file