You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/04/23 08:09:16 UTC

[2/3] git commit: Several changes: - added base system test capable of booting a drillbit cluster - added a client sys test that submits a plan - corrected a bug in ZKClusterCoordinator where endpoints would not get fetched before a cacheChanged event - c

Several changes:
- added base system test capable of booting a drillbit cluster
- added a client sys test that submits a plan
- corrected a bug in ZKClusterCoordinator where endpoints would not get fetched before a cacheChanged event
- corrected a bug in HazelCache where it wouldn't support fetching an existing HazelCastInstance
- added a mini zookeeper cluster based on HBase's one but without the hadoop dependency
- made sure all tests pass


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/31fb6eb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/31fb6eb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/31fb6eb0

Branch: refs/heads/execwork
Commit: 31fb6eb0124ac8801e27b32bf54ff7a34ce7d85a
Parents: 4f3a1c6
Author: David Ribeiro Alves <da...@gmail.com>
Authored: Thu Apr 18 22:03:39 2013 -0500
Committer: David Ribeiro Alves <da...@gmail.com>
Committed: Thu Apr 18 22:03:39 2013 -0500

----------------------------------------------------------------------
 .../apache/drill/common/config/DrillConfig.java    |   12 +-
 .../apache/drill/common/config/NestedConfig.java   |   16 +-
 .../org/apache/drill/exec/cache/HazelCache.java    |   11 +-
 .../org/apache/drill/exec/client/DrillClient.java  |  100 ++++
 .../drill/exec/coord/ClusterCoordinator.java       |   14 +-
 .../drill/exec/coord/ZKClusterCoordinator.java     |  137 +++---
 .../org/apache/drill/exec/server/Drillbit.java     |   20 +-
 .../apache/drill/exec/server/DrillbitContext.java  |    3 +-
 .../java-exec/src/main/resources/drill-module.conf |    1 +
 .../org/apache/drill/exec/DrillSystemTestBase.java |   77 +++-
 .../drill/exec/client/DrillClientSystemTest.java   |   42 ++-
 .../drill/exec/util/MiniZooKeeperCluster.java      |  368 +++++++++++++++
 .../java-exec/src/test/resources/drill-module.conf |   28 ++
 .../java-exec/src/test/resources/simple_plan.json  |  133 ++++++
 14 files changed, 861 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index b738002..dc9327e 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.CopyOnWriteArrayList;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.sun.media.jfxmedia.events.VideoTrackSizeListener;
 import org.apache.drill.common.exceptions.DrillConfigurationException;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.logical.StorageEngineConfigBase;
