You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by su...@apache.org on 2017/01/30 19:42:38 UTC

[1/4] drill git commit: DRILL-5218: Support optionally disabling heartbeats from C++ client

Repository: drill
Updated Branches:
  refs/heads/master 2af709f43 -> 60624af22


DRILL-5218: Support optionally disabling heartbeats from C++ client

closes #726


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/837722c7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/837722c7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/837722c7

Branch: refs/heads/master
Commit: 837722c7433cf89447b71aa6139f22f03992875c
Parents: 2af709f
Author: Sudheesh Katkam <su...@apache.org>
Authored: Wed Jan 25 14:43:41 2017 -0800
Committer: Sudheesh Katkam <su...@apache.org>
Committed: Mon Jan 30 10:09:38 2017 -0800

----------------------------------------------------------------------
 .../native/client/example/querySubmitter.cpp    |  5 +++
 .../native/client/src/clientlib/drillClient.cpp |  2 +-
 .../client/src/clientlib/drillClientImpl.cpp    | 40 +++++---------------
 .../client/src/clientlib/drillClientImpl.hpp    |  3 +-
 4 files changed, 17 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/837722c7/contrib/native/client/example/querySubmitter.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index 2eeaf35..60f7b8a 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -41,6 +41,7 @@ struct Option{
     {"syncSend", "Send query only after previous result is received", false},
     {"hshakeTimeout", "Handshake timeout (second).", false},
     {"queryTimeout", "Query timeout (second).", false},
+    {"heartbeatFrequency", "Heartbeat frequency (second). Disabled if set to 0.", false},
     {"user", "Username", false},
     {"password", "Password", false}
 };
@@ -282,6 +283,7 @@ int main(int argc, char* argv[]) {
         std::string syncSend=qsOptionValues["syncSend"];
         std::string hshakeTimeout=qsOptionValues["hshakeTimeout"];
         std::string queryTimeout=qsOptionValues["queryTimeout"];
+        std::string heartbeatFrequency=qsOptionValues["heartbeatFrequency"];
         std::string user=qsOptionValues["user"];
         std::string password=qsOptionValues["password"];
 
@@ -343,6 +345,9 @@ int main(int argc, char* argv[]) {
         if (!queryTimeout.empty()){
             Drill::DrillClientConfig::setQueryTimeout(atoi(queryTimeout.c_str()));
         }
+        if(!heartbeatFrequency.empty()) {
+            Drill::DrillClientConfig::setHeartbeatFrequency(atoi(heartbeatFrequency.c_str()));
+        }
 
         Drill::DrillUserProperties props;
         if(schema.length()>0){

http://git-wip-us.apache.org/repos/asf/drill/blob/837722c7/contrib/native/client/src/clientlib/drillClient.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp
index b02f993..fe9c3a6 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -105,7 +105,7 @@ void DrillClientConfig::setQueryTimeout(int32_t t){
 }
 
 void DrillClientConfig::setHeartbeatFrequency(int32_t t){
-    if (t>0){
+    if (t>=0){
         boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
         s_heartbeatFrequency=t;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/837722c7/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 51ae1a2..038ca90 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -150,15 +150,18 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
 }
 
 void DrillClientImpl::startHeartbeatTimer(){
-    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with "
-        << DrillClientConfig::getHeartbeatFrequency() << " seconds." << std::endl;)
-    m_heartbeatTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHeartbeatFrequency()));
-    m_heartbeatTimer.async_wait(boost::bind(
+    if (DrillClientConfig::getHeartbeatFrequency() > 0) {
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with "
+                                          << DrillClientConfig::getHeartbeatFrequency()
+                                          << " seconds." << std::endl;)
+        m_heartbeatTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHeartbeatFrequency()));
+        m_heartbeatTimer.async_wait(boost::bind(
                 &DrillClientImpl::handleHeartbeatTimeout,
                 this,
                 boost::asio::placeholders::error
-                ));
+        ));
         startMessageListener(); // start this thread early so we don't have the timer blocked
+    }
 }
 
 connectionStatus_t DrillClientImpl::sendHeartbeat(){
@@ -178,12 +181,6 @@ connectionStatus_t DrillClientImpl::sendHeartbeat(){
     return status;
 }
 
-void DrillClientImpl::resetHeartbeatTimer(){
-    m_heartbeatTimer.cancel();
-    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Reset Heartbeat timer." << std::endl;)
-    startHeartbeatTimer();
-}
-
 void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & err){
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl;)
     if(err != boost::asio::error::operation_aborted){
@@ -350,7 +347,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
     u2b.set_channel(exec::shared::USER);
     u2b.set_rpc_version(DRILL_RPC_VERSION);
     u2b.set_support_listening(true);
-    u2b.set_support_timeout(true);
+    u2b.set_support_timeout(DrillClientConfig::getHeartbeatFrequency() > 0);
 
     // Adding version info
     exec::user::RpcEndpointInfos* infos = u2b.mutable_client_infos();
@@ -663,7 +660,7 @@ void DrillClientImpl::getNextResult(){
             ));
     }
 
-    resetHeartbeatTimer();
+    startHeartbeatTimer();
 
     async_read(
             this->m_socket,
@@ -1399,23 +1396,6 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
             s = processQueryData(allocatedBuffer, msg);
             break;
 
-        case exec::user::HANDSHAKE:
-            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";)
-            delete allocatedBuffer;
-            // In one case when the client hung, we observed that the server was sending a handshake request to the client
-            // We should properly handle these handshake requests/responses
-            {
-                boost::lock_guard<boost::mutex> lockDC(this->m_dcMutex);
-                exec::user::UserToBitHandshake u2b;
-                u2b.set_channel(exec::shared::USER);
-                u2b.set_rpc_version(DRILL_RPC_VERSION);
-                u2b.set_support_listening(true);
-                rpc::OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, exec::user::HANDSHAKE, msg.m_coord_id, &u2b);
-                sendSync(out_msg);
-                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";)
-            }
-            break;
-
         default:
             DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
                     << "QueryResult returned " << msg.m_rpc_type << std::endl;)

http://git-wip-us.apache.org/repos/asf/drill/blob/837722c7/contrib/native/client/src/clientlib/drillClientImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index 8da37b6..22e34af 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -451,9 +451,8 @@ class DrillClientImpl : public DrillClientImplBase{
         // Direct connection to a drillbit
         // host can be name or ip address, port can be port number or name of service in /etc/services
         connectionStatus_t connect(const char* host, const char* port);
-        void startHeartbeatTimer();// start a heartbeat timer
+        void startHeartbeatTimer();// start or restart the heartbeat timer
         connectionStatus_t sendHeartbeat(); // send a heartbeat to the server
-        void resetHeartbeatTimer(); // reset the heartbeat timer (called every time one sends a message to the server (after sendAck, or submitQuery)
         void handleHeartbeatTimeout(const boost::system::error_code & err); // send a heartbeat. If send fails, broadcast error, close connection and bail out.
 
         int32_t getNextCoordinationId(){ return ++m_coordinationId; };


[2/4] drill git commit: DRILL-5126: Provide simplified, unified "cluster fixture" for test

Posted by su...@apache.org.
http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
new file mode 100644
index 0000000..d2242a1
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.ConsoleAppender;
+
+/**
+ * Establishes test-specific logging without having to alter the global
+ * <tt>logback-test.xml</tt> file. Allows directing output to the console
+ * (if not already configured) and setting the log level on specific loggers
+ * of interest in the test. The fixture automatically restores the original
+ * log configuration on exit.
+ * <p>
+ * Typical usage: <pre><code>
+ * {@literal @}Test
+ * public void myTest() {
+ *   LogFixtureBuilder logBuilder = LogFixture.builder()
+ *          .toConsole()
+ *          .disable() // Silence all other loggers
+ *          .logger(ExternalSortBatch.class, Level.DEBUG);
+ *   try (LogFixture logs = logBuilder.build()) {
+ *     // Test code here
+ *   }
+ * }</code></pre>
+ *  <p>
+ * You can &ndash; and should &ndash; combine the log fixture with the
+ * cluster and client fixtures to have complete control over your test-time
+ * Drill environment.
+ */
+
+public class LogFixture implements AutoCloseable {
+
+  // Elapsed time in ms, log level, thread, logger, message.
+
+  public static final String DEFAULT_CONSOLE_FORMAT = "%r %level [%thread] [%logger] - %msg%n";
+  private static final String DRILL_PACKAGE_NAME = "org.apache.drill";
+
+  /**
+   * Memento for a logger name and level.
+   */
+  public static class LogSpec {
+    String loggerName;
+    Level logLevel;
+
+    public LogSpec(String loggerName, Level level) {
+      this.loggerName = loggerName;
+      this.logLevel = level;
+    }
+  }
+
+  /**
+   * Builds the log settings to be used for a test. The log settings here
+   * add to those specified in a <tt>logback.xml</tt> or
+   * <tt>logback-test.xml</tt> file on your class path. In particular, if
+   * the logging configuration already redirects the Drill logger to the
+   * console, setting console logging here does nothing.
+   */
+
+  public static class LogFixtureBuilder {
+
+    private String consoleFormat = DEFAULT_CONSOLE_FORMAT;
+    private boolean logToConsole;
+    private List<LogSpec> loggers = new ArrayList<>();
+
+    /**
+     * Send all enabled logging to the console (if not already configured.) Some
+     * Drill log configuration files send the root to the console (or file), but
+     * the Drill loggers to Lilith. In that case, Lilith "hides" the console
+     * logger. Using this call adds a console logger to the Drill logger so that
+     * output does, in fact, go to the console regardless of the configuration
+     * in the Logback configuration file.
+     *
+     * @return this builder
+     */
+    public LogFixtureBuilder toConsole() {
+      logToConsole = true;
+      return this;
+    }
+
+    /**
+     * Send logging to the console using the defined format.
+     *
+     * @param format valid Logback log format
+     * @return this builder
+     */
+
+    public LogFixtureBuilder toConsole(String format) {
+      consoleFormat = format;
+      return toConsole();
+    }
+
+    /**
+     * Set a specific logger to the given level.
+     *
+     * @param loggerName name of the logger (typically used for package-level
+     * loggers)
+     * @param level the desired Logback-defined level
+     * @return this builder
+     */
+    public LogFixtureBuilder logger(String loggerName, Level level) {
+      loggers.add(new LogSpec(loggerName, level));
+      return this;
+    }
+
+    /**
+     * Set a specific logger to the given level.
+     *
+     * @param loggerClass class that defines the logger (typically used for
+     * class-specific loggers)
+     * @param level the desired Logback-defined level
+     * @return this builder
+     */
+    public LogFixtureBuilder logger(Class<?> loggerClass, Level level) {
+      loggers.add(new LogSpec(loggerClass.getName(), level));
+      return this;
+    }
+
+    /**
+     * Turns off all logging. If called first, you can set disable as your
+     * general policy, then turn back on loggers selectively for those
+     * of interest.
+     * @return this builder
+     */
+    public LogFixtureBuilder disable() {
+      return rootLogger(Level.OFF);
+    }
+
+    /**
+     * Set the desired log level on the root logger.
+     * @param level the desired Logback log level
+     * @return this builder
+     */
+
+    public LogFixtureBuilder rootLogger(Level level) {
+      loggers.add(new LogSpec(Logger.ROOT_LOGGER_NAME, level));
+      return this;
+    }
+
+    /**
+     * Apply the log levels and output, then return a fixture to be used
+     * in a try-with-resources block. The fixture automatically restores
+     * the original configuration on completion of the try block.
+     * @return the log fixture
+     */
+    public LogFixture build() {
+      return new LogFixture(this);
+    }
+  }
+
+  private PatternLayoutEncoder ple;
+  private ConsoleAppender<ILoggingEvent> appender;
+  private List<LogSpec> loggers = new ArrayList<>();
+  private Logger drillLogger;
+
+  public LogFixture(LogFixtureBuilder builder) {
+    if (builder.logToConsole) {
+      setupConsole(builder);
+    }
+    setupLoggers(builder);
+  }
+
+  /**
+   * Creates a new log fixture builder.
+   * @return the log fixture builder
+   */
+
+  public static LogFixtureBuilder builder() {
+    return new LogFixtureBuilder();
+  }
+
+  /** Adds a console appender to the root and Drill loggers, unless the Drill logger already has a "STDOUT" appender. */
+  private void setupConsole(LogFixtureBuilder builder) {
+    // Store in the field (not a shadowing local): restoreConsole() dereferences it on close().
+    drillLogger = (Logger)LoggerFactory.getLogger(DRILL_PACKAGE_NAME);
+    if (drillLogger.getAppender("STDOUT") != null) {
+      return;
+    }
+    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+    ple = new PatternLayoutEncoder();
+    ple.setPattern(builder.consoleFormat);
+    ple.setContext(lc);
+    ple.start();
+    appender = new ConsoleAppender<>( );
+    appender.setContext(lc);
+    appender.setName("Console");
+    appender.setEncoder( ple );
+    appender.start();
+    Logger root = (Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
+    root.addAppender(appender);
+    drillLogger.addAppender(appender);
+  }
+
+  private void setupLoggers(LogFixtureBuilder builder) {
+    for (LogSpec spec : builder.loggers) {
+      setupLogger(spec);
+    }
+  }
+
+  private void setupLogger(LogSpec spec) {
+    Logger logger = (Logger)LoggerFactory.getLogger(spec.loggerName);
+    Level oldLevel = logger.getLevel();
+    logger.setLevel(spec.logLevel);
+    loggers.add(new LogSpec(spec.loggerName, oldLevel));
+  }
+
+  @Override
+  public void close() {
+    restoreLoggers();
+    restoreConsole();
+  }
+
+  private void restoreLoggers() { // Puts back the levels saved by setupLogger().
+    for (int i = loggers.size() - 1; i >= 0; i--) { // newest first, so a logger set twice ends at its original level
+      LogSpec spec = loggers.get(i);
+      ((Logger)LoggerFactory.getLogger(spec.loggerName)).setLevel(spec.logLevel);
+    }
+  }
+
+  private void restoreConsole() {
+    if (appender == null) {
+      return;
+    }
+    Logger root = (Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
+    root.detachAppender(appender);
+    drillLogger.detachAppender(appender);
+    appender.stop();
+    ple.stop();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java b/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
new file mode 100644
index 0000000..f9df768
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ProfileParser.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.json.Json;
+import javax.json.JsonArray;
+import javax.json.JsonNumber;
+import javax.json.JsonObject;
+import javax.json.JsonReader;
+import javax.json.JsonValue;
+
+/**
+ * Parses a query profile and provides access to various bits of the profile
+ * for diagnostic purposes during tests.
+ */
+
+public class ProfileParser {
+
+  JsonObject profile;
+  List<String> plans;
+
+  public ProfileParser( File file ) throws IOException {
+    try (FileReader fileReader = new FileReader(file);
+         JsonReader reader = Json.createReader(fileReader)) {
+      profile = (JsonObject) reader.read();
+    }
+  }
+
+  public String getQuery( ) {
+    return profile.get("query").toString();
+  }
+
+  public String getPlan() {
+    return profile.get("plan").toString();
+  }
+
+  public List<String> getPlans() {
+    if ( plans != null ) {
+      return plans; }
+    String plan = getPlan( );
+    Pattern p = Pattern.compile( "(\\d\\d-\\d+[^\\\\]*)\\\\n", Pattern.MULTILINE );
+    Matcher m = p.matcher(plan);
+    plans = new ArrayList<>( );
+    while ( m.find() ) {
+      plans.add(m.group(1));
+    }
+    return plans;
+  }
+
+  public List<String> getScans( ) {
+    List<String> scans = new ArrayList<>();
+    int n = getPlans( ).size();
+//    Pattern p = Pattern.compile( "\\d+-\\d+\\s+(\\w+)\\(" );
+    for ( int i = n-1; i >= 0;  i-- ) {
+      String plan = plans.get( i );
+//      Matcher m = p.matcher( plan );
+//      if ( ! m.find() ) { continue; }
+      if ( plan.contains( " Scan(" ) ) {
+        scans.add( plan );
+      }
+    }
+    return scans;
+  }
+
+  public List<FieldDef> getColumns( String plan ) {
+    Pattern p = Pattern.compile( "RecordType\\((.*)\\):" );
+    Matcher m = p.matcher(plan);
+    if ( ! m.find() ) { return null; }
+    String frag = m.group(1);
+    String parts[] = frag.split( ", " );
+    List<FieldDef> fields = new ArrayList<>( );
+    for ( String part : parts ) {
+      String halves[] = part.split( " " );
+      fields.add( new FieldDef( halves[1], halves[0] ) );
+    }
+    return fields;
+  }
+
+  public Map<Integer,String> getOperators( ) {
+    Map<Integer,String> ops = new HashMap<>();
+    int n = getPlans( ).size();
+    Pattern p = Pattern.compile( "\\d+-(\\d+)\\s+(\\w+)" );
+    for ( int i = n-1; i >= 0;  i-- ) {
+      String plan = plans.get( i );
+      Matcher m = p.matcher( plan );
+      if ( ! m.find() ) { continue; }
+      int index = Integer.parseInt(m.group(1));
+      String op = m.group(2);
+      ops.put(index,op);
+    }
+    return ops;
+  }
+
+  public JsonArray getFragmentProfile( ) {
+    return profile.getJsonArray("fragmentProfile");
+  }
+
+  public static class OpInfo {
+    int opId;
+    int type;
+    String name;
+    long processMs;
+    long waitMs;
+    long setupMs;
+    long peakMem;
+    Map<Integer,JsonValue> metrics = new HashMap<>();
+
+    public long getMetric(int id) {
+      JsonValue value = metrics.get(id);
+      if (value == null) {
+        return 0; }
+      return ((JsonNumber) value).longValue();
+    }
+  }
+
+  public Map<Integer,OpInfo> getOpInfo( ) {
+    Map<Integer,String> ops = getOperators( );
+    Map<Integer,OpInfo> info = new HashMap<>( );
+    JsonArray frags = getFragmentProfile( );
+    JsonObject fragProfile = frags.getJsonObject(0).getJsonArray("minorFragmentProfile").getJsonObject(0);
+    JsonArray opList = fragProfile.getJsonArray("operatorProfile");
+    for ( JsonObject opProfile : opList.getValuesAs(JsonObject.class) ) {
+      parseOpProfile( ops, info, opProfile );
+    }
+    return info;
+  }
+
+  private void parseOpProfile(Map<Integer, String> ops,
+      Map<Integer, OpInfo> info, JsonObject opProfile) {
+    OpInfo opInfo = new OpInfo( );
+    opInfo.opId = opProfile.getInt("operatorId");
+    opInfo.type = opProfile.getInt("operatorType");
+    opInfo.name = ops.get(opInfo.opId);
+    opInfo.processMs = opProfile.getJsonNumber("processNanos").longValue() / 1_000_000;
+    opInfo.waitMs = opProfile.getJsonNumber("waitNanos").longValue() / 1_000_000;
+    opInfo.setupMs = opProfile.getJsonNumber("setupNanos").longValue() / 1_000_000;
+    opInfo.peakMem = opProfile.getJsonNumber("peakLocalMemoryAllocated").longValue() / (1024 * 1024);
+    JsonArray array = opProfile.getJsonArray("metric");
+    if (array != null) {
+      for (int i = 0; i < array.size(); i++) {
+        JsonObject metric = array.getJsonObject(i);
+        opInfo.metrics.put(metric.getJsonNumber("metricId").intValue(), metric.get("longValue"));
+      }
+    }
+    info.put(opInfo.opId, opInfo);
+  }
+
+  public void print() {
+    Map<Integer, OpInfo> opInfo = getOpInfo();
+    int n = opInfo.size();
+    long totalSetup = 0;
+    long totalProcess = 0;
+    for ( int i = 0;  i <= n;  i++ ) {
+      OpInfo op = opInfo.get(i);
+      if ( op == null ) { continue; }
+      totalSetup += op.setupMs;
+      totalProcess += op.processMs;
+    }
+    long total = totalSetup + totalProcess;
+    for ( int i = 0;  i <= n;  i++ ) {
+      OpInfo op = opInfo.get(i);
+      if ( op == null ) { continue; }
+      System.out.print( "Op: " );
+      System.out.print( op.opId );
+      System.out.println( " " + op.name );
+      System.out.print( "  Setup:   " + op.setupMs );
+      System.out.print( " - " + percent(op.setupMs, totalSetup ) + "%" );
+      System.out.println( ", " + percent(op.setupMs, total ) + "%" );
+      System.out.print( "  Process: " + op.processMs );
+      System.out.print( " - " + percent(op.processMs, totalProcess ) + "%" );
+      System.out.println( ", " + percent(op.processMs, total ) + "%" );
+      if (op.type == 17) {
+        long value = op.getMetric(0);
+        System.out.println( "  Spills: " + value );
+      }
+      if (op.waitMs > 0) {
+        System.out.println( "  Wait:    " + op.waitMs );
+      }
+      if ( op.peakMem > 0) {
+        System.out.println( "  Memory: " + op.peakMem );
+      }
+    }
+    System.out.println( "Total:" );
+    System.out.println( "  Setup:   " + totalSetup );
+    System.out.println( "  Process: " + totalProcess );
+  }
+
+  /** Returns value as a percentage of total, rounded to the nearest whole number; 0 when total is 0. */
+  public static long percent( long value, long total ) {
+    if ( total == 0 ) { return 0; }
+    return Math.round(value * 100.0 / total ); // floating-point division, so round() rounds instead of truncating
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
new file mode 100644
index 0000000..084c28a
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.QueryTestUtil;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.client.PrintingResultsListener;
+import org.apache.drill.exec.client.QuerySubmitter.Format;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.AwaitableUserResultsListener;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.apache.drill.exec.util.VectorUtil;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.BufferingQueryEventListener.QueryEvent;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Builder for a Drill query. Provides all types of query formats,
+ * and a variety of ways to run the query.
+ */
+
+public class QueryBuilder {
+
+  /**
+   * Listener used to retrieve the query summary (only) asynchronously
+   * using a {@link QuerySummaryFuture}.
+   */
+
+  public class SummaryOnlyQueryEventListener implements UserResultsListener {
+
+    private final QuerySummaryFuture future;
+    private QueryId queryId;
+    private int recordCount;
+    private int batchCount;
+    private long startTime;
+
+    public SummaryOnlyQueryEventListener(QuerySummaryFuture future) {
+      this.future = future;
+      startTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public void queryIdArrived(QueryId queryId) {
+      this.queryId = queryId;
+    }
+
+    @Override
+    public void submissionFailed(UserException ex) {
+      future.completed(
+          new QuerySummary(queryId, recordCount, batchCount,
+                           System.currentTimeMillis() - startTime, ex));
+    }
+
+    @Override
+    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+      batchCount++;
+      recordCount += result.getHeader().getRowCount();
+      result.release();
+    }
+
+    @Override
+    public void queryCompleted(QueryState state) {
+      future.completed(
+          new QuerySummary(queryId, recordCount, batchCount,
+                           System.currentTimeMillis() - startTime, state));
+    }
+  }
+
+  /**
+   * The future used to wait for the completion of an async query. Returns
+   * just the summary of the query.
+   */
+
+  public class QuerySummaryFuture implements Future<QuerySummary> {
+
+    /**
+     * Synchronizes the listener thread and the test thread that
+     * launched the query.
+     */
+
+    private CountDownLatch lock = new CountDownLatch(1);
+    private QuerySummary summary;
+
+    /**
+     * Unsupported at present.
+     */
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Always returns false.
+     */
+
+    @Override
+    public boolean isCancelled() { return false; }
+
+    @Override // Done once completed() has released the latch; avoids an unsynchronized read of summary.
+    public boolean isDone() { return lock.getCount() == 0; }
+
+    @Override
+    public QuerySummary get() throws InterruptedException, ExecutionException {
+      lock.await();
+      return summary;
+    }
+
+    /**
+     * Waits at most the given time for the query to complete; throws TimeoutException if it has not.
+     */
+    @Override
+    public QuerySummary get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      if (!lock.await(timeout, unit)) { throw new TimeoutException(); } // false: latch not released in time
+      return summary;
+    }
+
+    protected void completed(QuerySummary querySummary) {
+      summary = querySummary;
+      lock.countDown();
+    }
+  }
+
+  /**
+   * Summary results of a query: records, batches, run time.
+   */
+
+  public static class QuerySummary {
+    private final QueryId queryId;
+    private final int records;
+    private final int batches;
+    private final long ms;
+    private final QueryState finalState;
+    private final Exception error;
+
+    public QuerySummary(QueryId queryId, int recordCount, int batchCount, long elapsed, QueryState state) {
+      this.queryId = queryId;
+      records = recordCount;
+      batches = batchCount;
+      ms = elapsed;
+      finalState = state;
+      error = null;
+    }
+
+    public QuerySummary(QueryId queryId, int recordCount, int batchCount, long elapsed, Exception ex) {
+      this.queryId = queryId;
+      records = recordCount;
+      batches = batchCount;
+      ms = elapsed;
+      finalState = null;
+      error = ex;
+    }
+
+    public boolean failed() { return error != null; }
+    public boolean succeeded() { return error == null; }
+    public long recordCount() { return records; }
+    public int batchCount() { return batches; }
+    public long runTimeMs() { return ms; }
+    public QueryId queryId() { return queryId; }
+    public String queryIdString() { return QueryIdHelper.getQueryId(queryId); }
+    public Exception error() { return error; }
+    public QueryState finalState() { return finalState; }
+  }
+
+  private final ClientFixture client;
+  private QueryType queryType;
+  private String queryText;
+
+  QueryBuilder(ClientFixture client) {
+    this.client = client;
+  }
+
+  public QueryBuilder query(QueryType type, String text) {
+    queryType = type;
+    queryText = text;
+    return this;
+  }
+
+  public QueryBuilder sql(String sql) {
+    return query(QueryType.SQL, sql);
+  }
+
+  public QueryBuilder sql(String query, Object... args) {
+    return sql(String.format(query, args));
+  }
+
+  public QueryBuilder physical(String plan) {
+    return query(QueryType.PHYSICAL, plan);
+  }
+
+  public QueryBuilder sqlResource(String resource) {
+    sql(ClusterFixture.loadResource(resource));
+    return this;
+  }
+
+  public QueryBuilder sqlResource(String resource, Object... args) {
+    sql(ClusterFixture.loadResource(resource), args);
+    return this;
+  }
+
+  public QueryBuilder physicalResource(String resource) {
+    physical(ClusterFixture.loadResource(resource));
+    return this;
+  }
+
+  /**
+   * Run the query returning just a summary of the results: record count,
+   * batch count and run time. Handy when doing performance tests when the
+   * validity of the results is verified in some other test.
+   *
+   * @return the query summary
+   * @throws Exception if anything goes wrong anywhere in the execution
+   */
+
+  public QuerySummary run() throws Exception {
+    return produceSummary(withEventListener());
+  }
+
+  /**
+   * Run the query and return a list of the result batches. Use
+   * if the batch count is small and you want to work with them.
+   * @return a list of batches resulting from the query
+   * @throws RpcException
+   */
+
+  public List<QueryDataBatch> results() throws RpcException {
+    Preconditions.checkNotNull(queryType, "Query not provided.");
+    Preconditions.checkNotNull(queryText, "Query not provided.");
+    return client.client().runQuery(queryType, queryText);
+  }
+
+  /**
+   * Run the query with the listener provided. Use when the result
+   * count will be large, or you don't need the results.
+   *
+   * @param listener the Drill listener
+   */
+
+  public void withListener(UserResultsListener listener) {
+    Preconditions.checkNotNull(queryType, "Query not provided.");
+    Preconditions.checkNotNull(queryText, "Query not provided.");
+    client.client().runQuery(queryType, queryText, listener);
+  }
+
+  /**
+   * Run the query, return an easy-to-use event listener to process
+   * the query results. Use when the result set is large. The listener
+   * allows the caller to iterate over results in the test thread.
+   * (The listener implements a producer-consumer model to hide the
+   * details of Drill listeners.)
+   *
+   * @return the query event listener
+   */
+
+  public BufferingQueryEventListener withEventListener() {
+    BufferingQueryEventListener listener = new BufferingQueryEventListener();
+    withListener(listener);
+    return listener;
+  }
+
+  public long printCsv() {
+    return print(Format.CSV);
+  }
+
+  public long print(Format format) {
+    return print(format,20);
+  }
+
+  public long print(Format format, int colWidth) {
+    return runAndWait(new PrintingResultsListener(client.cluster().config(), format, colWidth));
+  }
+
+  /**
+   * Run the query asynchronously, returning a future to be used
+   * to check for query completion, wait for completion, and obtain
+   * the result summary.
+   */
+
+  public QuerySummaryFuture futureSummary() {
+    QuerySummaryFuture future = new QuerySummaryFuture();
+    withListener(new SummaryOnlyQueryEventListener(future));
+    return future;
+  }
+
+  /**
+   * Run a query and optionally print the output in TSV format.
+   * Similar to {@link QueryTestUtil#test} with one query. Output is printed
+   * only if the tests are running as verbose.
+   *
+   * @return the number of rows returned
+   * @throws Exception if anything goes wrong with query execution
+   */
+  public long print() throws Exception {
+    DrillConfig config = client.cluster().config( );
+
+    // Note: verbose check disabled until that change is
+    // committed.
+
+    boolean verbose = ! config.getBoolean(QueryTestUtil.TEST_QUERY_PRINTING_SILENT) /* ||
+                      DrillTest.verbose() */;
+    if (verbose) {
+      return print(Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH);
+    } else {
+      return run().recordCount();
+    }
+  }
+
+  public long runAndWait(UserResultsListener listener) {
+    AwaitableUserResultsListener resultListener =
+        new AwaitableUserResultsListener(listener);
+    withListener(resultListener);
+    try {
+      return resultListener.await();
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /**
+   * Submit an "EXPLAIN" statement, and return text form of the
+   * plan.
+   * @throws Exception if the query fails
+   */
+
+  public String explainText() throws Exception {
+    return explain(ClusterFixture.EXPLAIN_PLAN_TEXT);
+  }
+
+  /**
+   * Submit an "EXPLAIN" statement, and return the JSON form of the
+   * plan.
+   * @throws Exception if the query fails
+   */
+
+  public String explainJson() throws Exception {
+    return explain(ClusterFixture.EXPLAIN_PLAN_JSON);
+  }
+
+  public String explain(String format) throws Exception {
+    queryText = "EXPLAIN PLAN FOR " + queryText;
+    return queryPlan(format);
+  }
+
+  private QuerySummary produceSummary(BufferingQueryEventListener listener) throws Exception {
+    long start = System.currentTimeMillis();
+    int recordCount = 0;
+    int batchCount = 0;
+    QueryId queryId = null;
+    QueryState state = null;
+    loop:
+    for (;;) {
+      QueryEvent event = listener.get();
+      switch (event.type)
+      {
+      case BATCH:
+        batchCount++;
+        recordCount += event.batch.getHeader().getRowCount();
+        event.batch.release();
+        break;
+      case EOF:
+        state = event.state;
+        break loop;
+      case ERROR:
+        throw event.error;
+      case QUERY_ID:
+        queryId = event.queryId;
+        break;
+      default:
+        throw new IllegalStateException("Unexpected event: " + event.type);
+      }
+    }
+    long end = System.currentTimeMillis();
+    long elapsed = end - start;
+    return new QuerySummary(queryId, recordCount, batchCount, elapsed, state);
+  }
+
+  /**
+   * Submit an "EXPLAIN" statement, and return the column value which
+   * contains the plan's string.
+   * <p>
+   * Cribbed from {@link PlanTestBase#getPlanInString(String, String)}
+   * @throws Exception if anything goes wrong in the query
+   */
+
+  protected String queryPlan(String columnName) throws Exception {
+    Preconditions.checkArgument(queryType == QueryType.SQL, "Can only explan an SQL query.");
+    final List<QueryDataBatch> results = results();
+    final RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
+    final StringBuilder builder = new StringBuilder();
+
+    for (final QueryDataBatch b : results) {
+      if (!b.hasData()) {
+        continue;
+      }
+
+      loader.load(b.getHeader().getDef(), b.getData());
+
+      final VectorWrapper<?> vw;
+      try {
+          vw = loader.getValueAccessorById(
+              NullableVarCharVector.class,
+              loader.getValueVectorId(SchemaPath.getSimplePath(columnName)).getFieldIds());
+      } catch (Throwable t) {
+        throw new IllegalStateException("Looks like you did not provide an explain plan query, please add EXPLAIN PLAN FOR to the beginning of your query.");
+      }
+
+      @SuppressWarnings("resource")
+      final ValueVector vv = vw.getValueVector();
+      for (int i = 0; i < vv.getAccessor().getValueCount(); i++) {
+        final Object o = vv.getAccessor().getObject(i);
+        builder.append(o);
+      }
+      loader.clear();
+      b.release();
+    }
+
+    return builder.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/test/package-info.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/package-info.java b/exec/java-exec/src/test/java/org/apache/drill/test/package-info.java
new file mode 100644
index 0000000..9f62478
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/package-info.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Provides a variety of test framework tools to simplify Drill unit
+ * tests and ad-hoc tests created while developing features. Key components
+ * include:
+ * <ul>
+ * <li>{@link FixtureBuilder}: Builder pattern to create an embedded Drillbit,
+ * or cluster of Drillbits, using a specified set of configuration, session
+ * and system options.</li>
+ * <li>{@link ClusterFixture}: The cluster created by the builder.</li>
+ * <li>{@link ClientFixture}: A facade to the Drill client that provides
+ * convenience methods for setting session options, running queries and
+ * so on. A client is associated with a cluster. If tests desire, multiple
+ * clients can be created for a single cluster, though most need just one
+ * client. A builder exists for clients, but most tests get the client
+ * directly from the cluster.</li>
+ * <li>{@link QueryBuilder}: a builder pattern for constructing and
+ * running any form of query (SQL, logical or physical) and running the
+ * query in a wide variety of ways (just count the rows, return the
+ * results as a list, run using a listener, etc.)</li>
+ * <li>{@link QueryBuilder.QuerySummary QuerySummary}: a summary of a
+ * query returned from running the query. Contains the query ID, the
+ * row count, the batch count and elapsed run time.</li>
+ * <li>{@link ProfileParser}: A simple tool to load a query profile and
+ * provide access to the profile structure. Also prints the key parts of
+ * the profile for diagnostic purposes.</li>
+ * <li>{@link LogFixture}: Allows per-test changes to log settings to,
+ * say, send a particular logger to the console for easier debugging, or
+ * to suppress logging of a deliberately created failure.</li>
+ * </ul>
+ * <h3>Usage</h3>
+ * A typical test using this framework looks like this:
+ * <code><pre>
+  {@literal @}Test
+  public void exampleTest() throws Exception {
+
+    // Configure the cluster. One Drillbit by default.
+    FixtureBuilder builder = ClusterFixture.builder()
+        // Set up per-test specialized config and session options.
+        .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true)
+        .configProperty(ExecConstants.REMOVER_ENABLE_GENERIC_COPIER, true)
+        .sessionOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY, 3L * 1024 * 1024 * 1024)
+        .maxParallelization(1)
+        ;
+
+    // Launch the cluster and client.
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+
+      // Run a query (using the mock data source) and print a summary.
+      String sql = "SELECT id_i FROM `mock`.employee_1M ORDER BY id_i";
+      QuerySummary summary = client.queryBuilder().sql(sql).run();
+      assertEquals(1_000_000, summary.recordCount());
+      System.out.println(String.format("Sorted %,d records in %d batches.", summary.recordCount(), summary.batchCount()));
+      System.out.println(String.format("Query Id: %s, elapsed: %d ms", summary.queryIdString(), summary.runTimeMs()));
+      client.parseProfile(summary.queryIdString()).print();
+    }
+  }
+ * </pre></code>
+ * <p>
+ * Typical usage for the logging fixture: <pre><code>
+ * {@literal @}Test
+ * public void myTest() {
+ *   LogFixtureBuilder logBuilder = LogFixture.builder()
+ *          .toConsole()
+ *          .disable() // Silence all other loggers
+ *          .logger(ExternalSortBatch.class, Level.DEBUG);
+ *   try (LogFixture logs = logBuilder.build()) {
+ *     // Test code here
+ *   }
+ * }</code></pre>
+ *
+ */
+package org.apache.drill.test;


[4/4] drill git commit: DRILL-4764: Parquet file with INT_16, etc. logical types not supported by simple SELECT

Posted by su...@apache.org.
DRILL-4764: Parquet file with INT_16, etc. logical types not supported by simple SELECT

closes #673


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/60624af2
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/60624af2
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/60624af2

Branch: refs/heads/master
Commit: 60624af225f90992a15a707e1650e41ccecf5a53
Parents: 5c3924c
Author: Serhii-Harnyk <se...@gmail.com>
Authored: Thu Nov 24 13:24:03 2016 +0000
Committer: Sudheesh Katkam <su...@apache.org>
Committed: Mon Jan 30 10:09:39 2017 -0800

----------------------------------------------------------------------
 .../columnreaders/ColumnReaderFactory.java      |  11 +++
 .../ParquetFixedWidthDictionaryReaders.java     |  72 +++++++++++++++++++
 .../ParquetToDrillTypeConverter.java            |   9 +++
 .../store/parquet2/TestDrillParquetReader.java  |  14 ++++
 .../test/resources/parquet/uint_types.parquet   | Bin 0 -> 1727 bytes
 5 files changed, 106 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/60624af2/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index 662d5c9..495f70b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -48,6 +48,8 @@ import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.TimeStampVector;
 import org.apache.drill.exec.vector.TimeVector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.UInt8Vector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarBinaryVector;
 import org.apache.drill.exec.vector.VarCharVector;
@@ -121,6 +123,13 @@ public class ColumnReaderFactory {
                   return new ParquetFixedWidthDictionaryReaders.DictionaryDecimal9Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal9Vector) v, schemaElement);
                 case TIME_MILLIS:
                   return new ParquetFixedWidthDictionaryReaders.DictionaryTimeReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (TimeVector) v, schemaElement);
+                case INT_8:
+                case INT_16:
+                  return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
+                case UINT_8:
+                case UINT_16:
+                case UINT_32:
+                  return new ParquetFixedWidthDictionaryReaders.DictionaryUInt4Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (UInt4Vector) v, schemaElement);
                 default:
                   throw new ExecutionSetupException("Unsupported dictionary converted type " + convertedType + " for primitive type INT32");
               }
@@ -129,6 +138,8 @@ public class ColumnReaderFactory {
                 return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
               }
               switch (convertedType) {
+                case UINT_64:
+                  return new ParquetFixedWidthDictionaryReaders.DictionaryUInt8Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (UInt8Vector) v, schemaElement);
                 case DECIMAL:
                   return new ParquetFixedWidthDictionaryReaders.DictionaryDecimal18Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal18Vector) v, schemaElement);
                 case TIMESTAMP_MILLIS:

http://git-wip-us.apache.org/repos/asf/drill/blob/60624af2/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
index d7b6fbb..53a68ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
@@ -26,6 +26,8 @@ import org.apache.drill.exec.vector.Float8Vector;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.TimeStampVector;
 import org.apache.drill.exec.vector.TimeVector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.UInt8Vector;
 import org.apache.drill.exec.vector.VarBinaryVector;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.format.SchemaElement;
@@ -56,6 +58,41 @@ public class ParquetFixedWidthDictionaryReaders {
     }
   }
 
+  /**
+   * This class is used for reading unsigned integer fields.
+   */
+  static class DictionaryUInt4Reader extends FixedByteAlignedReader<UInt4Vector> {
+    DictionaryUInt4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                        ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, UInt4Vector v,
+                        SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+
+      recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+        - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+      if (usingDictionary) {
+        UInt4Vector.Mutator mutator = valueVec.getMutator();
+        for (int i = 0; i < recordsReadInThisIteration; i++) {
+          mutator.setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger());
+        }
+        // Set the write Index. The next page that gets read might be a page that does not use dictionary encoding
+        // and we will go into the else condition below. The readField method of the parent class requires the
+        // writer index to be set correctly.
+        readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+        readLength = (int) Math.ceil(readLengthInBits / 8.0);
+        int writerIndex = valueVec.getBuffer().writerIndex();
+        valueVec.getBuffer().setIndex(0, writerIndex + (int) readLength);
+      } else {
+        super.readField(recordsToReadInThisPass);
+      }
+    }
+  }
+
   static class DictionaryFixedBinaryReader extends FixedByteAlignedReader<VarBinaryVector> {
     DictionaryFixedBinaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
                         ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v,
@@ -174,6 +211,41 @@ public class ParquetFixedWidthDictionaryReaders {
     }
   }
 
+  /**
+   * This class is used for reading unsigned BigInt fields.
+   */
+  static class DictionaryUInt8Reader extends FixedByteAlignedReader<UInt8Vector> {
+    DictionaryUInt8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                           ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, UInt8Vector v,
+                           SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+
+      recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+        - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+      if (usingDictionary) {
+        UInt8Vector.Mutator mutator = valueVec.getMutator();
+        for (int i = 0; i < recordsReadInThisIteration; i++) {
+          mutator.setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong());
+        }
+        // Set the write Index. The next page that gets read might be a page that does not use dictionary encoding
+        // and we will go into the else condition below. The readField method of the parent class requires the
+        // writer index to be set correctly.
+        readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+        readLength = (int) Math.ceil(readLengthInBits / 8.0);
+        int writerIndex = valueVec.getBuffer().writerIndex();
+        valueVec.getBuffer().setIndex(0, writerIndex + (int) readLength);
+      } else {
+        super.readField(recordsToReadInThisPass);
+      }
+    }
+  }
+
   static class DictionaryDecimal18Reader extends FixedByteAlignedReader<Decimal18Vector> {
     DictionaryDecimal18Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
                            ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal18Vector v,

http://git-wip-us.apache.org/repos/asf/drill/blob/60624af2/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
index be27f3e..3f5f3b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
@@ -61,6 +61,8 @@ public class ParquetToDrillTypeConverter {
           return (TypeProtos.MinorType.BIGINT);
         }
         switch(convertedType) {
+          case UINT_64:
+            return TypeProtos.MinorType.UINT8;
           case DECIMAL:
             ParquetReaderUtility.checkDecimalTypeEnabled(options);
             return TypeProtos.MinorType.DECIMAL18;
@@ -77,6 +79,13 @@ public class ParquetToDrillTypeConverter {
           return TypeProtos.MinorType.INT;
         }
         switch(convertedType) {
+          case UINT_8:
+          case UINT_16:
+          case UINT_32:
+            return TypeProtos.MinorType.UINT4;
+          case INT_8:
+          case INT_16:
+            return TypeProtos.MinorType.INT;
           case DECIMAL:
             ParquetReaderUtility.checkDecimalTypeEnabled(options);
             return TypeProtos.MinorType.DECIMAL9;

http://git-wip-us.apache.org/repos/asf/drill/blob/60624af2/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java
index b18fd9d..477b825 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java
@@ -84,4 +84,18 @@ public class TestDrillParquetReader extends BaseTestQuery {
       .sqlBaselineQuery("SELECT columns[0] id, CAST(NULLIF(columns[1], '') AS DOUBLE) val FROM cp.`parquet2/4349.csv.gz` WHERE columns[0] = 'b'")
       .go();
   }
+
+  @Test
+  public void testUnsignedAndSignedIntTypes() throws Exception {
+    testBuilder()
+      .unOrdered()
+      .sqlQuery("select * from cp.`parquet/uint_types.parquet`")
+      .baselineColumns("uint8_field", "uint16_field", "uint32_field", "uint64_field", "int8_field", "int16_field",
+        "required_uint8_field", "required_uint16_field", "required_uint32_field", "required_uint64_field",
+        "required_int8_field", "required_int16_field")
+      .baselineValues(255, 65535, 2147483647, 9223372036854775807L, 255, 65535, -1, -1, -1, -1L, -2147483648, -2147483648)
+      .baselineValues(-1, -1, -1, -1L, -2147483648, -2147483648, 255, 65535, 2147483647, 9223372036854775807L, 255, 65535)
+      .baselineValues(null, null, null, null, null, null, 0, 0, 0, 0L, 0, 0)
+      .go();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/60624af2/exec/java-exec/src/test/resources/parquet/uint_types.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/uint_types.parquet b/exec/java-exec/src/test/resources/parquet/uint_types.parquet
new file mode 100644
index 0000000..62ea0279
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/uint_types.parquet differ


[3/4] drill git commit: DRILL-5126: Provide simplified, unified "cluster fixture" for test

Posted by su...@apache.org.
DRILL-5126: Provide simplified, unified "cluster fixture" for test

Drill provides a robust selection of test frameworks that have evolved to satisfy the needs of a variety of test cases.
However, some do some of what a given test needs, while others do other parts. Also, the various frameworks make
assumptions (in the form of boot-time configuration) that differ from what some tests may need, forcing the test
to start, then stop, then restart a Drillbit - an expensive operation.

Also, many ways exist to run queries, but they all do part of the job. Several ways exist to change
runtime options.

This checkin shamelessly grabs the best parts from existing frameworks, adds a fluent builder facade
and provides a complete, versatile test framework for new tests. Old tests are unaffected by this
new code.

An adjustment was made to allow use of the existing TestBuilder mechanism. TestBuilder used to
depend on static members of BaseTestQuery. A "shim" allows the same code to work in the old
way for old tests, but with the new ClusterFixture for new tests.

Details are in the org.apache.drill.test.package-info.java file.

This commit modifies a single test case, TestSimpleExternalSort, to use the new framework.
More cases will follow once this framework itself is committed.

Also, the framework will eventually allow use of the extended mock data source
from SQL. However, that change must await checkin of the mock data source changes.

Includes a LogFixture that allows setting logger options per test to simplify debugging via tests.

Also includes a "summary listener" to run a query and return a summary of the
run. Handy to simply verify that a query runs and to time it.

Added an async query runner for tests that want to run multiple
concurrent queries.

closes #710


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5c3924c9
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5c3924c9
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5c3924c9

Branch: refs/heads/master
Commit: 5c3924c9844f7d25c0798c649bc032a0022b3a3e
Parents: 837722c
Author: Paul Rogers <pr...@maprtech.com>
Authored: Tue Dec 13 13:41:23 2016 -0800
Committer: Sudheesh Katkam <su...@apache.org>
Committed: Mon Jan 30 10:09:39 2017 -0800

----------------------------------------------------------------------
 exec/java-exec/pom.xml                          |   6 +
 .../org/apache/drill/exec/server/Drillbit.java  |   2 +-
 .../java/org/apache/drill/BaseTestQuery.java    |  23 +-
 .../java/org/apache/drill/DrillTestWrapper.java |  52 ++-
 .../test/java/org/apache/drill/TestBuilder.java |  88 ++--
 .../java/org/apache/drill/exec/ExecTest.java    |   2 +-
 .../impl/xsort/TestSimpleExternalSort.java      | 256 ++++-------
 .../drill/test/BufferingQueryEventListener.java | 112 +++++
 .../org/apache/drill/test/ClientFixture.java    | 195 ++++++++
 .../org/apache/drill/test/ClusterFixture.java   | 432 ++++++++++++++++++
 .../java/org/apache/drill/test/ClusterTest.java | 122 +++++
 .../java/org/apache/drill/test/FieldDef.java    |  82 ++++
 .../org/apache/drill/test/FixtureBuilder.java   | 260 +++++++++++
 .../java/org/apache/drill/test/LogFixture.java  | 255 +++++++++++
 .../org/apache/drill/test/ProfileParser.java    | 219 +++++++++
 .../org/apache/drill/test/QueryBuilder.java     | 455 +++++++++++++++++++
 .../org/apache/drill/test/package-info.java     |  90 ++++
 17 files changed, 2435 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 5992acb..000d447 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -458,6 +458,12 @@
       <artifactId>httpdlog-parser</artifactId>
       <version>2.4</version>
     </dependency>
+    <dependency>
+      <groupId>org.glassfish</groupId>
+      <artifactId>javax.json</artifactId>
+      <version>1.0.4</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 25776ad..77532e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -106,7 +106,7 @@ public class Drillbit implements AutoCloseable {
       storeProvider = new CachingPersistentStoreProvider(new LocalPersistentStoreProvider(config));
     } else {
       coord = new ZKClusterCoordinator(config);
-      storeProvider = new PersistentStoreRegistry<ClusterCoordinator>(this.coord, config).newPStoreProvider();
+      storeProvider = new PersistentStoreRegistry(this.coord, config).newPStoreProvider();
       isDistributedMode = true;
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index fb84088..42cfe08 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.drill.DrillTestWrapper.TestServices;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.scanner.ClassPathScanner;
@@ -249,8 +250,26 @@ public class BaseTestQuery extends ExecTest {
     return testBuilder();
   }
 
+
+  public static class ClassicTestServices implements TestServices {
+    @Override
+    public BufferAllocator allocator() {
+      return allocator;
+    }
+
+    @Override
+    public void test(String query) throws Exception {
+      BaseTestQuery.test(query);
+    }
+
+    @Override
+    public List<QueryDataBatch> testRunAndReturn(final QueryType type, final Object query) throws Exception {
+      return BaseTestQuery.testRunAndReturn(type, query);
+    }
+  }
+
   public static TestBuilder testBuilder() {
-    return new TestBuilder(allocator);
+    return new TestBuilder(new ClassicTestServices());
   }
 
   @AfterClass
@@ -308,7 +327,7 @@ public class BaseTestQuery extends ExecTest {
       Preconditions.checkArgument(query instanceof String, "Expected a string as input query");
       query = QueryTestUtil.normalizeQuery((String)query);
       return client.runQuery(type, (String)query);
-  }
+    }
   }
 
   public static List<QueryDataBatch> testPreparedStatement(PreparedStatementHandle handle) throws Exception {

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index 7033be6..054676d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -64,6 +64,14 @@ import org.apache.drill.exec.vector.ValueVector;
 public class DrillTestWrapper {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);
 
+  public interface TestServices {
+    BufferAllocator allocator();
+
+    void test(String query) throws Exception;
+
+    List<QueryDataBatch> testRunAndReturn(QueryType type, Object query) throws Exception;
+  }
+
   // TODO - when in JSON, read baseline in all text mode to avoid precision loss for decimal values
 
   // This flag will enable all of the values that are validated to be logged. For large validations this is time consuming
@@ -91,7 +99,7 @@ public class DrillTestWrapper {
   private UserBitShared.QueryType baselineQueryType;
   // should ordering be enforced in the baseline check
   private boolean ordered;
-  private BufferAllocator allocator;
+  private TestServices services;
   // queries to run before the baseline or test queries, can be used to set options
   private String baselineOptionSettingQueries;
   private String testOptionSettingQueries;
@@ -108,12 +116,12 @@ public class DrillTestWrapper {
 
   private int expectedNumBatches;
 
-  public DrillTestWrapper(TestBuilder testBuilder, BufferAllocator allocator, Object query, QueryType queryType,
+  public DrillTestWrapper(TestBuilder testBuilder, TestServices services, Object query, QueryType queryType,
                           String baselineOptionSettingQueries, String testOptionSettingQueries,
                           QueryType baselineQueryType, boolean ordered, boolean highPerformanceComparison,
                           List<Map<String, Object>> baselineRecords, int expectedNumBatches) {
     this.testBuilder = testBuilder;
-    this.allocator = allocator;
+    this.services = services;
     this.query = query;
     this.queryType = queryType;
     this.baselineQueryType = baselineQueryType;
@@ -138,7 +146,7 @@ public class DrillTestWrapper {
   }
 
   private BufferAllocator getAllocator() {
-    return allocator;
+    return services.allocator();
   }
 
   private void compareHyperVectors(Map<String, HyperVectorValueIterator> expectedRecords,
@@ -388,8 +396,8 @@ public class DrillTestWrapper {
     List<QueryDataBatch> actual;
     QueryDataBatch batch = null;
     try {
-      BaseTestQuery.test(testOptionSettingQueries);
-      actual = BaseTestQuery.testRunAndReturn(queryType, query);
+      test(testOptionSettingQueries);
+      actual = testRunAndReturn(queryType, query);
       batch = actual.get(0);
       loader.load(batch.getHeader().getDef(), batch.getData());
 
@@ -438,8 +446,8 @@ public class DrillTestWrapper {
     List<Map<String, Object>> actualRecords = new ArrayList<>();
 
     try {
-      BaseTestQuery.test(testOptionSettingQueries);
-      actual = BaseTestQuery.testRunAndReturn(queryType, query);
+      test(testOptionSettingQueries);
+      actual = testRunAndReturn(queryType, query);
 
       checkNumBatches(actual);
 
@@ -449,8 +457,8 @@ public class DrillTestWrapper {
       // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
       // the cases where the baseline is stored in a file.
       if (baselineRecords == null) {
-        BaseTestQuery.test(baselineOptionSettingQueries);
-        expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
+        test(baselineOptionSettingQueries);
+        expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
         addToMaterializedResults(expectedRecords, expected, loader);
       } else {
         expectedRecords = baselineRecords;
@@ -481,7 +489,6 @@ public class DrillTestWrapper {
 
   public void compareMergedOnHeapVectors() throws Exception {
     RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
-    BatchSchema schema = null;
 
     List<QueryDataBatch> actual = Collections.emptyList();
     List<QueryDataBatch> expected = Collections.emptyList();
@@ -489,8 +496,8 @@ public class DrillTestWrapper {
     Map<String, List<Object>> expectedSuperVectors;
 
     try {
-      BaseTestQuery.test(testOptionSettingQueries);
-      actual = BaseTestQuery.testRunAndReturn(queryType, query);
+      test(testOptionSettingQueries);
+      actual = testRunAndReturn(queryType, query);
 
       checkNumBatches(actual);
 
@@ -504,8 +511,8 @@ public class DrillTestWrapper {
       // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
       // the cases where the baseline is stored in a file.
       if (baselineRecords == null) {
-        BaseTestQuery.test(baselineOptionSettingQueries);
-        expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
+        test(baselineOptionSettingQueries);
+        expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
         BatchIterator exBatchIter = new BatchIterator(expected, loader);
         expectedSuperVectors = addToCombinedVectorResults(exBatchIter);
         exBatchIter.close();
@@ -539,8 +546,8 @@ public class DrillTestWrapper {
   public void compareResultsHyperVector() throws Exception {
     RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
 
-    BaseTestQuery.test(testOptionSettingQueries);
-    List<QueryDataBatch> results = BaseTestQuery.testRunAndReturn(queryType, query);
+    test(testOptionSettingQueries);
+    List<QueryDataBatch> results = testRunAndReturn(queryType, query);
 
     checkNumBatches(results);
 
@@ -549,8 +556,8 @@ public class DrillTestWrapper {
 
     Map<String, HyperVectorValueIterator> actualSuperVectors = addToHyperVectorMap(results, loader);
 
-    BaseTestQuery.test(baselineOptionSettingQueries);
-    List<QueryDataBatch> expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
+    test(baselineOptionSettingQueries);
+    List<QueryDataBatch> expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
 
     Map<String, HyperVectorValueIterator> expectedSuperVectors = addToHyperVectorMap(expected, loader);
 
@@ -761,4 +768,11 @@ public class DrillTestWrapper {
     return ret + "\n";
   }
 
+  private void test(String query) throws Exception {
+    services.test(query);
+  }
+
+  private List<QueryDataBatch> testRunAndReturn(QueryType type, Object query) throws Exception {
+    return services.testRunAndReturn(type, query);
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
index a19b30e..bef7b3b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
@@ -29,6 +29,7 @@ import org.antlr.runtime.ANTLRStringStream;
 import org.antlr.runtime.CommonTokenStream;
 import org.antlr.runtime.RecognitionException;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.DrillTestWrapper.TestServices;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.parser.ExprLexer;
 import org.apache.drill.common.expression.parser.ExprParser;
@@ -56,7 +57,7 @@ public class TestBuilder {
   // should the validation enforce ordering
   private Boolean ordered;
   private boolean approximateEquality;
-  private BufferAllocator allocator;
+  private TestServices services;
   // Used to pass the type information associated with particular column names rather than relying on the
   // ordering of the columns in the CSV file, or the default type inferences when reading JSON, this is used for the
   // case where results of the test query are adding type casts to the baseline queries, this saves a little bit of
@@ -84,16 +85,16 @@ public class TestBuilder {
 
   private int expectedNumBatches = DrillTestWrapper.EXPECTED_BATCH_COUNT_NOT_SET;
 
-  public TestBuilder(BufferAllocator allocator) {
-    this.allocator = allocator;
+  public TestBuilder(TestServices services) {
+    this.services = services;
     reset();
   }
 
-  public TestBuilder(BufferAllocator allocator, Object query, UserBitShared.QueryType queryType, Boolean ordered,
+  public TestBuilder(TestServices services, Object query, UserBitShared.QueryType queryType, Boolean ordered,
                      boolean approximateEquality, Map<SchemaPath, TypeProtos.MajorType> baselineTypeMap,
                      String baselineOptionSettingQueries, String testOptionSettingQueries, boolean highPerformanceComparison,
                      int expectedNumBatches) {
-    this(allocator);
+    this(services);
     if (ordered == null) {
       throw new RuntimeException("Ordering not set, when using a baseline file or query you must explicitly call the ordered() or unOrdered() method on the " + this.getClass().getSimpleName());
     }
@@ -123,7 +124,7 @@ public class TestBuilder {
     if ( ! ordered && highPerformanceComparison ) {
       throw new Exception("High performance comparison only available for ordered checks, to enforce this restriction, ordered() must be called first.");
     }
-    return new DrillTestWrapper(this, allocator, query, queryType, baselineOptionSettingQueries, testOptionSettingQueries,
+    return new DrillTestWrapper(this, services, query, queryType, baselineOptionSettingQueries, testOptionSettingQueries,
         getValidationQueryType(), ordered, highPerformanceComparison, baselineRecords, expectedNumBatches);
   }
 
@@ -154,24 +155,24 @@ public class TestBuilder {
   public TestBuilder sqlQueryFromFile(String queryFile) throws IOException {
     String query = BaseTestQuery.getFile(queryFile);
     this.query = query;
-    this.queryType = UserBitShared.QueryType.SQL;
+    queryType = UserBitShared.QueryType.SQL;
     return this;
   }
 
   public TestBuilder physicalPlanFromFile(String queryFile) throws IOException {
     String query = BaseTestQuery.getFile(queryFile);
     this.query = query;
-    this.queryType = UserBitShared.QueryType.PHYSICAL;
+    queryType = UserBitShared.QueryType.PHYSICAL;
     return this;
   }
 
   public TestBuilder ordered() {
-    this.ordered = true;
+    ordered = true;
     return this;
   }
 
   public TestBuilder unOrdered() {
-    this.ordered = false;
+    ordered = false;
     return this;
   }
 
@@ -179,36 +180,41 @@ public class TestBuilder {
   // a little harder to debug as it iterates over a hyper batch rather than reading all of the values into
   // large on-heap lists
   public TestBuilder highPerformanceComparison() throws Exception {
-    this.highPerformanceComparison = true;
+    highPerformanceComparison = true;
     return this;
   }
 
   // list of queries to run before the baseline query, can be used to set several options
   // list takes the form of a semi-colon separated list
   public TestBuilder optionSettingQueriesForBaseline(String queries) {
-    this.baselineOptionSettingQueries = queries;
+    baselineOptionSettingQueries = queries;
     return this;
   }
 
   public TestBuilder optionSettingQueriesForBaseline(String queries, Object... args) {
-    this.baselineOptionSettingQueries = String.format(queries, args);
+    baselineOptionSettingQueries = String.format(queries, args);
     return this;
   }
 
-  // list of queries to run before the test query, can be used to set several options
-  // list takes the form of a semi-colon separated list
+  /**
+   * List of queries to run before the test query; can be used to set several options.
+   * The list takes the form of a semicolon-separated list.
+   * @param queries queries that set session and system options
+   * @return this test builder
+   */
+
   public TestBuilder optionSettingQueriesForTestQuery(String queries) {
-    this.testOptionSettingQueries = queries;
+    testOptionSettingQueries = queries;
     return this;
   }
 
   public TestBuilder optionSettingQueriesForTestQuery(String query, Object... args) throws Exception {
-    this.testOptionSettingQueries = String.format(query, args);
+    testOptionSettingQueries = String.format(query, args);
     return this;
   }
 
   public TestBuilder approximateEquality() {
-    this.approximateEquality = true;
+    approximateEquality = true;
     return this;
   }
 
@@ -243,13 +249,13 @@ public class TestBuilder {
   }
 
   public JSONTestBuilder jsonBaselineFile(String filePath) {
-    return new JSONTestBuilder(filePath, allocator, query, queryType, ordered, approximateEquality,
+    return new JSONTestBuilder(filePath, services, query, queryType, ordered, approximateEquality,
         baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries, highPerformanceComparison,
         expectedNumBatches);
   }
 
   public CSVTestBuilder csvBaselineFile(String filePath) {
-    return new CSVTestBuilder(filePath, allocator, query, queryType, ordered, approximateEquality,
+    return new CSVTestBuilder(filePath, services, query, queryType, ordered, approximateEquality,
         baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries, highPerformanceComparison,
         expectedNumBatches);
   }
@@ -259,7 +265,7 @@ public class TestBuilder {
     assert baselineColumns == null : "The column information should be captured in expected schema, not baselineColumns";
 
     return new SchemaTestBuilder(
-        allocator,
+        services,
         query,
         queryType,
         baselineOptionSettingQueries,
@@ -280,7 +286,10 @@ public class TestBuilder {
     }
   }
 
-  // indicate that the tests query should be checked for an empty result set
+  /**
+   * Indicate that the test query should be checked for an empty result set.
+   * @return the test builder
+   */
   public TestBuilder expectsEmptyResultSet() {
     unOrdered();
     baselineRecords = new ArrayList<>();
@@ -298,6 +307,7 @@ public class TestBuilder {
     this.expectedNumBatches = expectedNumBatches;
     return this;
   }
+
   /**
    * This method is used to pass in a simple list of values for a single record verification without
    * the need to create a CSV or JSON file to store the baseline.
@@ -306,7 +316,7 @@ public class TestBuilder {
    * checks.
    *
    * @param baselineValues - the baseline values to validate
-   * @return
+   * @return the test builder
    */
   public TestBuilder baselineValues(Object ... baselineValues) {
     assert getExpectedSchema() == null : "The expected schema is not needed when baselineValues are provided ";
@@ -339,7 +349,7 @@ public class TestBuilder {
    * with an assumed stable code path and produce the same erroneous result.
    *
    * @param materializedRecords - a list of maps representing materialized results
-   * @return
+   * @return the test builder
    */
   public TestBuilder baselineRecords(List<Map<String, Object>> materializedRecords) {
     this.baselineRecords = materializedRecords;
@@ -374,20 +384,24 @@ public class TestBuilder {
     return baselineRecords != null;
   }
 
-  // provide a SQL query to validate against
+  /**
+   * Provide a SQL query to validate against.
+   * @param baselineQuery the SQL query to validate against
+   * @return the test builder
+   */
   public BaselineQueryTestBuilder sqlBaselineQuery(Object baselineQuery) {
-    return new BaselineQueryTestBuilder(baselineQuery, UserBitShared.QueryType.SQL, allocator, query, queryType, ordered, approximateEquality,
+    return new BaselineQueryTestBuilder(baselineQuery, UserBitShared.QueryType.SQL, services, query, queryType, ordered, approximateEquality,
         baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries, highPerformanceComparison, expectedNumBatches);
   }
 
   public BaselineQueryTestBuilder sqlBaselineQuery(String query, String ...replacements) {
-    return sqlBaselineQuery(String.format(query, replacements));
+    return sqlBaselineQuery(String.format(query, (Object[]) replacements));
   }
 
   // provide a path to a file containing a SQL query to use as a baseline
   public BaselineQueryTestBuilder sqlBaselineQueryFromFile(String baselineQueryFilename) throws IOException {
     String baselineQuery = BaseTestQuery.getFile(baselineQueryFilename);
-    return new BaselineQueryTestBuilder(baselineQuery, UserBitShared.QueryType.SQL, allocator, query, queryType, ordered, approximateEquality,
+    return new BaselineQueryTestBuilder(baselineQuery, UserBitShared.QueryType.SQL, services, query, queryType, ordered, approximateEquality,
         baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries, highPerformanceComparison, expectedNumBatches);
   }
 
@@ -395,7 +409,7 @@ public class TestBuilder {
   // that physical plans, or any large JSON strings do not live in the Java source as literals
   public BaselineQueryTestBuilder physicalPlanBaselineQueryFromFile(String baselinePhysicalPlanPath) throws IOException {
     String baselineQuery = BaseTestQuery.getFile(baselinePhysicalPlanPath);
-    return new BaselineQueryTestBuilder(baselineQuery, UserBitShared.QueryType.PHYSICAL, allocator, query, queryType, ordered, approximateEquality,
+    return new BaselineQueryTestBuilder(baselineQuery, UserBitShared.QueryType.PHYSICAL, services, query, queryType, ordered, approximateEquality,
         baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries, highPerformanceComparison, expectedNumBatches);
   }
 
@@ -424,11 +438,11 @@ public class TestBuilder {
     // that come out of the test query drive interpretation of baseline
     private TypeProtos.MajorType[] baselineTypes;
 
-    CSVTestBuilder(String baselineFile, BufferAllocator allocator, Object query, UserBitShared.QueryType queryType, Boolean ordered,
+    CSVTestBuilder(String baselineFile, TestServices services, Object query, UserBitShared.QueryType queryType, Boolean ordered,
                    boolean approximateEquality, Map<SchemaPath, TypeProtos.MajorType> baselineTypeMap,
                    String baselineOptionSettingQueries, String testOptionSettingQueries, boolean highPerformanceComparison,
                    int expectedNumBatches) {
-      super(allocator, query, queryType, ordered, approximateEquality, baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries,
+      super(services, query, queryType, ordered, approximateEquality, baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries,
           highPerformanceComparison, expectedNumBatches);
       this.baselineFilePath = baselineFile;
     }
@@ -515,9 +529,9 @@ public class TestBuilder {
 
   public class SchemaTestBuilder extends TestBuilder {
     private List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema;
-    SchemaTestBuilder(BufferAllocator allocator, Object query, UserBitShared.QueryType queryType,
+    SchemaTestBuilder(TestServices services, Object query, UserBitShared.QueryType queryType,
         String baselineOptionSettingQueries, String testOptionSettingQueries, List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema) {
-      super(allocator, query, queryType, false, false, null, baselineOptionSettingQueries, testOptionSettingQueries, false, -1);
+      super(services, query, queryType, false, false, null, baselineOptionSettingQueries, testOptionSettingQueries, false, -1);
       expectsEmptyResultSet();
       this.expectedSchema = expectedSchema;
     }
@@ -556,11 +570,11 @@ public class TestBuilder {
     // path to the baseline file that will be inserted into the validation query
     private String baselineFilePath;
 
-    JSONTestBuilder(String baselineFile, BufferAllocator allocator, Object query, UserBitShared.QueryType queryType, Boolean ordered,
+    JSONTestBuilder(String baselineFile, TestServices services, Object query, UserBitShared.QueryType queryType, Boolean ordered,
                     boolean approximateEquality, Map<SchemaPath, TypeProtos.MajorType> baselineTypeMap,
                     String baselineOptionSettingQueries, String testOptionSettingQueries, boolean highPerformanceComparison,
                     int expectedNumBatches) {
-      super(allocator, query, queryType, ordered, approximateEquality, baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries,
+      super(services, query, queryType, ordered, approximateEquality, baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries,
           highPerformanceComparison, expectedNumBatches);
       this.baselineFilePath = baselineFile;
       this.baselineColumns = new String[] {"*"};
@@ -586,12 +600,12 @@ public class TestBuilder {
     private Object baselineQuery;
     private UserBitShared.QueryType baselineQueryType;
 
-    BaselineQueryTestBuilder(Object baselineQuery, UserBitShared.QueryType baselineQueryType, BufferAllocator allocator,
+    BaselineQueryTestBuilder(Object baselineQuery, UserBitShared.QueryType baselineQueryType, TestServices services,
                              Object query, UserBitShared.QueryType queryType, Boolean ordered,
                              boolean approximateEquality, Map<SchemaPath, TypeProtos.MajorType> baselineTypeMap,
                              String baselineOptionSettingQueries, String testOptionSettingQueries, boolean highPerformanceComparison,
                              int expectedNumBatches) {
-      super(allocator, query, queryType, ordered, approximateEquality, baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries,
+      super(services, query, queryType, ordered, approximateEquality, baselineTypeMap, baselineOptionSettingQueries, testOptionSettingQueries,
           highPerformanceComparison, expectedNumBatches);
       this.baselineQuery = baselineQuery;
       this.baselineQueryType = baselineQueryType;

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
index bfecf52..4872909 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
index 85975cb..50bf710 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
@@ -22,192 +22,140 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.List;
 
-import org.apache.drill.BaseTestQuery;
-import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.common.util.TestTools;
-import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.DrillTest;
+import org.apache.drill.test.FixtureBuilder;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestRule;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-
-public class TestSimpleExternalSort extends BaseTestQuery {
+public class TestSimpleExternalSort extends DrillTest {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSimpleExternalSort.class);
-  DrillConfig c = DrillConfig.create();
-
 
   @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(80000);
 
-  @Ignore
   @Test
-  public void mergeSortWithSv2() throws Exception {
-    List<QueryDataBatch> results = testPhysicalFromFileWithResults("xsort/one_key_sort_descending_sv2.json");
-    int count = 0;
-    for(QueryDataBatch b : results) {
-      if (b.getHeader().getRowCount() != 0) {
-        count += b.getHeader().getRowCount();
-      }
-    }
-    assertEquals(500000, count);
-
-    long previousBigInt = Long.MAX_VALUE;
-
-    int recordCount = 0;
-    int batchCount = 0;
-
-    for (QueryDataBatch b : results) {
-      if (b.getHeader().getRowCount() == 0) {
-        break;
-      }
-      batchCount++;
-      RecordBatchLoader loader = new RecordBatchLoader(allocator);
-      loader.load(b.getHeader().getDef(),b.getData());
-      BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(BigIntVector.class,
-              loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
-
-
-      BigIntVector.Accessor a1 = c1.getAccessor();
+  public void mergeSortWithSv2Legacy() throws Exception {
+    mergeSortWithSv2(true);
+  }
 
-      for (int i =0; i < c1.getAccessor().getValueCount(); i++) {
-        recordCount++;
-        assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i));
-        previousBigInt = a1.get(i);
-      }
-      loader.clear();
-      b.release();
+  /**
+   * Tests the external sort using an in-memory sort. Relies on default memory
+   * settings to be large enough to do the in-memory sort (there is,
+   * unfortunately, no way to double-check that no spilling was done.)
+   * This must be checked manually by setting a breakpoint in the in-memory
+   * sort routine.
+   *
+   * @param testLegacy
+   * @throws Exception
+   */
+
+  private void mergeSortWithSv2(boolean testLegacy) throws Exception {
+    try (ClusterFixture cluster = ClusterFixture.standardCluster( );
+         ClientFixture client = cluster.clientFixture()) {
+      chooseImpl(client, testLegacy);
+      List<QueryDataBatch> results = client.queryBuilder().physicalResource("xsort/one_key_sort_descending_sv2.json").results();
+      assertEquals(500000, client.countResults( results ));
+      validateResults(client.allocator(), results);
     }
+  }
 
-    System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
+  private void chooseImpl(ClientFixture client, boolean testLegacy) throws Exception {
   }
 
   @Test
-  public void sortOneKeyDescendingMergeSort() throws Throwable{
-    List<QueryDataBatch> results = testPhysicalFromFileWithResults("xsort/one_key_sort_descending.json");
-    int count = 0;
-    for (QueryDataBatch b : results) {
-      if (b.getHeader().getRowCount() != 0) {
-        count += b.getHeader().getRowCount();
-      }
+  @Ignore
+  public void sortOneKeyDescendingMergeSortLegacy() throws Throwable {
+    sortOneKeyDescendingMergeSort(true);
+  }
+
+  private void sortOneKeyDescendingMergeSort(boolean testLegacy) throws Throwable {
+    try (ClusterFixture cluster = ClusterFixture.standardCluster( );
+         ClientFixture client = cluster.clientFixture()) {
+      chooseImpl(client, testLegacy);
+      List<QueryDataBatch> results = client.queryBuilder().physicalResource("xsort/one_key_sort_descending.json").results();
+      assertEquals(1000000, client.countResults(results));
+      validateResults(client.allocator(), results);
     }
-    assertEquals(1000000, count);
+  }
 
+  private void validateResults(BufferAllocator allocator, List<QueryDataBatch> results) throws SchemaChangeException {
     long previousBigInt = Long.MAX_VALUE;
 
     int recordCount = 0;
     int batchCount = 0;
 
     for (QueryDataBatch b : results) {
-      if (b.getHeader().getRowCount() == 0) {
-        continue;
-      }
-      batchCount++;
       RecordBatchLoader loader = new RecordBatchLoader(allocator);
-      loader.load(b.getHeader().getDef(),b.getData());
-      BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(BigIntVector.class, loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
-
-
-      BigIntVector.Accessor a1 = c1.getAccessor();
-
-      for (int i =0; i < c1.getAccessor().getValueCount(); i++) {
-        recordCount++;
-        assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i));
-        previousBigInt = a1.get(i);
-      }
-      loader.clear();
-      b.release();
-    }
-
-    System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
-  }
-
-  @Test
-  @Ignore
-  public void sortOneKeyDescendingExternalSort() throws Throwable{
-    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
-
-    DrillConfig config = DrillConfig.create("drill-external-sort.conf");
-
-    try (Drillbit bit1 = new Drillbit(config, serviceSet);
-        Drillbit bit2 = new Drillbit(config, serviceSet);
-        DrillClient client = new DrillClient(config, serviceSet.getCoordinator());) {
-
-      bit1.run();
-      bit2.run();
-      client.connect();
-      List<QueryDataBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL,
-              Files.toString(FileUtils.getResourceAsFile("/xsort/one_key_sort_descending.json"),
-                      Charsets.UTF_8));
-      int count = 0;
-      for (QueryDataBatch b : results) {
-        if (b.getHeader().getRowCount() != 0) {
-          count += b.getHeader().getRowCount();
-        }
-      }
-      assertEquals(1000000, count);
-
-      long previousBigInt = Long.MAX_VALUE;
-
-      int recordCount = 0;
-      int batchCount = 0;
-
-      for (QueryDataBatch b : results) {
-        if (b.getHeader().getRowCount() == 0) {
-          break;
-        }
+      if (b.getHeader().getRowCount() > 0) {
         batchCount++;
-        RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator());
         loader.load(b.getHeader().getDef(),b.getData());
+        @SuppressWarnings("resource")
         BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(BigIntVector.class, loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
-
-
         BigIntVector.Accessor a1 = c1.getAccessor();
 
-        for (int i =0; i < c1.getAccessor().getValueCount(); i++) {
+        for (int i = 0; i < c1.getAccessor().getValueCount(); i++) {
           recordCount++;
-          assertTrue(String.format("%d < %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i));
+          assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i));
           previousBigInt = a1.get(i);
         }
-        loader.clear();
-        b.release();
       }
-      System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
-
+      loader.clear();
+      b.release();
     }
+
+    System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
   }
 
+
   @Test
   @Ignore
-  public void outOfMemoryExternalSort() throws Throwable{
-    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+  public void sortOneKeyDescendingExternalSortLegacy() throws Throwable {
+    sortOneKeyDescendingExternalSort(true);
+  }
 
-    DrillConfig config = DrillConfig.create("drill-oom-xsort.conf");
+  private void sortOneKeyDescendingExternalSort(boolean testLegacy) throws Throwable {
+    FixtureBuilder builder = ClusterFixture.builder( )
+        .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD, 4 )
+        .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE, 4);
+    try (ClusterFixture cluster = builder.build();
+        ClientFixture client = cluster.clientFixture()) {
+      chooseImpl(client,testLegacy);
+      List<QueryDataBatch> results = client.queryBuilder().physicalResource("/xsort/one_key_sort_descending.json").results();
+      assertEquals(1000000, client.countResults( results ));
+      validateResults(client.allocator(), results);
+    }
+  }
 
-    try (Drillbit bit1 = new Drillbit(config, serviceSet);
-        DrillClient client = new DrillClient(config, serviceSet.getCoordinator());) {
+  @Ignore
+  @Test
+  public void outOfMemoryExternalSortLegacy() throws Throwable{
+    outOfMemoryExternalSort(true);
+  }
 
-      bit1.run();
-      client.connect();
-      List<QueryDataBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL,
-              Files.toString(FileUtils.getResourceAsFile("/xsort/oom_sort_test.json"),
-                      Charsets.UTF_8));
-      int count = 0;
-      for (QueryDataBatch b : results) {
-        if (b.getHeader().getRowCount() != 0) {
-          count += b.getHeader().getRowCount();
-        }
-      }
-      assertEquals(10000000, count);
+  private void outOfMemoryExternalSort(boolean testLegacy) throws Throwable{
+    FixtureBuilder builder = ClusterFixture.builder( )
+        // These settings probably do nothing in modern Drill
+        .configProperty( "drill.memory.fragment.max", 50000000 )
+        .configProperty( "drill.memory.fragment.initial", 2000000 )
+        .configProperty( "drill.memory.operator.max", 30000000 )
+        .configProperty( "drill.memory.operator.initial", 2000000 );
+    try (ClusterFixture cluster = builder.build();
+        ClientFixture client = cluster.clientFixture()) {
+      chooseImpl(client,testLegacy);
+      List<QueryDataBatch> results = client.queryBuilder().physicalResource("/xsort/oom_sort_test.json").results();
+      assertEquals(10000000, client.countResults( results ));
 
       long previousBigInt = Long.MAX_VALUE;
 
@@ -215,29 +163,25 @@ public class TestSimpleExternalSort extends BaseTestQuery {
       int batchCount = 0;
 
       for (QueryDataBatch b : results) {
-        if (b.getHeader().getRowCount() == 0) {
-          break;
+        RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
+        if (b.getHeader().getRowCount() > 0) {
+          batchCount++;
+          loader.load(b.getHeader().getDef(),b.getData());
+          @SuppressWarnings("resource")
+          BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(BigIntVector.class, loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
+          BigIntVector.Accessor a1 = c1.getAccessor();
+
+          for (int i = 0; i < c1.getAccessor().getValueCount(); i++) {
+            recordCount++;
+            assertTrue(String.format("%d < %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i));
+            previousBigInt = a1.get(i);
+          }
+          assertTrue(String.format("%d == %d", a1.get(0), a1.get(a1.getValueCount() - 1)), a1.get(0) != a1.get(a1.getValueCount() - 1));
         }
-        batchCount++;
-        RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator());
-        loader.load(b.getHeader().getDef(),b.getData());
-        BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(BigIntVector.class, loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
-
-
-        BigIntVector.Accessor a1 = c1.getAccessor();
-
-        for (int i =0; i < c1.getAccessor().getValueCount(); i++) {
-          recordCount++;
-          assertTrue(String.format("%d < %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i));
-          previousBigInt = a1.get(i);
-        }
-        assertTrue(String.format("%d == %d", a1.get(0), a1.get(a1.getValueCount() - 1)), a1.get(0) != a1.get(a1.getValueCount() - 1));
         loader.clear();
         b.release();
       }
       System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
-
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java b/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java
new file mode 100644
index 0000000..6d68757
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BufferingQueryEventListener.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.rpc.ConnectionThrottle;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+
+import com.google.common.collect.Queues;
+
+/**
+ * Drill query event listener that buffers rows into a producer-consumer
+ * queue. Allows rows to be received asynchronously, but processed by
+ * a synchronous reader.
+ * <p>
+ * Query messages are transformed into events: query ID, batch,
+ * EOF or error.
+ */
+
+public class BufferingQueryEventListener implements UserResultsListener
+{
+  public static class QueryEvent
+  {
+    public enum Type { QUERY_ID, BATCH, EOF, ERROR }
+
+    public final Type type;
+    public QueryId queryId;
+    public QueryDataBatch batch;
+    public Exception error;
+    public QueryState state;
+
+    public QueryEvent(QueryId queryId) {
+      this.queryId = queryId;
+      this.type = Type.QUERY_ID;
+    }
+
+    public QueryEvent(Exception ex) {
+      error = ex;
+      type = Type.ERROR;
+    }
+
+    public QueryEvent(QueryDataBatch batch) {
+      this.batch = batch;
+      type = Type.BATCH;
+    }
+
+    public QueryEvent(QueryState state) {
+      this.type = Type.EOF;
+      this.state = state;
+    }
+  }
+
+  private BlockingQueue<QueryEvent> queue = Queues.newLinkedBlockingQueue();
+
+  @Override
+  public void queryIdArrived(QueryId queryId) {
+    silentPut(new QueryEvent(queryId));
+  }
+
+  @Override
+  public void submissionFailed(UserException ex) {
+    silentPut(new QueryEvent(ex));
+  }
+
+  @Override
+  public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+    silentPut(new QueryEvent(result));
+  }
+
+  @Override
+  public void queryCompleted(QueryState state) {
+    silentPut(new QueryEvent(state));
+  }
+
+  private void silentPut(QueryEvent event) {
+    try {
+      queue.put(event);
+    } catch (InterruptedException e) {
+      // Interrupted while queueing: the event is dropped and the interrupt status is not restored.
+      e.printStackTrace();
+    }
+  }
+
+  public QueryEvent get() {
+    try {
+      return queue.take();
+    } catch (InterruptedException e) {
+      // Should not occur, but if it does, just pass along the error.
+      return new QueryEvent(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
new file mode 100644
index 0000000..be36dd7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.drill.QueryTestUtil;
+import org.apache.drill.TestBuilder;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.test.ClusterFixture.FixtureTestServices;
+import org.apache.drill.test.QueryBuilder.QuerySummary;
+
+public class ClientFixture implements AutoCloseable {
+
+  public static class ClientBuilder {
+
+    ClusterFixture cluster;
+    Properties clientProps;
+
+    protected ClientBuilder(ClusterFixture cluster) {
+      this.cluster = cluster;
+    }
+    /**
+     * Specify an optional client property.
+     * @param key property name
+     * @param value property value
+     * @return this builder
+     */
+    public ClientBuilder property( String key, Object value ) {
+      if ( clientProps == null ) {
+        clientProps = new Properties( );
+      }
+      clientProps.put(key, value);
+      return this;
+    }
+
+    ClientFixture build( ) {
+      try {
+        return new ClientFixture(this);
+      } catch (RpcException e) {
+
+        // When used in a test with an embedded Drillbit, the
+        // RPC exception should not occur.
+
+        throw new IllegalStateException(e);
+      }
+    }
+  }
+
+  private ClusterFixture cluster;
+  private DrillClient client;
+
+  public ClientFixture(ClientBuilder builder) throws RpcException {
+    this.cluster = builder.cluster;
+
+    // Create a client.
+
+    client = new DrillClient(cluster.config( ), cluster.serviceSet( ).getCoordinator());
+    client.connect(builder.clientProps);
+    cluster.clients.add(this);
+  }
+
+  public DrillClient client() { return client; }
+  public ClusterFixture cluster( ) { return cluster; }
+  public BufferAllocator allocator( ) { return cluster.allocator( ); }
+
+  /**
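+   * Set a session-level runtime option using ALTER SESSION SET.
+   *
+   * @param key name of the option
+   * @param value new value, formatted with ClusterFixture.stringify
+   * @throws Exception propagated from running the statement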
+   */
+
+  public void alterSession(String key, Object value ) throws Exception {
+    String sql = "ALTER SESSION SET `" + key + "` = " + ClusterFixture.stringify( value );
+    runSqlSilently( sql );
+  }
+
+  public void alterSystem(String key, Object value ) throws Exception {
+    String sql = "ALTER SYSTEM SET `" + key + "` = " + ClusterFixture.stringify( value );
+    runSqlSilently( sql );
+  }
+
+  /**
+   * Run SQL silently (discard results.)
+   *
+   * @param sql the SQL statement to run
+   * @throws Exception propagated from running the statement
+   */
+
+  public void runSqlSilently(String sql) throws Exception {
+    queryBuilder().sql(sql).run();
+  }
+
+  public QueryBuilder queryBuilder() {
+    return new QueryBuilder(this);
+  }
+
+  public int countResults(List<QueryDataBatch> results) {
+    int count = 0;
+    for(QueryDataBatch b : results) {
+      count += b.getHeader().getRowCount();
+    }
+    return count;
+  }
+
+  public TestBuilder testBuilder() {
+    return new TestBuilder(new FixtureTestServices(this));
+  }
+
+  /**
+   * Run zero or more queries and optionally print the output in TSV format.
+   * Similar to {@link QueryTestUtil#test}. Output is printed
+   * only if the tests are running as verbose.
+   *
+   * @param queryString one or more queries separated by semicolons
+   */
+
+  public void runQueries(final String queryString) throws Exception{
+    final String query = QueryTestUtil.normalizeQuery(queryString);
+    String[] queries = query.split(";");
+    for (String q : queries) {
+      final String trimmedQuery = q.trim();
+      if (trimmedQuery.isEmpty()) {
+        continue;
+      }
+      queryBuilder( ).sql(trimmedQuery).print();
+    }
+  }
+
+  @Override
+  public void close( ) {
+    if (client == null) {
+      return;
+    }
+    try {
+      client.close( );
+    } finally {
+      client = null;
+      cluster.clients.remove(this);
+    }
+  }
+
+  /**
+   * Return a parsed query profile for a query summary. Saving of profiles
+   * must be turned on.
+   *
+   * @param summary query summary whose queryIdString() identifies the profile
+   * @return a parser for the profile file
+   * @throws IOException
+   */
+
+  public ProfileParser parseProfile(QuerySummary summary) throws IOException {
+    return parseProfile(summary.queryIdString());
+  }
+
+  /**
+   * Parse a query profile from the local storage location given the
+   * query ID. Saving of profiles must be turned on. This is a bit of
+   * a hack: the profile should be available directly from the server.
+   * @throws IOException
+   */
+
+  public ProfileParser parseProfile(String queryId) throws IOException {
+    String tmpDir = cluster().config().getString(ExecConstants.DRILL_TMP_DIR);
+    File drillTmp = new File(new File(tmpDir), "drill");
+    File profileDir = new File(drillTmp, "profiles" );
+    File file = new File( profileDir, queryId + ".sys.drill" );
+    return new ProfileParser(file);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
new file mode 100644
index 0000000..f89eb01
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.DrillTestWrapper.TestServices;
+import org.apache.drill.QueryTestUtil;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ZookeeperHelper;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistryImpl;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.drill.exec.store.mock.MockStorageEngine;
+import org.apache.drill.exec.store.mock.MockStorageEngineConfig;
+import org.apache.drill.exec.util.TestUtilities;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+import com.google.common.io.Resources;
+
+/**
+ * Test fixture to start a Drillbit with provided options, create a client, and
+ * execute queries. Can be used in JUnit tests, or in ad-hoc programs. Provides
+ * a builder to set the necessary embedded Drillbit and client options, then
+ * creates the requested Drillbit and client.
+ */
+
+public class ClusterFixture implements AutoCloseable {
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClientFixture.class);
+  public static final String ENABLE_FULL_CACHE = "drill.exec.test.use-full-cache";
+  public static final int MAX_WIDTH_PER_NODE = 2;
+
+  @SuppressWarnings("serial")
+  public static final Properties TEST_CONFIGURATIONS = new Properties() {
+    {
+      // Properties here mimic those in drill-root/pom.xml, Surefire plugin
+      // configuration. They allow tests to run successfully in Eclipse.
+
+      put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false);
+      put(ExecConstants.HTTP_ENABLE, false);
+      put(QueryTestUtil.TEST_QUERY_PRINTING_SILENT, true);
+      put("drill.catastrophic_to_standard_out", true);
+
+      // Verbose errors.
+
+      put(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY, true);
+
+      // See Drillbit.close. The Drillbit normally waits a specified amount
+      // of time for ZK registration to drop. But, embedded Drillbits normally
+      // don't use ZK, so no need to wait.
+
+      put(ExecConstants.ZK_REFRESH, 0);
+
+      // This is just a test, no need to be heavy-duty on threads.
+      // This is the number of server and client RPC threads. The
+      // production default is DEFAULT_SERVER_RPC_THREADS.
+
+      put(ExecConstants.BIT_SERVER_RPC_THREADS, 2);
+
+      // No need for many scanners except when explicitly testing that
+      // behavior. Production default is DEFAULT_SCAN_THREADS
+
+      put(ExecConstants.SCAN_THREADPOOL_SIZE, 4);
+    }
+  };
+
+  public static final String DEFAULT_BIT_NAME = "drillbit";
+
+  private DrillConfig config;
+  private Map<String,Drillbit> bits = new HashMap<>();
+  private Drillbit defaultDrillbit;
+  private BufferAllocator allocator;
+  private boolean ownsZK;
+  private ZookeeperHelper zkHelper;
+  private RemoteServiceSet serviceSet;
+  private String dfsTestTmpSchemaLocation;
+  protected List<ClientFixture> clients = new ArrayList<>();
+
+  protected ClusterFixture(FixtureBuilder  builder) throws Exception {
+
+    // Start ZK if requested.
+
+    if (builder.zkHelper != null) {
+      zkHelper = builder.zkHelper;
+      ownsZK = false;
+    } else if (builder.zkCount > 0) {
+      zkHelper = new ZookeeperHelper(true);
+      zkHelper.startZookeeper(builder.zkCount);
+      ownsZK = true;
+    }
+
+    // Create a config
+    // Because of the way DrillConfig works, we can set the ZK
+    // connection string only if a property set is provided.
+
+    if (builder.configResource != null) {
+      config = DrillConfig.create(builder.configResource);
+    } else if (builder.configProps != null) {
+      config = DrillConfig.create(configProperties(builder.configProps));
+    } else {
+      config = DrillConfig.create(configProperties(TEST_CONFIGURATIONS));
+    }
+
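+    // Full-cache service set; purpose unclear, but some tests seem to use it. NOTE(review): allocator is still null here; it is only assigned later in this constructor.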
+
+    if (builder.enableFullCache ||
+        (config.hasPath(ENABLE_FULL_CACHE) && config.getBoolean(ENABLE_FULL_CACHE))) {
+      serviceSet = RemoteServiceSet.getServiceSetWithFullCache(config, allocator);
+    } else {
+      serviceSet = RemoteServiceSet.getLocalServiceSet();
+    }
+
+    dfsTestTmpSchemaLocation = TestUtilities.createTempDir();
+
+    Preconditions.checkArgument(builder.bitCount > 0);
+    int bitCount = builder.bitCount;
+    for (int i = 0; i < bitCount; i++) {
+      @SuppressWarnings("resource")
+      Drillbit bit = new Drillbit(config, serviceSet);
+      bit.run();
+
+      // Create the dfs_test name space
+
+      @SuppressWarnings("resource")
+      final StoragePluginRegistry pluginRegistry = bit.getContext().getStorage();
+      TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, dfsTestTmpSchemaLocation);
+      TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry);
+
+      // Create the mock data plugin
+      // NOTE(review): previously marked "disabled until DRILL-5152 is committed", but the registration below is active.
+
+      MockStorageEngineConfig config = MockStorageEngineConfig.INSTANCE;
+      @SuppressWarnings("resource")
+      MockStorageEngine plugin = new MockStorageEngine(
+          MockStorageEngineConfig.INSTANCE, bit.getContext(),
+          MockStorageEngineConfig.NAME);
+      ((StoragePluginRegistryImpl) pluginRegistry)
+          .definePlugin(MockStorageEngineConfig.NAME, config, plugin);
+
+      // Bit name and registration.
+
+      String name;
+      if (builder.bitNames != null && i < builder.bitNames.length) {
+        name = builder.bitNames[i];
+      } else {
+
+        // Name the Drillbit by default. Most tests use one Drillbit,
+        // so make the name simple: "drillbit." Only add a numeric suffix
+        // when the test creates multiple bits.
+
+        if (bitCount == 1) {
+          name = DEFAULT_BIT_NAME;
+        } else {
+          name = DEFAULT_BIT_NAME + Integer.toString(i+1);
+        }
+      }
+      bits.put(name, bit);
+
+      // Remember the first Drillbit, this is the default one returned from
+      // drillbit().
+
+      if (i == 0) {
+        defaultDrillbit = bit;
+      }
+    }
+
+    // Some operations need an allocator.
+
+    allocator = RootAllocatorFactory.newRoot(config);
+
+    // Apply system options
+
+    if (builder.systemOptions != null) {
+      for (FixtureBuilder.RuntimeOption option : builder.systemOptions) {
+        clientFixture().alterSystem(option.key, option.value);
+      }
+    }
+
+    // Apply session options.
+
+    if (builder.sessionOptions != null) {
+      for (FixtureBuilder.RuntimeOption option : builder.sessionOptions) {
+        clientFixture().alterSession(option.key, option.value);
+      }
+    }
+  }
+
+  private Properties configProperties(Properties configProps) {
+    Properties effectiveProps = new Properties();
+    for (Entry<Object, Object> entry : configProps.entrySet()) {
+      effectiveProps.put(entry.getKey(), entry.getValue().toString());
+    }
+    if (zkHelper != null) {
+      effectiveProps.put(ExecConstants.ZK_CONNECTION, zkHelper.getConfig().getString(ExecConstants.ZK_CONNECTION));
+    }
+    return effectiveProps;
+  }
+
+  /** @return the default Drillbit, which is the first one started by this fixture */
+  public Drillbit drillbit() { return defaultDrillbit; }
+  /** @return the Drillbit registered under the given name, as looked up in the name-to-Drillbit map */
+  public Drillbit drillbit(String name) { return bits.get(name); }
+  /** @return all Drillbits registered with this fixture */
+  public Collection<Drillbit> drillbits() { return bits.values(); }
+  /** @return the service set shared by the Drillbits of this fixture */
+  public RemoteServiceSet serviceSet() { return serviceSet; }
+  /** @return the allocator created by this fixture */
+  public BufferAllocator allocator() { return allocator; }
+  /** @return the configuration held by this fixture */
+  public DrillConfig config() { return config; }
+
+  /**
+   * Create a new client builder bound to this cluster fixture.
+   *
+   * @return a fresh {@link ClientFixture.ClientBuilder}
+   */
+  public ClientFixture.ClientBuilder clientBuilder() {
+    return new ClientFixture.ClientBuilder(this);
+  }
+
+  /**
+   * Return the first client fixture in the client list, building one
+   * with default client settings if the list is empty.
+   *
+   * @return the first entry of the client list
+   */
+  public ClientFixture clientFixture() {
+    if (clients.isEmpty()) {
+      // The result of build() is discarded on purpose: the client is read
+      // back from the list below.
+      // NOTE(review): presumably the built client registers itself in
+      // the clients list - confirm in ClientFixture.
+      clientBuilder().build();
+    }
+    return clients.get(0);
+  }
+
+  /**
+   * Convenience accessor for the {@link DrillClient} of the fixture
+   * returned by {@link #clientFixture()}.
+   *
+   * @return the client of the first client fixture
+   */
+  public DrillClient client() {
+    return clientFixture().client();
+  }
+
+  /**
+   * Close the clients, drillbits, allocator and
+   * Zookeeper. Checks for exceptions. If an exception occurs,
+   * continues closing, suppresses subsequent exceptions, and
+   * throws the first exception at completion of close. This allows
+   * the test code to detect any state corruption which only shows
+   * itself when shutting down resources (memory leaks, for example.)
+   * <p>
+   * Zookeeper is stopped only when this fixture owns it.
+   *
+   * @throws Exception the first exception raised while closing
+   */
+
+  @Override
+  public void close() throws Exception {
+    Exception ex = null;
+
+    // Close clients. Clients remove themselves from the client
+    // list. Each client is also removed explicitly here: if a client's
+    // close() fails before it deregisters itself, the list would never
+    // drain and this loop would not terminate. Removing an entry that
+    // is already gone is a no-op.
+
+    while (!clients.isEmpty()) {
+      AutoCloseable head = clients.get(0);
+      ex = safeClose(head, ex);
+      clients.remove(head);
+    }
+
+    for (Drillbit bit : drillbits()) {
+      ex = safeClose(bit, ex);
+    }
+    bits.clear();
+    ex = safeClose(serviceSet, ex);
+    serviceSet = null;
+    ex = safeClose(allocator, ex);
+    allocator = null;
+
+    // Stop Zookeeper only if this fixture started it.
+
+    if (zkHelper != null && ownsZK) {
+      try {
+        zkHelper.stopZookeeper();
+      } catch (Exception e) {
+        ex = ex == null ? e : ex;
+      }
+    }
+    zkHelper = null;
+    if (ex != null) {
+      throw ex;
+    }
+  }
+
+  /**
+   * Close one resource without letting a failure escape, so that the
+   * caller can keep closing the remaining resources.
+   *
+   * @param item resource to close; may be null, in which case nothing is done
+   * @param ex the first exception seen so far, or null if none
+   * @return the first exception seen so far: {@code ex} if it was non-null,
+   * otherwise the exception thrown while closing {@code item}, otherwise null
+   */
+  private Exception safeClose(AutoCloseable item, Exception ex) {
+    try {
+      if (item != null) {
+        item.close();
+      }
+    } catch (Exception e) {
+      if (ex == null) {
+        ex = e;
+      } else if (e != ex) {
+
+        // Keep the first exception as the primary one, but attach later
+        // failures so they are not lost. The identity check avoids the
+        // IllegalArgumentException raised on self-suppression.
+
+        ex.addSuppressed(e);
+      }
+    }
+    return ex;
+  }
+
+  /**
+   * Define (or redefine) a workspace on every Drillbit of this fixture by
+   * delegating to
+   * {@link #defineWorkspace(Drillbit, String, String, String, String)}.
+   *
+   * @param pluginName name of the storage plugin that holds the workspace
+   * @param schemaName name of the workspace
+   * @param path location passed to the workspace configuration
+   * @param defaultFormat default format passed to the workspace configuration
+   * @throws ExecutionSetupException if updating any Drillbit fails
+   */
+  public void defineWorkspace(String pluginName, String schemaName, String path,
+      String defaultFormat) throws ExecutionSetupException {
+    for (Drillbit bit : drillbits()) {
+      defineWorkspace(bit, pluginName, schemaName, path, defaultFormat);
+    }
+  }
+
+  /**
+   * Define (or redefine) a workspace in a file system storage plugin of
+   * the given Drillbit.
+   *
+   * @param drillbit the Drillbit whose plugin registry is updated
+   * @param pluginName name of the plugin; it is cast to
+   * {@link FileSystemPlugin}, so it must name a file system plugin
+   * @param schemaName name of the workspace to replace or add
+   * @param path location passed to the new {@link WorkspaceConfig}
+   * @param defaultFormat default format passed to the new {@link WorkspaceConfig}
+   * @throws ExecutionSetupException declared by the registry calls used here
+   */
+  public static void defineWorkspace(Drillbit drillbit, String pluginName,
+      String schemaName, String path, String defaultFormat)
+      throws ExecutionSetupException {
+    @SuppressWarnings("resource")
+    final StoragePluginRegistry pluginRegistry = drillbit.getContext().getStorage();
+    @SuppressWarnings("resource")
+    final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin(pluginName);
+    final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
+    // NOTE(review): the second constructor argument is presumably the
+    // "writable" flag - confirm against WorkspaceConfig.
+    final WorkspaceConfig newTmpWSConfig = new WorkspaceConfig(path, true, defaultFormat);
+
+    // Drop any existing definition of the workspace, then add the new one.
+    pluginConfig.workspaces.remove(schemaName);
+    pluginConfig.workspaces.put(schemaName, newTmpWSConfig);
+
+    // Hand the modified configuration back to the registry.
+    pluginRegistry.createOrUpdate(pluginName, pluginConfig, true);
+  }
+
+  // NOTE(review): presumably the format names used when requesting an
+  // EXPLAIN plan; their use is not visible in this part of the file.
+  public static final String EXPLAIN_PLAN_TEXT = "text";
+  public static final String EXPLAIN_PLAN_JSON = "json";
+
+  /**
+   * Create a builder preloaded with the standard test settings: the
+   * default configuration properties from
+   * {@link FixtureBuilder#defaultProps()} and a session option that sets
+   * the max width per node to {@code MAX_WIDTH_PER_NODE}.
+   *
+   * @return a new, pre-configured builder
+   */
+  public static FixtureBuilder builder() {
+    final FixtureBuilder standard = new FixtureBuilder();
+    standard.configProps(FixtureBuilder.defaultProps());
+    standard.sessionOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY, MAX_WIDTH_PER_NODE);
+    return standard;
+  }
+
+  /**
+   * Create a builder with no settings applied. Unlike {@link #builder()},
+   * no default configuration properties or session options are added.
+   *
+   * @return a new, empty builder
+   */
+  public static FixtureBuilder bareBuilder() {
+    return new FixtureBuilder();
+  }
+
+  /**
+   * Implementation of {@link TestServices} that forwards every call to a
+   * {@link ClientFixture}.
+   */
+  public static class FixtureTestServices implements TestServices {
+
+    // Client fixture that performs all the work.
+    private ClientFixture client;
+
+    /**
+     * @param client the client fixture to which calls are forwarded
+     */
+    public FixtureTestServices(ClientFixture client) {
+      this.client = client;
+    }
+
+    /** @return the allocator provided by the client fixture */
+    @Override
+    public BufferAllocator allocator() {
+      return client.allocator();
+    }
+
+    /** Run the given query text through the client fixture's runQueries(). */
+    @Override
+    public void test(String query) throws Exception {
+      client.runQueries(query);
+    }
+
+    /**
+     * Run a query of the given type and return its result batches.
+     *
+     * @param type the query type
+     * @param query the query; cast to {@link String}, so any other type
+     * fails with a ClassCastException
+     * @return the result batches produced by the query builder
+     */
+    @Override
+    public List<QueryDataBatch> testRunAndReturn(QueryType type, Object query)
+        throws Exception {
+      return client.queryBuilder().query(type, (String) query).results();
+    }
+  }
+
+  /**
+   * Build a cluster fixture from the standard builder returned by
+   * {@link #builder()}, with no further customization.
+   *
+   * @return the newly built cluster fixture
+   * @throws Exception if building the fixture fails
+   */
+  public static ClusterFixture standardCluster() throws Exception {
+    return builder().build();
+  }
+
+  /**
+   * Convert a value to text: strings are wrapped in single quotes, any
+   * other value is rendered with its {@code toString()}.
+   *
+   * @param value the value to render; must not be null
+   * @return the rendered text
+   */
+  static String stringify(Object value) {
+    return (value instanceof String)
+        ? "'" + value + "'"
+        : value.toString();
+  }
+
+  /**
+   * Read a text resource from the class path as a UTF-8 string.
+   *
+   * @param resource path of the resource; one leading slash is accepted
+   * and removed
+   * @return the contents of the resource
+   * @throws IOException if the resource cannot be found or cannot be read
+   */
+  public static String getResource(String resource) throws IOException {
+    // Unlike the Java routines, Guava does not like a leading slash.
+
+    final URL url;
+    try {
+      url = Resources.getResource(trimSlash(resource));
+    } catch (IllegalArgumentException e) {
+
+      // Guava signals a missing resource by throwing, not by returning
+      // null. Convert to the exception type declared by this method.
+
+      throw new IOException(String.format("Unable to find resource %s.", resource), e);
+    }
+    return Resources.toString(url, Charsets.UTF_8);
+  }
+
+  /**
+   * Variant of {@link #getResource(String)} without a checked exception:
+   * an {@link IOException} is rethrown as an {@link IllegalStateException}
+   * with the original exception as its cause.
+   *
+   * @param resource path of the resource to read
+   * @return the contents of the resource
+   */
+  public static String loadResource(String resource) {
+    try {
+      return getResource(resource);
+    } catch (IOException e) {
+      throw new IllegalStateException("Resource not found: " + resource, e);
+    }
+  }
+
+  static String trimSlash(String path) {
+    if (path == null) {
+      return path;
+    } else if (path.startsWith("/")) {
+      return path.substring(1);
+    } else {
+      return path;
+    }
+  }
+
+  /**
+   * Create a new temporary parent directory and, inside it, a directory
+   * with the given <i>dirName</i>. A JVM shutdown hook is registered that
+   * quietly deletes the temporary parent on exit.
+   * @param dirName directory name
+   * @return Full path including temp parent directory and given directory name.
+   */
+  public static File getTempDir(final String dirName) {
+    final File tempRoot = Files.createTempDir();
+    final Runnable cleanup = new Runnable() {
+      @Override
+      public void run() {
+        FileUtils.deleteQuietly(tempRoot);
+      }
+    };
+    Runtime.getRuntime().addShutdownHook(new Thread(cleanup));
+
+    final File namedDir = new File(tempRoot, dirName);
+    namedDir.mkdirs();
+    return namedDir;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
new file mode 100644
index 0000000..62beedd
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.test;
+
+import java.io.IOException;
+
+import org.apache.drill.TestBuilder;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.test.DrillTest;
+import org.junit.AfterClass;
+
+/**
+ * Base class for tests that use a single cluster fixture for a set of
+ * tests. Extend your test case directly from {@link DrillTest} if you
+ * need to start up and shut down a cluster multiple times.
+ * <p>
+ * To create a test with a single cluster config, do the following:
+ * <pre><code>
+ * public class YourTest extends ClusterTest {
+ *   {@literal @}BeforeClass
+ *   public static void setup() throws Exception {
+ *     FixtureBuilder builder = ClusterFixture.builder()
+ *       // Set options, etc.
+ *       ;
+ *     startCluster(builder);
+ *   }
+ *
+ *   // Your tests
+ * }
+ * </code></pre>
+ * This class takes care of shutting down the cluster at the end of the test.
+ * <p>
+ * The simplest possible setup:
+ * <pre><code>
+ *   {@literal @}BeforeClass
+ *   public static void setup() throws Exception {
+ *     startCluster(ClusterFixture.builder());
+ *   }
+ * </code></pre>
+ * <p>
+ * If you need to start the cluster with different (boot time) configurations,
+ * do the following instead:
+ * <pre><code>
+ * public class YourTest extends DrillTest {
+ *   {@literal @}Test
+ *   public void someTest() throws Exception {
+ *     FixtureBuilder builder = ClusterFixture.builder()
+ *       // Set options, etc.
+ *       ;
+ *     try (ClusterFixture cluster = builder.build()) {
+ *       // Tests here
+ *     }
+ *   }
+ * }
+ * </code></pre>
+ * The try-with-resources block ensures that the cluster is shut down at
+ * the end of each test method.
+ */
+
+public class ClusterTest extends DrillTest {
+
+  // Shared by all tests of the class; set by startCluster().
+  protected static ClusterFixture cluster;
+  protected static ClientFixture client;
+
+  /**
+   * Build the cluster described by the builder and obtain its client
+   * fixture. Both are stored in the static fields of this class.
+   *
+   * @param builder the builder that describes the cluster
+   * @throws Exception if the cluster cannot be built
+   */
+  protected static void startCluster(FixtureBuilder builder) throws Exception {
+    cluster = builder.build();
+    client = cluster.clientFixture();
+  }
+
+  /**
+   * Close the client and then the cluster once all tests of the class
+   * have run.
+   *
+   * @throws Exception if closing either resource fails
+   */
+  @AfterClass
+  public static void shutdown() throws Exception {
+    AutoCloseables.close(client, cluster);
+  }
+
+  /**
+   * Convenience method when converting classic tests to use the
+   * cluster fixture.
+   * @return a test builder that works against the cluster fixture
+   */
+
+  public TestBuilder testBuilder() {
+    return client.testBuilder();
+  }
+
+  /**
+   * Convenience method when converting classic tests to use the
+   * cluster fixture.
+   * @param resource path of the resource to read
+   * @return the contents of the resource text file
+   * @throws IOException as declared by {@link ClusterFixture#getResource(String)}
+   */
+
+  public String getFile(String resource) throws IOException {
+    return ClusterFixture.getResource(resource);
+  }
+
+  /**
+   * Run the given query text through the shared client fixture's
+   * runQueries().
+   *
+   * @param sqlQuery the query text
+   * @throws Exception if the client reports a failure
+   */
+  public void test(String sqlQuery) throws Exception {
+    client.runQueries(sqlQuery);
+  }
+
+  /**
+   * Run a query built from a query string and arguments using the shared
+   * client fixture's query builder.
+   *
+   * @param query the query string
+   * @param args arguments passed along with the query string
+   * (NOTE(review): presumably format arguments - confirm in QueryBuilder)
+   * @throws Exception if the query fails
+   */
+  public static void test(String query, Object... args) throws Exception {
+    client.queryBuilder().sql(query, args).run( );
+  }
+
+  /**
+   * @return a new query builder from the shared client fixture
+   */
+  public QueryBuilder queryBuilder( ) {
+    return client.queryBuilder();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/test/FieldDef.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/FieldDef.java b/exec/java-exec/src/test/java/org/apache/drill/test/FieldDef.java
new file mode 100644
index 0000000..3812217
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/FieldDef.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.test;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Basic representation of a column parsed from a query profile.
+ * Idea is to use this to generate mock data that represents a
+ * query obtained from a user. This is a work in progress.
+ * <p>
+ * Only the VARCHAR and DOUBLE types are recognized at present; for any
+ * other type name {@link #type} is null.
+ */
+
+public class FieldDef {
+  public enum Type { VARCHAR, DOUBLE }
+  public enum TypeHint { DATE, TIME }
+
+  // Matches the type portion of a column as provided in the query
+  // profile: a type name, optionally followed by a length in
+  // parentheses. Length is provided for VARCHAR fields. Examples:
+  // INTEGER
+  // VARCHAR(50)
+  // Compiled once rather than on every construction.
+
+  private static final Pattern TYPE_PATTERN =
+      Pattern.compile("(\\w+)(?:\\((\\d+)\\))?");
+
+  /** Column name. */
+  public final String name;
+  /** Type text exactly as passed to the constructor. */
+  public final String typeStr;
+  /** Parsed type, or null if the type name is not recognized. */
+  public final Type type;
+  /** Length given in parentheses after the type name, or 0 if none. */
+  public int length;
+  /** Optional type hint; not set by the constructor. */
+  public TypeHint hint;
+
+  /**
+   * Parse a column definition.
+   *
+   * @param name the column name
+   * @param typeStr the type text, such as {@code INTEGER} or
+   * {@code VARCHAR(50)}; the type name is matched without regard to case
+   * @throws IllegalStateException if the type text is not a type name
+   * optionally followed by a parenthesized length
+   */
+  public FieldDef(String name, String typeStr) {
+    this.name = name;
+    this.typeStr = typeStr;
+
+    Matcher m = TYPE_PATTERN.matcher(typeStr);
+    if (! m.matches()) {
+      throw new IllegalStateException("Unrecognized field type: " + typeStr);
+    }
+    String lengthStr = m.group(2);
+    length = (lengthStr == null) ? 0 : Integer.parseInt(lengthStr);
+    switch (m.group(1).toUpperCase()) {
+    case "VARCHAR":
+      type = Type.VARCHAR;
+      break;
+    case "DOUBLE":
+      type = Type.DOUBLE;
+      break;
+    // TODO: Add other types over time.
+    default:
+      type = null;
+    }
+
+  }
+
+  /**
+   * @return the name and original type text, followed by the parsed type
+   * (and the length, if non-zero) when the type was recognized
+   */
+  @Override
+  public String toString() {
+    String str = name + ": " + typeStr;
+    if (type != null) {
+      str += " - " + type.name();
+      if (length != 0) {
+        str += "(" + length + ")";
+      }
+    }
+    return str;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5c3924c9/exec/java-exec/src/test/java/org/apache/drill/test/FixtureBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/FixtureBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/FixtureBuilder.java
new file mode 100644
index 0000000..e56f190
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/FixtureBuilder.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ZookeeperHelper;
+
+/**
+ * Build a Drillbit and client with the options provided. The simplest
+ * builder starts an embedded Drillbit, with the "dfs_test" name space,
+ * a max width (parallelization) of 2.
+ * <p>
+ * Every setter returns this builder so that calls can be chained.
+ */
+
+public class FixtureBuilder {
+
+  /**
+   * A key/value pair that holds one session or system option to be
+   * applied after the cluster has started.
+   */
+  public static class RuntimeOption {
+    public String key;
+    public Object value;
+
+    public RuntimeOption(String key, Object value) {
+      this.key = key;
+      this.value = value;
+    }
+  }
+
+  // Values in the drill-module.conf file for values that are customized
+  // in the defaults.
+
+  public static final int DEFAULT_ZK_REFRESH = 500; // ms
+  public static final int DEFAULT_SERVER_RPC_THREADS = 10;
+  public static final int DEFAULT_SCAN_THREADS = 8;
+
+  /**
+   * Create a new property set populated with a copy of
+   * {@link ClusterFixture#TEST_CONFIGURATIONS}.
+   *
+   * @return a new, modifiable set of default config properties
+   */
+  public static Properties defaultProps() {
+    Properties props = new Properties();
+    props.putAll(ClusterFixture.TEST_CONFIGURATIONS);
+    return props;
+  }
+
+  // Builder state. Package-visible so that the fixture can read it.
+
+  String configResource;
+  Properties configProps;
+  boolean enableFullCache;
+  List<RuntimeOption> sessionOptions;
+  List<RuntimeOption> systemOptions;
+  int bitCount = 1;
+  String bitNames[];
+  int zkCount;
+  ZookeeperHelper zkHelper;
+
+  /**
+   * Use the given configuration properties to start the embedded Drillbit.
+   * Replaces any properties set earlier on this builder.
+   * @param configProps a collection of config properties
+   * @return this builder
+   * @see #configProperty(String, Object)
+   */
+
+  public FixtureBuilder configProps(Properties configProps) {
+    this.configProps = configProps;
+    return this;
+  }
+
+  /**
+   * Use the given configuration file, stored as a resource, to start the
+   * embedded Drillbit. Note that the resource file should have the two
+   * following settings to work as a test:
+   * <pre><code>
+   * drill.exec.sys.store.provider.local.write : false,
+   * drill.exec.http.enabled : false
+   * </code></pre>
+   * It may be more convenient to add your settings to the default
+   * config settings with {@link #configProperty(String, Object)}.
+   * @param configResource path to the file that contains the
+   * config file to be read
+   * @return this builder
+   * @see #configProperty(String, Object)
+   */
+
+  public FixtureBuilder configResource(String configResource) {
+
+    // TypeSafe gets unhappy about a leading slash, but other functions
+    // require it. Silently discard the leading slash if given to
+    // preserve the test writer's sanity.
+
+    this.configResource = ClusterFixture.trimSlash(configResource);
+    return this;
+  }
+
+  /**
+   * Add an additional boot-time property for the embedded Drillbit.
+   * If no properties have been set yet, starts from
+   * {@link #defaultProps()}. The value is stored in its string form.
+   * @param key config property name
+   * @param value property value
+   * @return this builder
+   */
+
+  public FixtureBuilder configProperty(String key, Object value) {
+    if (configProps == null) {
+      configProps = defaultProps();
+    }
+    configProps.put(key, value.toString());
+    return this;
+  }
+
+   /**
+   * Provide a session option to be set once the Drillbit
+   * is started.
+   *
+   * @param key the name of the session option
+   * @param value the value of the session option
+   * @return this builder
+   * @see ClusterFixture#alterSession(String, Object)
+   */
+
+  public FixtureBuilder sessionOption(String key, Object value) {
+    if (sessionOptions == null) {
+      sessionOptions = new ArrayList<>();
+    }
+    sessionOptions.add(new RuntimeOption(key, value));
+    return this;
+  }
+
+  /**
+   * Provide a system option to be set once the Drillbit
+   * is started.
+   *
+   * @param key the name of the system option
+   * @param value the value of the system option
+   * @return this builder
+   * @see ClusterFixture#alterSystem(String, Object)
+   */
+
+  public FixtureBuilder systemOption(String key, Object value) {
+    if (systemOptions == null) {
+      systemOptions = new ArrayList<>();
+    }
+    systemOptions.add(new RuntimeOption(key, value));
+    return this;
+  }
+
+  /**
+   * Set the maximum parallelization (max width per node). Defaults
+   * to 2. Implemented as a session option.
+   *
+   * @param n the "max width per node" parallelization option.
+   * @return this builder
+   */
+  public FixtureBuilder maxParallelization(int n) {
+    return sessionOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY, n);
+  }
+
+  /**
+   * Set the "enable full cache" flag on this builder.
+   * NOTE(review): presumably makes the cluster fixture use the
+   * full-cache service set; confirm in the ClusterFixture constructor.
+   *
+   * @return this builder
+   */
+  public FixtureBuilder enableFullCache() {
+    enableFullCache = true;
+    return this;
+  }
+
+  /**
+   * The number of Drillbits to start in the cluster. Discards any names
+   * given earlier with {@link #withBits(String[])}.
+   *
+   * @param n the desired cluster size
+   * @return this builder
+   */
+  public FixtureBuilder clusterSize(int n) {
+    bitCount = n;
+    bitNames = null;
+    return this;
+  }
+
+  /**
+   * Define a cluster by providing names to the Drillbits.
+   * The cluster size is the same as the number of names provided.
+   *
+   * @param bitNames array of (unique) Drillbit names; must not be null
+   * @return this builder
+   */
+  public FixtureBuilder withBits(String bitNames[]) {
+    this.bitNames = bitNames;
+    bitCount = bitNames.length;
+    return this;
+  }
+
+  /**
+   * By default the embedded Drillbits use an in-memory cluster coordinator.
+   * Use this option to start an in-memory ZK instance to coordinate the
+   * Drillbits.
+   * @return this builder
+   */
+  public FixtureBuilder withZk() {
+    return withZk(1);
+  }
+
+  /**
+   * Same as {@link #withZk()}, but records the given count.
+   *
+   * @param count value stored as the ZK count (NOTE(review): presumably
+   * the number of ZK instances to start - confirm in ClusterFixture)
+   * @return this builder
+   */
+  public FixtureBuilder withZk(int count) {
+    zkCount = count;
+
+    // Using ZK. Turn refresh wait back on.
+
+    configProperty(ExecConstants.ZK_REFRESH, DEFAULT_ZK_REFRESH);
+    return this;
+  }
+
+  /**
+   * Run the cluster using a Zookeeper started externally. Use this if
+   * multiple tests start a cluster: allows ZK to be started once for
+   * the entire suite rather than once per test case.
+   *
+   * @param zk the global Zookeeper to use
+   * @return this builder
+   */
+  public FixtureBuilder withZk(ZookeeperHelper zk) {
+    zkHelper = zk;
+
+    // Using ZK. Turn refresh wait back on.
+
+    configProperty(ExecConstants.ZK_REFRESH, DEFAULT_ZK_REFRESH);
+    return this;
+  }
+
+  /**
+   * Create the embedded Drillbit and client, applying the options set
+   * in the builder. Best to use this in a try-with-resources block:
+   * <pre><code>
+   * FixtureBuilder builder = ClusterFixture.builder()
+   *   .configProperty(...)
+   *   .sessionOption(...)
+   *   ;
+   * try (ClusterFixture cluster = builder.build();
+   *      ClientFixture client = cluster.clientFixture()) {
+   *   // Do the test
+   * }
+   * </code></pre>
+   * Note that you use a single cluster fixture to create any number of
+   * drillbits in your cluster. If you want multiple clients, create the
+   * first as above, the others (or even the first) using the
+   * {@link ClusterFixture#clientBuilder()}. Using the client builder
+   * also lets you set client-side options in the rare cases that you
+   * need them.
+   *
+   * @return the cluster fixture created from the settings of this builder
+   * @throws Exception if the {@link ClusterFixture} cannot be constructed
+   */
+  public ClusterFixture build() throws Exception {
+    return new ClusterFixture(this);
+  }
+}
\ No newline at end of file