@@ -44,8 +46,9 @@ public final class DrillConfig extends NestedConfig{
   
   @SuppressWarnings("unchecked")
   private volatile List<Queue<Object>> sinkQueues = new CopyOnWriteArrayList<Queue<Object>>(new Queue[1]);
-  
-  private DrillConfig(Config config) {
+
+  @VisibleForTesting
+  public DrillConfig(Config config) {
     super(config);
     mapper = new ObjectMapper();
     SimpleModule deserModule = new SimpleModule("LogicalExpressionDeserializationModule").addDeserializer(LogicalExpression.class, new LogicalExpression.De(this));
@@ -145,4 +148,9 @@ public final class DrillConfig extends NestedConfig{
   public ObjectMapper getMapper(){
     return mapper;
   }
+
+  @Override
+  public String toString(){
+    return this.root().render();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/NestedConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/NestedConfig.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/NestedConfig.java
index b8d6133..6954707 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/NestedConfig.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/NestedConfig.java
@@ -17,18 +17,12 @@
  ******************************************************************************/
 package org.apache.drill.common.config;
 
+import com.typesafe.config.*;
+
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigList;
-import com.typesafe.config.ConfigMergeable;
-import com.typesafe.config.ConfigObject;
-import com.typesafe.config.ConfigOrigin;
-import com.typesafe.config.ConfigResolveOptions;
-import com.typesafe.config.ConfigValue;
-
 abstract class NestedConfig implements Config {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedConfig.class);
 
@@ -187,15 +181,15 @@ abstract class NestedConfig implements Config {
   }
 
   public Config atPath(String path) {
-    return null;
+    return c.atPath(path);
   }
 
   public Config atKey(String key) {
-    return null;
+    return c.atKey(key);
   }
 
   public Config withValue(String path, ConfigValue value) {
-    return null;
+    return c.withValue(path, value);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
index cc73799..b7477a3 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -65,9 +65,8 @@ public class HazelCache implements DistributedCache {
   
   public void run(DrillbitEndpoint endpoint) {
     Config c = new Config();
-    // todo, utilize cluster member ship to set up other nodes.
     c.setInstanceName(instanceName);
-    instance = Hazelcast.newHazelcastInstance(c);
+    instance = getInstanceOrCreateNew(c);
     workQueueLengths = instance.getTopic("queue-length");
     optimizedPlans = instance.getMap("plan-optimizations");
     this.endpoint = endpoint;
@@ -75,6 +74,14 @@ public class HazelCache implements DistributedCache {
     workQueueLengths.addMessageListener(new Listener());
   }
 
+  private HazelcastInstance getInstanceOrCreateNew(Config c) {
+    for (HazelcastInstance instance : Hazelcast.getAllHazelcastInstances()){
+      if (instance.getName().equals(this.instanceName))
+        return instance;
+    }
+    return Hazelcast.newHazelcastInstance(c);
+  }
+
   @Override
   public void saveOptimizedPlan(TemplatizedLogicalPlan logical, TemplatizedPhysicalPlan physical) {
     optimizedPlans.put(logical, physical);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
new file mode 100644
index 0000000..e0f3347
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -0,0 +1,100 @@
+package org.apache.drill.exec.client;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.ZKClusterCoordinator;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.UserClient;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Iterables.get;
+import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
+import static io.netty.buffer.Unpooled.copiedBuffer;
+import static java.nio.charset.Charset.forName;
+import static org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import static org.apache.drill.exec.proto.UserProtos.QueryHandle;
+import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL;
+import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder;
+
+/**
+ * Thin wrapper around a UserClient that handles connect/close and transforms String into ByteBuf
+ */
+public class DrillClient {
+
+  DrillConfig config;
+  private UserClient client;
+  private ClusterCoordinator clusterCoordinator;
+
+  public DrillClient() {
+    this(DrillConfig.create());
+  }
+
+  public DrillClient(String fileName) {
+    this(DrillConfig.create(fileName));
+  }
+
+  public DrillClient(DrillConfig config) {
+    this.config = config;
+  }
+
+  /**
+   * Connects the client to a Drillbit server
+   *
+   * @throws IOException
+   */
+  public void connect() throws Exception {
+    this.clusterCoordinator = new ZKClusterCoordinator(this.config);
+    this.clusterCoordinator.start();
+    Thread.sleep(10000);
+    Collection<DrillbitEndpoint> endpoints = clusterCoordinator.getAvailableEndpoints();
+    checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found");
+    // just use the first endpoint for now
+    DrillbitEndpoint endpoint = get(endpoints, 0);
+    ByteBufAllocator bb = new PooledByteBufAllocator(true);
+    this.client = new UserClient(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
+    try {
+      this.client.connectAsClient(endpoint.getAddress(), endpoint.getUserPort());
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Closes this client's connection to the server
+   *
+   * @throws IOException
+   */
+  public void close() throws IOException {
+    this.client.close();
+  }
+
+  /**
+   * Submits a Logical plan for direct execution (bypasses parsing)
+   *
+   * @param plan the plan to execute
+   * @return a handle for the query result
+   * @throws RpcException
+   */
+  public DrillRpcFuture<QueryHandle> submitPlan(String plan) throws RpcException {
+    return this.client.submitQuery(newBuilder().setMode(STREAM_FULL).setPlan(plan).build(), EMPTY_BUFFER);
+  }
+
+  /**
+   * Submits a Query for parsing and execution
+   *
+   * @param query the query to execute
+   * @return a handle for the query result
+   * @throws RpcException
+   */
+  public DrillRpcFuture<QueryHandle> submitQuery(String query) throws RpcException {
+    return this.client.submitQuery(newBuilder().setMode(STREAM_FULL).build(), copiedBuffer(query, forName("UTF-8")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
index 9c7eab2..d7ea8fa 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
@@ -17,11 +17,11 @@
  ******************************************************************************/
 package org.apache.drill.exec.coord;
 
-import java.io.Closeable;
-import java.util.List;
-
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
+import java.io.Closeable;
+import java.util.Collection;
+
 /**
  * Pluggable interface built to manage cluster coordination. Allows Drillbit or DrillClient to register its capabilities
  * as well as understand other node's existence and capabilities.
@@ -36,10 +36,12 @@ public abstract class ClusterCoordinator implements Closeable {
   public abstract void unregister(RegistrationHandle handle);
 
   /**
-   * Get a list of avialable Drillbit endpoints.  Thread-safe.  Could be slightly out of date depending on refresh policy.
-   * @return A list of available endpoints.
+   * Get a collection of avialable Drillbit endpoints, Thread-safe.
+   * Could be slightly out of date depending on refresh policy.
+   *
+   * @return A collection of available endpoints.
    */
-  public abstract List<DrillbitEndpoint> getAvailableEndpoints();
+  public abstract Collection<DrillbitEndpoint> getAvailableEndpoints();
 
   public interface RegistrationHandle {
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
index b3cd27f..3ad08e1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,17 +17,7 @@
  ******************************************************************************/
 package org.apache.drill.exec.coord;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.google.common.base.Throwables;
+import com.google.common.base.Function;
 import com.netflix.curator.RetryPolicy;
 import com.netflix.curator.framework.CuratorFramework;
 import com.netflix.curator.framework.CuratorFrameworkFactory;
@@ -38,8 +28,20 @@ import com.netflix.curator.x.discovery.ServiceDiscoveryBuilder;
 import com.netflix.curator.x.discovery.ServiceInstance;
 import com.netflix.curator.x.discovery.details.ServiceCache;
 import com.netflix.curator.x.discovery.details.ServiceCacheListener;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 
-/** Manages cluster coordination utilizing zookeeper. **/
+import static com.google.common.base.Throwables.propagate;
+import static com.google.common.collect.Collections2.transform;
+
+/**
+ * Manages cluster coordination utilizing zookeeper. *
+ */
 public class ZKClusterCoordinator extends ClusterCoordinator {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZKClusterCoordinator.class);
 
@@ -47,24 +49,26 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
   private CuratorFramework curator;
   private ServiceDiscovery<DrillbitEndpoint> discovery;
   private ServiceCache<DrillbitEndpoint> serviceCache;
-  private volatile List<DrillbitEndpoint> endpoints = Collections.emptyList();
+  private volatile Collection<DrillbitEndpoint> endpoints = Collections.emptyList();
   private final String serviceName;
+
   public ZKClusterCoordinator(DrillConfig config) throws IOException {
-    
+
     this.basePath = config.getString(ExecConstants.ZK_ROOT);
-    this.serviceName =  config.getString(ExecConstants.SERVICE_NAME);
-    
+    this.serviceName = config.getString(ExecConstants.SERVICE_NAME);
     RetryPolicy rp = new RetryNTimes(config.getInt(ExecConstants.ZK_RETRY_TIMES),
-        config.getInt(ExecConstants.ZK_RETRY_DELAY));
-    
+      config.getInt(ExecConstants.ZK_RETRY_DELAY));
     curator = CuratorFrameworkFactory.builder()
-        .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
-        .retryPolicy(rp)
-        .connectString(config.getString(ExecConstants.ZK_CONNECTION))
-        .build(); 
-    
+      .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
+      .retryPolicy(rp)
+      .connectString(config.getString(ExecConstants.ZK_CONNECTION))
+      .build();
     discovery = getDiscovery();
-    serviceCache = discovery.serviceCacheBuilder().name(serviceName).refreshPaddingMs(config.getInt(ExecConstants.ZK_REFRESH)).build();
+    serviceCache = discovery.
+      serviceCacheBuilder()
+      .name(serviceName)
+      .refreshPaddingMs(config.getInt(ExecConstants.ZK_REFRESH))
+      .build();
   }
 
   public void start() throws Exception {
@@ -73,10 +77,11 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
     discovery.start();
     serviceCache.start();
     serviceCache.addListener(new ZKListener());
+    updateEndpoints();
   }
-  
-  private class ZKListener implements ServiceCacheListener{
-    
+
+  private class ZKListener implements ServiceCacheListener {
+
     @Override
     public void stateChanged(CuratorFramework client, ConnectionState newState) {
     }
@@ -84,62 +89,78 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
     @Override
     public void cacheChanged() {
       logger.debug("Cache changed, updating.");
-      try {
-        Collection<ServiceInstance<DrillbitEndpoint>> instances = discovery.queryForInstances(serviceName);
-        List<DrillbitEndpoint> newEndpoints = new ArrayList<DrillbitEndpoint>(instances.size());
-        for(ServiceInstance<DrillbitEndpoint> si : instances){
-          newEndpoints.add(si.getPayload());
-        }
-        endpoints = newEndpoints;
-      } catch (Exception e) {
-        logger.error("Failure while update Drillbit service location cache.", e);
-      }
+      updateEndpoints();
     }
   }
 
-  public void close() throws IOException{
+  public void close() throws IOException {
     serviceCache.close();
     discovery.close();
     curator.close();
   }
-  
+
   @Override
   public RegistrationHandle register(DrillbitEndpoint data) {
     try {
-      ServiceInstance<DrillbitEndpoint> si = getSI(data);
-      discovery.registerService(si);
-      return new ZKRegistrationHandle(si.getId());
+      ServiceInstance<DrillbitEndpoint> serviceInstance = getServiceInstance(data);
+      discovery.registerService(serviceInstance);
+      return new ZKRegistrationHandle(serviceInstance.getId());
     } catch (Exception e) {
-      Throwables.propagate(e);
-      return null;
+      throw propagate(e);
     }
   }
 
   @Override
   public void unregister(RegistrationHandle handle) {
-    if( !( handle instanceof ZKRegistrationHandle)) throw new UnsupportedOperationException("Unknown handle type");
-    
+    if (!(handle instanceof ZKRegistrationHandle)) throw new UnsupportedOperationException("Unknown handle type");
+
     ZKRegistrationHandle h = (ZKRegistrationHandle) handle;
     try {
-      ServiceInstance<DrillbitEndpoint> si = ServiceInstance.<DrillbitEndpoint>builder().address("").port(0).id(h.id).name(ExecConstants.SERVICE_NAME).build();
-      discovery.unregisterService(si);
+      ServiceInstance<DrillbitEndpoint> serviceInstance = ServiceInstance.<DrillbitEndpoint>builder()
+        .address("")
+        .port(0)
+        .id(h.id)
+        .name(serviceName)
+        .build();
+      discovery.unregisterService(serviceInstance);
     } catch (Exception e) {
-      Throwables.propagate(e);
+      propagate(e);
     }
   }
 
   @Override
-  public List<DrillbitEndpoint> getAvailableEndpoints() {
+  public Collection<DrillbitEndpoint> getAvailableEndpoints() {
     return this.endpoints;
   }
-  
-  private ServiceInstance<DrillbitEndpoint> getSI(DrillbitEndpoint ep) throws Exception{
-    return ServiceInstance.<DrillbitEndpoint>builder().name(ExecConstants.SERVICE_NAME).payload(ep).build();
+
+  private void updateEndpoints() {
+    try {
+      endpoints = transform(discovery.queryForInstances(serviceName),
+        new Function<ServiceInstance<DrillbitEndpoint>, DrillbitEndpoint>() {
+          @Override
+          public DrillbitEndpoint apply(ServiceInstance<DrillbitEndpoint> input) {
+            return input.getPayload();
+          }
+        });
+    } catch (Exception e) {
+      logger.error("Failure while update Drillbit service location cache.", e);
+    }
   }
-  
-  
+
+  private ServiceInstance<DrillbitEndpoint> getServiceInstance(DrillbitEndpoint endpoint) throws Exception {
+    return ServiceInstance.<DrillbitEndpoint>builder()
+      .name(serviceName)
+      .payload(endpoint)
+      .build();
+  }
+
 
   public ServiceDiscovery<DrillbitEndpoint> getDiscovery() {
-    return ServiceDiscoveryBuilder.builder(DrillbitEndpoint.class).basePath(basePath).client(curator).serializer(DrillServiceInstanceHelper.SERIALIZER).build();
+    return ServiceDiscoveryBuilder
+      .builder(DrillbitEndpoint.class)
+      .basePath(basePath)
+      .client(curator)
+      .serializer(DrillServiceInstanceHelper.SERIALIZER)
+      .build();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 99ebe85..2961fae 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -32,20 +32,24 @@ import org.apache.drill.exec.service.ServiceEngine;
 
 import java.net.InetAddress;
 
+/**
+ * Starts, tracks and stops all the required services for a Drillbit daemon to work.
+ */
 public class Drillbit {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Drillbit.class);
 
   public static Drillbit start(StartupOptions options) throws DrillbitStartupException {
-    Drillbit bit = null;
+    return start(DrillConfig.create(options.getConfigLocation()));
+  }
+
+  public static Drillbit start(DrillConfig config) throws DrillbitStartupException {
+    Drillbit bit;
     try {
       logger.debug("Setting up Drillbit.");
-      DrillConfig config = DrillConfig.create(options.getConfigLocation());
       bit = new Drillbit(config);
     } catch (Exception ex) {
       throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
     }
-
-
     try {
       logger.debug("Starting Drillbit.");
       bit.run();
@@ -80,9 +84,11 @@ public class Drillbit {
   public void run() throws Exception {
     coord.start();
     engine.start();
-    
-    DrillbitEndpoint md = DrillbitEndpoint.newBuilder().setAddress(InetAddress.getLocalHost().getHostAddress())
-        .setBitPort(engine.getBitPort()).setUserPort(engine.getUserPort()).build();
+    DrillbitEndpoint md = DrillbitEndpoint.newBuilder()
+      .setAddress(InetAddress.getLocalHost().getHostAddress())
+      .setBitPort(engine.getBitPort())
+      .setUserPort(engine.getUserPort())
+      .build();
     handle = coord.register(md);
     cache.run(md);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 94c8207..e3a24d2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.server;
 
 import io.netty.channel.nio.NioEventLoopGroup;
 
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.drill.common.config.DrillConfig;
@@ -45,7 +46,7 @@ public class DrillbitContext {
     return config;
   }
   
-  public List<DrillbitEndpoint> getBits(){
+  public Collection<DrillbitEndpoint> getBits(){
     return underlyingBit.coord.getAvailableEndpoints();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
index ad18d6e..c516dda 100644
--- a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
+++ b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
@@ -4,6 +4,7 @@
 drill.exec: {
   cluster-id: "drillbits1"
   rpc: {
+    user.address : localhost
   	user.port : 31010,
   	bit.port : 31011
   },

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
index 7f5264c..645c4d5 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
@@ -18,27 +18,66 @@
 package org.apache.drill.exec;
 
 import com.google.common.collect.ImmutableList;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.server.StartupOptions;
+import org.apache.drill.exec.util.MiniZooKeeperCluster;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.List;
 
 import static com.google.common.base.Throwables.propagate;
 
 /**
  * Base class for Drill system tests.
- * Starts one or more Drillbits and provides a configured client for testing.
+ * Starts one or more Drillbits, an embedded ZooKeeper cluster and provides a configured client for testing.
  */
 public class DrillSystemTestBase {
 
-  private static List<Drillbit> servers;
+  static final Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfig.class);
 
-  public void startCluster(StartupOptions options, int numServers) {
+  private static File testDir = new File("target/test-data");
+  private static DrillConfig config;
+  private static String zkUrl;
+  private static int bitPort;
+  private static int userPort;
+
+  private List<Drillbit> servers;
+  private MiniZooKeeperCluster zkCluster;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    config = DrillConfig.create();
+    bitPort = config.getInt(ExecConstants.INITIAL_BIT_PORT);
+    userPort = config.getInt(ExecConstants.INITIAL_USER_PORT);
+    zkUrl = config.getString(ExecConstants.ZK_CONNECTION);
+    setupTestDir();
+  }
+
+  private static void setupTestDir() {
+    if (!testDir.exists()) {
+      testDir.mkdirs();
+    }
+  }
+
+  private DrillConfig newConfigWithDifferentPorts() {
+    return new DrillConfig(config
+      .withValue(ExecConstants.INITIAL_BIT_PORT, ConfigValueFactory.fromAnyRef(bitPort++))
+      .withValue(ExecConstants.INITIAL_USER_PORT, ConfigValueFactory.fromAnyRef(userPort++)));
+  }
+
+  public void startCluster(int numServers) {
     try {
       ImmutableList.Builder<Drillbit> servers = ImmutableList.builder();
       for (int i = 0; i < numServers; i++) {
-        servers.add(Drillbit.start(options));
+        DrillConfig config = newConfigWithDifferentPorts();
+//        System.out.println("NEW CONFIG");
+//        System.out.println(config);
+        servers.add(Drillbit.start(config));
       }
       this.servers = servers.build();
     } catch (DrillbitStartupException e) {
@@ -46,16 +85,36 @@ public class DrillSystemTestBase {
     }
   }
 
-  public void startZookeeper() {
-
+  public void startZookeeper(int numServers) {
+    try {
+      this.zkCluster = new MiniZooKeeperCluster();
+      zkCluster.setDefaultClientPort(Integer.parseInt(this.zkUrl.split(":")[1]));
+      zkCluster.startup(testDir, numServers);
+    } catch (IOException e) {
+      propagate(e);
+    } catch (InterruptedException e) {
+      propagate(e);
+    }
   }
 
   public void stopCluster() {
-
+    if (servers != null) {
+      for (Drillbit server : servers) {
+        try {
+          server.close();
+        } catch (Exception e) {
+          logger.warn("Error shutting down Drillbit", e);
+        }
+      }
+    }
   }
 
   public void stopZookeeper() {
-
+    try {
+      this.zkCluster.shutdown();
+    } catch (IOException e) {
+      propagate(e);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
index a8ac41e..09a06d7 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
@@ -1,7 +1,12 @@
 package org.apache.drill.exec.client;
 
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
 import org.apache.drill.exec.DrillSystemTestBase;
-import org.apache.drill.exec.server.StartupOptions;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.junit.After;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**
@@ -9,12 +14,39 @@ import org.junit.Test;
  */
 public class DrillClientSystemTest extends DrillSystemTestBase {
 
-  StartupOptions options = new StartupOptions();
+  private static String plan;
 
-  @Test
-  public void testSubmitQuery() {
-    startCluster(options, 1);
+  @BeforeClass
+  public static void setUp() throws Exception {
+    DrillSystemTestBase.setUp();
+    plan = Resources.toString(Resources.getResource("simple_plan.json"), Charsets.UTF_8);
+  }
 
+  @After
+  public void tearDown() {
+    stopCluster();
+    stopZookeeper();
+  }
 
+  @Test
+  public void testSubmitPlanSingleNode() throws Exception {
+    submitPlanOnCluster(1);
+  }
+
+  @Test
+  public void testSubmitPlanTwoNodes() throws Exception {
+    submitPlanOnCluster(2);
+  }
+
+  private void submitPlanOnCluster(int numBits) throws Exception { // shared body: 1 ZK node + numBits Drillbits
+    startZookeeper(1);
+    startCluster(numBits);
+    DrillClient client = new DrillClient();
+    client.connect();
+    try {
+      System.out.println(client.submitPlan(plan).get());
+    } finally { // close the client even when submitPlan() or get() throws
+      client.close();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/util/MiniZooKeeperCluster.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/util/MiniZooKeeperCluster.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/util/MiniZooKeeperCluster.java
new file mode 100644
index 0000000..05011ac
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/util/MiniZooKeeperCluster.java
@@ -0,0 +1,368 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+
+import java.io.*;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Mostly Copied from HBase's MiniZooKeeperCluster, but without the Hadoop dependency.
+ */
+public class MiniZooKeeperCluster {
+  private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
+
+  private static final int TICK_TIME = 2000;
+  private static final int CONNECTION_TIMEOUT = 30000;
+
+  private boolean started;
+
+  /**
+   * The default port. If zero, we use a random port.
+   */
+  private int defaultClientPort = 0;
+
+  private int clientPort;
+
+  private List<NIOServerCnxnFactory> standaloneServerFactoryList;
+  private List<ZooKeeperServer> zooKeeperServers;
+  private List<Integer> clientPortList;
+
+  private int activeZKServerIndex;
+  private int tickTime = 0;
+
+  /** Creates a stopped cluster with empty server lists; servers are only created by startup(). */
+  public MiniZooKeeperCluster() {
+    this.started = false;
+    activeZKServerIndex = -1;
+    zooKeeperServers = new ArrayList<ZooKeeperServer>();
+    clientPortList = new ArrayList<Integer>();
+    standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
+  }
+
+  public void setDefaultClientPort(int clientPort) {
+    if (clientPort <= 0) {
+      throw new IllegalArgumentException("Invalid default ZK client port: "
+        + clientPort);
+    }
+    this.defaultClientPort = clientPort;
+  }
+
+  /**
+   * Selects a ZK client port. Returns the default port if one was set.
+   * Otherwise returns a random port in 49152..65279 (0xc000 plus an offset
+   * below 0x3f00), which lies inside the 49152-65535 range that cannot be
+   * registered with IANA and is meant for dynamic allocation (see http://bit.ly/dynports).
+   */
+  private int selectClientPort() {
+    if (defaultClientPort > 0) {
+      return defaultClientPort;
+    }
+    return 0xc000 + new Random().nextInt(0x3f00);
+  }
+
+  public void setTickTime(int tickTime) { // values <= 0 make startup() fall back to TICK_TIME
+    this.tickTime = tickTime;
+  }
+
+  public int getBackupZooKeeperServerNum() {
+    return zooKeeperServers.size() - 1;
+  }
+
+  public int getZooKeeperServerNum() {
+    return zooKeeperServers.size();
+  }
+
+  // XXX: From o.a.zk.t.ClientBase
+  private static void setupTestEnv() {
+    // During the tests we run with 100K prealloc in the transaction logs.
+    // On Windows systems a prealloc of 64M was seen to take ~15 seconds,
+    // resulting in test failure (client timeout on first session).
+    // Set both the system property and the static setter to handle static init/gc issues.
+    System.setProperty("zookeeper.preAllocSize", "100");
+    FileTxnLog.setPreallocSize(100 * 1024);
+  }
+
+  public int startup(File baseDir) throws IOException, InterruptedException {
+    return startup(baseDir, 1);
+  }
+
+  /**
+   * Starts the requested number of standalone ZK servers, each with its own data dir.
+   * @param baseDir parent dir; a "zookeeper_i" subdirectory is wiped and recreated per server
+   * @param numZooKeeperServers number of servers to start; nothing happens when <= 0
+   * @return client port of the first (active) server, or -1 when numZooKeeperServers <= 0
+   * @throws IOException if a data dir cannot be set up or a server does not come up in time
+   * @throws InterruptedException if the underlying ZooKeeper start-up is interrupted
+   */
+  public int startup(File baseDir, int numZooKeeperServers) throws IOException,
+    InterruptedException {
+    if (numZooKeeperServers <= 0) {
+      return -1;
+    }
+
+    setupTestEnv();
+    shutdown();
+
+    int tentativePort = selectClientPort();
+    // a tick time that was never set (or set <= 0) falls back to the default
+    int tickTimeToUse = this.tickTime > 0 ? this.tickTime : TICK_TIME;
+
+    // running all the ZK servers
+    for (int i = 0; i < numZooKeeperServers; i++) {
+      File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
+      recreateDir(dir);
+      ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
+      NIOServerCnxnFactory standaloneServerFactory;
+      while (true) {
+        try {
+          standaloneServerFactory = new NIOServerCnxnFactory();
+          standaloneServerFactory.configure(
+            new InetSocketAddress(tentativePort), 1000);
+        } catch (BindException e) {
+          LOG.debug("Failed binding ZK Server to client port: " +
+            tentativePort);
+          // This port is already in use, try to use another.
+          tentativePort++;
+          continue;
+        }
+        break;
+      }
+
+      // Start up this ZK server
+      standaloneServerFactory.startup(server);
+
+      // Register the server (and mark the cluster started) before waiting for it, so
+      // that shutdown() can tear down everything started so far if this one fails.
+      clientPortList.add(tentativePort);
+      standaloneServerFactoryList.add(standaloneServerFactory);
+      zooKeeperServers.add(server);
+      started = true;
+      if (!waitForServerUp(tentativePort, CONNECTION_TIMEOUT)) {
+        shutdown();
+        throw new IOException("Waiting for startup of standalone server");
+      }
+    }
+
+    // set the first one to be active ZK; Others are backups
+    activeZKServerIndex = 0;
+    clientPort = clientPortList.get(activeZKServerIndex);
+    LOG.info("Started MiniZK Cluster and connect 1 ZK server " +
+      "on client port: " + clientPort);
+    return clientPort;
+  }
+
+  private void recreateDir(File dir) throws IOException { // wipes dir if present, then recreates it empty
+    if (dir.exists()) {
+      FileUtil.fullyDelete(dir);
+    }
+    try { // mkdirs() reports failure through its return value, so it has to be checked
+      if (!dir.mkdirs() && !dir.isDirectory()) { throw new IOException("creating dir: " + dir); }
+    } catch (SecurityException e) {
+      throw new IOException("creating dir: " + dir, e);
+    }
+  }
+
+  /**
+   * Stops every running ZK server and resets the cluster state; a no-op when not started.
+   * @throws IOException if at least one server was still answering after CONNECTION_TIMEOUT ms
+   */
+  public void shutdown() throws IOException {
+    if (!started) {
+      return;
+    }
+    // shut down all the zk servers; keep going on failure so none is left running
+    boolean allDown = true;
+    for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
+      standaloneServerFactoryList.get(i).shutdown();
+      if (!waitForServerDown(clientPortList.get(i), CONNECTION_TIMEOUT)) {
+        allDown = false;
+      }
+    }
+
+    // clear everything; -1 matches the "no active server" value set by the constructor
+    started = false;
+    activeZKServerIndex = -1;
+    standaloneServerFactoryList.clear();
+    clientPortList.clear();
+    zooKeeperServers.clear();
+    if (!allDown) {
+      throw new IOException("Waiting for shutdown of standalone server");
+    }
+    LOG.info("Shutdown MiniZK cluster with all ZK servers");
+  }
+
+  /**
+   * @return the client port of the backup server that takes over once the current
+   *         active server has been killed; -1 if not started or no backup is left.
+   * @throws IOException if the killed server is still reachable after CONNECTION_TIMEOUT ms
+   * @throws InterruptedException presumably from the ZooKeeper shutdown call — TODO confirm
+   */
+  public int killCurrentActiveZooKeeperServer() throws IOException,
+    InterruptedException {
+    if (!started || activeZKServerIndex < 0) {
+      return -1;
+    }
+
+    // Shut down the currently active server
+    NIOServerCnxnFactory standaloneServerFactory =
+      standaloneServerFactoryList.get(activeZKServerIndex);
+    int clientPort = clientPortList.get(activeZKServerIndex); // local: shadows the clientPort field, which is not updated here
+
+    standaloneServerFactory.shutdown();
+    if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+      throw new IOException("Waiting for shutdown of standalone server");
+    }
+
+    // remove the killed server; the next entry (if any) slides into the active index
+    standaloneServerFactoryList.remove(activeZKServerIndex);
+    clientPortList.remove(activeZKServerIndex);
+    zooKeeperServers.remove(activeZKServerIndex);
+    LOG.info("Kill the current active ZK servers in the cluster " +
+      "on client port: " + clientPort);
+
+    if (standaloneServerFactoryList.size() == 0) {
+      // there are no backup servers left
+      return -1;
+    }
+    clientPort = clientPortList.get(activeZKServerIndex);
+    LOG.info("Activate a backup zk server in the cluster " +
+      "on client port: " + clientPort);
+    // return the port of the backup server that is now active
+    return clientPort;
+  }
+
+  /**
+   * Kills the backup ZK server right after the active one; no-op if not started or no backup exists.
+   *
+   * @throws IOException if the killed server is still reachable after CONNECTION_TIMEOUT ms
+   * @throws InterruptedException presumably from the ZooKeeper shutdown call — TODO confirm
+   */
+  public void killOneBackupZooKeeperServer() throws IOException,
+    InterruptedException {
+    if (!started || activeZKServerIndex < 0 ||
+      standaloneServerFactoryList.size() <= 1) {
+      return;
+    }
+
+    int backupZKServerIndex = activeZKServerIndex + 1;
+    // Shut down the chosen backup server (the active one keeps running)
+    NIOServerCnxnFactory standaloneServerFactory =
+      standaloneServerFactoryList.get(backupZKServerIndex);
+    int clientPort = clientPortList.get(backupZKServerIndex);
+
+    standaloneServerFactory.shutdown();
+    if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+      throw new IOException("Waiting for shutdown of standalone server");
+    }
+
+    // remove this backup zk server
+    standaloneServerFactoryList.remove(backupZKServerIndex);
+    clientPortList.remove(backupZKServerIndex);
+    zooKeeperServers.remove(backupZKServerIndex);
+    LOG.info("Kill one backup ZK servers in the cluster " +
+      "on client port: " + clientPort);
+  }
+
+  // Polls localhost:port until talking to it fails (server down) or timeout ms pass. XXX: From o.a.zk.t.ClientBase
+  private static boolean waitForServerDown(int port, long timeout) {
+    long start = System.currentTimeMillis();
+    while (true) {
+      try {
+        Socket sock = new Socket("localhost", port);
+        try {
+          OutputStream outstream = sock.getOutputStream();
+          outstream.write("stat".getBytes());
+          outstream.flush();
+        } finally {
+          sock.close();
+        }
+      } catch (IOException e) {
+        return true; // connecting or writing failed: nothing is serving the port any more
+      }
+
+      if (System.currentTimeMillis() > start + timeout) {
+        return false; // still answering after the timeout
+      }
+      try {
+        Thread.sleep(250);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // stop polling, but keep the interrupt visible to the caller
+        return false;
+      }
+    }
+  }
+
+  /**
+   * Polls localhost:port with the "stat" command until a ZooKeeper server answers
+   * or timeout ms have passed. XXX: From o.a.zk.t.ClientBase
+   */
+  private static boolean waitForServerUp(int port, long timeout) {
+    long start = System.currentTimeMillis();
+    while (true) {
+      try {
+        Socket sock = new Socket("localhost", port);
+        try {
+          OutputStream outstream = sock.getOutputStream();
+          outstream.write("stat".getBytes());
+          outstream.flush();
+
+          BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream()));
+          String line = reader.readLine();
+          if (line != null && line.startsWith("Zookeeper version:")) {
+            return true;
+          }
+        } finally {
+          // closing the socket also closes the streams obtained from it
+          sock.close();
+        }
+      } catch (IOException e) {
+        // ignore as this is expected
+        LOG.info("server localhost:" + port + " not up " + e);
+      }
+
+      if (System.currentTimeMillis() > start + timeout) {
+        return false;
+      }
+      try {
+        Thread.sleep(250);
+      } catch (InterruptedException e) {
+        // stop polling, but keep the interrupt visible to the caller
+        Thread.currentThread().interrupt();
+        return false;
+      }
+    }
+  }
+
+  public int getClientPort() { // port recorded by the last successful startup(); 0 before that; the kill* methods do not update it
+    return clientPort;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf b/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf
new file mode 100644
index 0000000..7c97f66
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf
@@ -0,0 +1,28 @@
+//  This file tells Drill to consider this module when class path scanning.  
+//  This file can also include any supplementary configuration information.  
+//  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.exec: {
+  cluster-id: "drillbits1"
+  rpc: {
+  	user.port : 31010,
+  	bit.port : 32010
+  },
+  optimizer: {
+    implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
+  },
+  
+  zk: {
+	connect: "localhost:2181",
+	root: "/drill",
+	refresh: 500,
+	timeout: 1000,
+	retry: {
+	  count: 7200,
+	  delay: 500
+	}    
+  }
+
+  network: {
+    start: 35000
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/31fb6eb0/sandbox/prototype/exec/java-exec/src/test/resources/simple_plan.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/simple_plan.json b/sandbox/prototype/exec/java-exec/src/test/resources/simple_plan.json
new file mode 100644
index 0000000..2457b1f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/simple_plan.json
@@ -0,0 +1,133 @@
+{
+  head:{
+    type:"apache_drill_logical_plan",
+    version:"1",
+    generator:{
+      type:"manual",
+      info:"na"
+    }
+  },
+  storage:{
+    logs: {
+      type:"text",
+	  file: "local://logs/*.log",
+	  compress:"gzip",
+	  line-delimiter:"\n",
+	  record-maker:{
+	    type:"first-row",
+	    delimiter:","
+	  }
+    },
+    {
+      type:"mongo",
+      name:"users",
+      connection:"mongodb://blue:red@localhost/users"
+    },
+    {
+      type:"mysql",
+      name:"mysql",
+      connection:"jdbc:mysql://localhost/main"
+    }
+  ],
+  query:[
+    {
+      @id:"1",
+      op:"scan",
+      memo:"initial_scan",
+      storageengine:"local-logs",
+      selection: {}
+    },
+    {
+      @id:"2",
+      input:"1",
+      memo:"transform1",
+      op:"transform",
+      transforms:[
+        {
+          ref:"userId",
+          expr:"regex_like('activity.cookie', \"persistent=([^;]*)\")"
+        },
+        {
+          ref:"session",
+          expr:"regex_like('activity.cookie', \"session=([^;]*)\")"
+        }
+      ]
+    },
+    {
+      @id:"3",
+      input:"2",
+      memo:"transform2",
+      op:"transform",
+      transforms:[
+        {
+          ref:"userId",
+          expr:"regex_like('activity.cookie', \"persistent=([^;]*)\")"
+        },
+        {
+          ref:"session",
+          expr:"regex_like('activity.cookie', \"session=([^;]*)\")"
+        }
+      ]
+    },
+    {
+      @id:"7",
+      input:"3",
+      op:"sequence",
+      do:[
+        {
+          op:"transform",
+          memo:"seq_transform",
+          transforms:[
+            {
+              ref:"happy",
+              expr:"regex_like('ep2', \"dink\")"
+            }
+          ]
+        }
+        ,
+        {
+          op:"transform",
+          memo:"last_transform",
+          transforms:[
+            {
+              ref:"abc",
+              expr:"123"
+            }
+          ]
+        }
+      ]
+    },
+    {
+      @id:"10",
+      input:"3",
+      op:"transform",
+      memo:"t3",
+      transforms:[
+        {
+          ref:"happy",
+          expr:"regex_like('ep2', \"dink\")"
+        }
+      ]
+    },
+    {
+      @id:12,
+      op:"join",
+      type: "inner",
+      left:"7",
+      right:"10",
+      conditions: [{relationship:"==", left: "1", right: "1" }]
+    }
+    ,
+    {
+      input: 12,
+      op: "store",
+      memo: "output sink",
+      target: {
+        file: "console:///stdout"
+      }
+      
+    }
+
+    
+  ]
+}
\ No newline at end of file