You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/04/14 04:35:08 UTC

[4/9] basic framework for physical plan. abstraction of graph classes.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/StartupOptions.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/StartupOptions.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/StartupOptions.java
new file mode 100644
index 0000000..66bbad8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/StartupOptions.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.server;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+/** Command line options for startup, populated by JCommander from the annotated fields below. */
+@Parameters(separators = "=")
+public class StartupOptions {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StartupOptions.class);
+
+  @Parameter(names={"-h", "--help"}, description="Provide description of usage.", help=true)
+  private boolean help;
+
+  @Parameter(names= {"-d", "--debug"}, description="Whether you want to run the program in debug mode.", required=false)
+  private boolean debug;
+
+  @Parameter(names= {"-c", "--config"}, description="Configuration file you want to load.  Defaults to loading 'drill-override.conf' from the classpath.", required=false)
+  private String configLocation;
+
+  @Parameter
+  private List<String> exccess = new ArrayList<String>();
+
+  public boolean isDebug() {
+    return this.debug;
+  }
+
+  public String getConfigLocation() {
+    return this.configLocation;
+  }
+
+  public List<String> getExccess() {
+    return this.exccess;
+  }
+
+  /** Binds cliArgs to a new instance; if the help flag was set, prints usage and exits the JVM with status 0. */
+  public static StartupOptions parse(String[] cliArgs) {
+    logger.debug("Parsing arguments.");
+    final StartupOptions options = new StartupOptions();
+    final JCommander commander = new JCommander(options, cliArgs);
+    if(!options.help) return options;
+    commander.usage();
+    System.exit(0);
+    return options;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
new file mode 100644
index 0000000..97db72e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.service;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.rpc.bit.BitCom;
+import org.apache.drill.exec.rpc.bit.BitComImpl;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import com.google.common.io.Closeables;
+
+/** Holds a Drillbit's user server and bit communication service so they can be started and closed together. */
+public class ServiceEngine implements Closeable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ServiceEngine.class);
+  UserServer userServer;
+  BitComImpl bitCom;
+  int userPort;
+  int bitPort;
+  DrillbitContext context;
+  
+  public ServiceEngine(DrillbitContext context){
+    this.context = context; // keep the context: start() reads the initial user port from its config
+    userServer = new UserServer(context.getAllocator().getUnderlyingAllocator(), new NioEventLoopGroup(1, new NamedThreadFactory("UserServer-")), context);
+    bitCom = new BitComImpl(context);
+  }
+  
+  public void start() throws DrillbitStartupException, InterruptedException{
+    userPort = userServer.bind(context.getConfig().getInt(ExecConstants.INITIAL_USER_PORT));
+    bitPort = bitCom.start();
+  }
+  
+  public int getBitPort(){
+    return bitPort;
+  }
+  
+  public int getUserPort(){
+    return userPort;
+  }
+
+  public BitCom getBitCom(){
+    return bitCom;
+  }
+  
+  @Override
+  public void close() throws IOException {
+    Closeables.closeQuietly(userServer);
+    Closeables.closeQuietly(bitCom);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/QueryOptimizerRule.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/QueryOptimizerRule.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/QueryOptimizerRule.java
new file mode 100644
index 0000000..0536206
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/QueryOptimizerRule.java
@@ -0,0 +1,21 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store;
+
+public interface QueryOptimizerRule { // no members declared; StorageEngine.getOptimizerRules() returns a list of these
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
new file mode 100644
index 0000000..9fc4165
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -0,0 +1,49 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store;
+
+import org.apache.drill.exec.exception.ExecutionSetupException;
+import org.apache.drill.exec.ops.OutputMutator;
+import org.apache.drill.exec.record.BatchSchema;
+
+public interface RecordReader {
+
+  /**
+   * Configure the RecordReader with the expected schema and the output that should be written to.
+   *
+   * @param expectedSchema
+   *          The set of fields that should be written to as well as the expected types for those fields. In the case
+   *          that RecordReader has a known schema and the expectedSchema does not match the actual schema, an
+   *          ExecutionSetupException will be thrown.
+   * @param output
+   *          The place where output for a particular scan should be written. The record reader is responsible for
+   *          mutating the set of schema values for that particular record.
+   * @throws ExecutionSetupException
+   */
+  void setup(BatchSchema expectedSchema, OutputMutator output) throws ExecutionSetupException;
+
+  /**
+   * Advance the record reader, writing into the provided output batch.
+   *
+   * @return The number of additional records added to the output.
+   */
+  int next();
+
+  void cleanup();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordRecorder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordRecorder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordRecorder.java
new file mode 100644
index 0000000..c5fcc42
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordRecorder.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.record.RecordBatch;
+
+public interface RecordRecorder {
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordRecorder.class);
+
+  void setup() throws IOException;
+
+  /**
+   * Hands one record batch to this recorder.
+   * @param batch the record batch to record
+   * @return NOTE(review): the meaning of the boolean result is not specified here -- confirm with implementations
+   */
+  boolean record(RecordBatch batch);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
new file mode 100644
index 0000000..83749c7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
@@ -0,0 +1,92 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.google.common.collect.ListMultimap;
+
+public interface StorageEngine {
+  public boolean supportsRead();
+
+  public boolean supportsWrite();
+
+  public enum PartitionCapabilities {
+    NONE, HASH, RANGE;
+  }
+
+  public List<QueryOptimizerRule> getOptimizerRules();
+
+  /**
+   * Get the set of read entries required for a particular Scan (read) node. This is somewhat analogous to traditional
+   * MapReduce. The difference is, this is the most granular split paradigm.
+   * 
+   * @param scan
+   *          The configured scan entries.
+   * @return
+   * @throws IOException
+   */
+  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException;
+
+  /**
+   * Get the set of Drillbit endpoints that are available for each read entry. Note that it is possible for a read entry
+   * to have no Drillbit locations. In that case, the multimap will contain no values for that read entry.
+   * 
+   * @return Multimap of ReadEntry > List<DrillbitEndpoint> for ReadEntries with available locations.
+   */
+  public ListMultimap<ReadEntry, DrillbitEndpoint> getReadLocations(Collection<ReadEntry> entries);
+
+  /**
+   * Get a particular reader for a fragment context.
+   * @param context
+   * @param readEntry
+   * @return
+   * @throws IOException
+   */
+  public RecordReader getReader(FragmentContext context, ReadEntry readEntry) throws IOException;
+
+  /**
+   * 
+   * @param context
+   * @param writeEntry
+   * @return
+   * @throws IOException
+   */
+  public RecordRecorder getWriter(FragmentContext context, WriteEntry writeEntry) throws IOException;
+
+  
+  public interface ReadEntry {
+    public Cost getCostEstimate();
+  }
+
+  public interface WriteEntry {
+  }
+
+  public static class Cost {
+    public long disk;
+    public long network;
+    public long memory;
+    public long cpu;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
new file mode 100644
index 0000000..b2e31e9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
@@ -0,0 +1,82 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.common.util.PathScanner;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SetupException;
+import org.apache.drill.exec.server.DrillbitContext;
+
+/** Finds StorageEngine implementations by package scan and builds one engine instance per StorageEngineConfig. */
+public class StorageEngineRegistry {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StorageEngineRegistry.class);
+  
+  private Map<Object, Constructor<? extends StorageEngine>> availableEngines = new HashMap<Object, Constructor<? extends StorageEngine>>();
+  private Map<StorageEngineConfig, StorageEngine> activeEngines = new HashMap<StorageEngineConfig, StorageEngine>();
+
+  private DrillbitContext context;
+  public StorageEngineRegistry(DrillbitContext context){
+    this.context = context;
+    setup(context.getConfig());
+  }
+  
+  /** Indexes, by config class, each scanned engine constructor shaped (StorageEngineConfig subtype, DrillbitContext), as getEngine invokes it. */
+  @SuppressWarnings("unchecked")
+  public void setup(DrillConfig config){
+    Collection<Class<? extends StorageEngine>> engines = PathScanner.scanForImplementations(StorageEngine.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
+    logger.debug("Loading storage engines {}", engines);
+    for(Class<? extends StorageEngine> engine: engines){
+      int i =0;
+      for(Constructor<?> c : engine.getConstructors()){
+        Class<?>[] params = c.getParameterTypes();
+        if(params.length != 2 || params[1] != DrillbitContext.class || !StorageEngineConfig.class.isAssignableFrom(params[0])){
+          logger.debug("Skipping ReferenceStorageEngine constructor {} for engine class {} since it doesn't implement a [constructor(StorageEngineConfig, DrillbitContext)]", c, engine);
+          continue;
+        }
+        availableEngines.put(params[0], (Constructor<? extends StorageEngine>) c);
+        i++;
+      }
+      if(i == 0) logger.debug("Skipping registration of ReferenceStorageEngine {} as it doesn't have a constructor with the parameters of (StorageEngineConfig, DrillbitContext)", engine.getCanonicalName());
+    }
+  }
+  
+  /** Returns the engine for engineConfig, creating and caching it on first request; throws SetupException if none can be built. */
+  public StorageEngine getEngine(StorageEngineConfig engineConfig) throws SetupException{
+    StorageEngine engine = activeEngines.get(engineConfig);
+    if(engine != null) return engine;
+    Constructor<? extends StorageEngine> c = availableEngines.get(engineConfig.getClass());
+    if(c == null) throw new SetupException(String.format("Failure finding StorageEngine constructor for config %s", engineConfig));
+    try {
+      engine = c.newInstance(engineConfig, context);
+      activeEngines.put(engineConfig, engine); // remember the instance so the lookup above can return it next time
+      return engine;
+    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException)e).getTargetException() : e;
+      if(t instanceof SetupException) throw ((SetupException) t);
+      throw new SetupException(String.format("Failure setting up new storage engine configuration for config %s", engineConfig), t);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/protobuf/Coordination.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/Coordination.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/Coordination.proto
new file mode 100644
index 0000000..43c408d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/Coordination.proto
@@ -0,0 +1,32 @@
+package exec;
+
+option java_package = "org.apache.drill.exec.proto";
+option java_outer_classname = "CoordinationProtos";
+option optimize_for = LITE_RUNTIME;
+
+message DrillbitEndpoint{
+  optional string address = 1;
+  optional int32 user_port = 2;
+  optional int32 bit_port = 3;
+  optional Roles roles = 4;
+}
+
+message DrillServiceInstance{
+  optional string id = 1;
+  optional int64 registrationTimeUTC = 2;
+  optional DrillbitEndpoint endpoint = 3;
+}
+
+message WorkQueueStatus{
+	optional DrillbitEndpoint endpoint = 1;
+	optional int32 queue_length = 2;
+	optional int64 report_time = 3;
+}
+
+message Roles{
+	optional bool sql_query = 1 [default = true];
+	optional bool logical_plan = 2 [default = true];
+	optional bool physical_plan = 3 [default = true];
+	optional bool java_executor = 4 [default = true];
+	optional bool distributed_cache = 5 [default = true];
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/protobuf/ExecutionProtos.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/ExecutionProtos.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/ExecutionProtos.proto
new file mode 100644
index 0000000..cd8bda2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/ExecutionProtos.proto
@@ -0,0 +1,65 @@
+package exec.bit;
+
+option java_package = "org.apache.drill.exec.proto";
+option java_outer_classname = "ExecProtos";
+option optimize_for = LITE_RUNTIME;
+import "SchemaDef.proto";
+import "Coordination.proto";
+
+
+////// Bit RPC ///////
+enum RpcType {
+    HANDSHAKE = 0;
+    ACK = 1;
+    GOODBYE = 2;
+    
+    // bit requests
+    REQ_INIATILIZE_FRAGMENT = 3; // Returns Handle
+    REQ_RECORD_BATCH = 4; // send record batch overview, returns Ack
+    REQ_BATCH_CHUNK = 5; // send additional batch chunk, returns Ack.
+    REQ_CANCEL_FRAGMENT = 6; // send a cancellation message for a fragment, returns Ack
+	REQ_FRAGMENT_STATUS = 7; // get a fragment status, returns FragmentStatus
+	REQ_BIT_STATUS = 8; // get bit status.
+	    
+    // bit responses
+    RESP_FRAGMENT_HANDLE = 9;
+    RESP_FRAGMENT_STATUS = 10;
+	RESP_BIT_STATUS = 11;
+	RESP_BATCH_CHUNK = 12;
+}
+
+
+message BitColumnData {
+
+    enum ColumnEncoding {
+      PROTOBUF = 0;
+    }
+    
+	message BitColumn {
+		optional int32 field = 1;
+		optional int32 length = 2;
+		optional ColumnEncoding mode = 3;
+	}	
+	
+	optional SchemaDef schema = 1;
+	optional int32 record_count = 2;
+	optional int32 total_size = 3;
+	repeated BitColumn column = 4;
+	
+}
+
+
+message BitHandshake{
+	optional DrillbitEndpoint endpoint = 1;
+}
+
+message BitBatchChunk {}
+message BitStatus {}
+message FragmentStatus {}
+message RecordBatchHeader {}
+message PlanFragment {}
+
+message FragmentHandle {
+	optional int64 fragment_id = 1;
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/protobuf/GeneralRPC.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/GeneralRPC.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/GeneralRPC.proto
new file mode 100644
index 0000000..ebc7dca
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/GeneralRPC.proto
@@ -0,0 +1,35 @@
+package exec.rpc;
+
+option java_package = "org.apache.drill.exec.proto";
+option java_outer_classname = "GeneralRPCProtos";
+option optimize_for = LITE_RUNTIME;
+
+message Ack{
+	optional bool ok = 1;
+}
+
+enum RpcMode {
+  REQUEST = 0;
+  RESPONSE = 1;
+  RESPONSE_FAILURE = 2;
+}
+
+message RpcHeader{
+	optional RpcMode mode = 1; 
+	optional int32 coordination_id = 2; // reusable coordination identifier.  Sender defines.  Server returns on return.  Irrelevant for purely single direction rpc.
+	optional int32 rpc_type = 3; // a rpc mode specific rpc type.
+}
+
+message CompleteRpcMessage {
+    optional RpcHeader header = 1; // required
+    optional bytes protobuf_body = 2; // required
+    optional bytes raw_body = 3; // optional
+}
+
+// Class to be used when an unexpected exception occurs while a rpc call is being evaluated.
+message RpcFailure {
+  optional int64 error_id = 1; // for server trackback.
+  optional int32 error_code = 2; // system defined error code.
+  optional string short_error = 3;
+  optional string long_error = 4;
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto
new file mode 100644
index 0000000..44e2df9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto
@@ -0,0 +1,37 @@
+package exec;
+
+option java_package = "org.apache.drill.exec.proto";
+option java_outer_classname = "SchemaDefProtos";
+option optimize_for = LITE_RUNTIME;
+
+
+// Schema Definitions //
+enum DataType {
+  LATE = 0;
+  INT32 = 1;
+  INT64 = 2;
+  FLOAT32 = 3;
+  FLOAT64 = 4;
+  UTF8 = 5;
+  BYTES = 6;
+}
+
+enum DataMode {
+  REQUIRED = 0;
+  OPTIONAL = 1;
+  REPEATED = 2;
+  MAP = 3; 
+}
+
+message SchemaDef {
+  repeated FieldDef field = 1;
+}
+
+message FieldDef {
+  optional string name = 1;
+  optional DataMode mode = 2;
+  
+  // If DataMode == 0-2, type should be populated and fields should be empty.  Otherwise, type should be empty and fields should be defined.
+  optional DataType type = 3;
+  repeated FieldDef fields = 4;
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/protobuf/User.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/User.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/User.proto
new file mode 100644
index 0000000..225d1a0
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/User.proto
@@ -0,0 +1,93 @@
+package exec.user;
+
+option java_package = "org.apache.drill.exec.proto";
+option java_outer_classname = "UserProtos";
+option optimize_for = LITE_RUNTIME;
+import "SchemaDef.proto";
+
+////// UserToBit RPC ///////
+enum RpcType {
+    HANDSHAKE = 0;
+    ACK = 1;
+    GOODBYE = 2;
+    
+    // user to bit
+    RUN_QUERY = 3;
+    REQUEST_RESULTS = 4;
+    
+    // bit to user
+	QUERY_RESULT = 6;
+	QUERY_HANDLE = 7;
+}
+
+message UserToBitHandshake {
+    optional bool support_listening = 1;
+    optional int32 rpc_version = 2;
+}
+
+message RequestResults {
+  optional int64 query_id = 1;
+  optional int32 maximum_responses = 2;
+}
+
+message RunQuery {
+  optional QueryResultsMode mode = 1;
+  optional string plan = 2;
+}
+
+enum QueryResultsMode {
+	STREAM_FULL = 1; // Server will inform the client regularly on the status of the query. Once the query is completed, service will inform the client as each query chunk is made available.
+	STREAM_FIRST = 2; // Server will inform the client regularly on the status of the query.  Once the query is completed, server will inform the client of the first query chunk.
+	QUERY_FOR_STATUS = 3; // Client will need to query for status of query.
+}
+
+
+message BitToUserHandshake {
+	optional int32 rpc_version = 1;
+}
+
+message QueryHandle {
+  	optional int64 query_id = 1;
+}
+
+message NodeStatus {
+	optional int32 node_id = 1;
+	optional int64 memory_footprint = 2;
+}
+
+message QueryResult {
+	enum Outcome {
+	  RUNNING = 0;
+	  FAILED = 1;
+	  COMPLETED = 2;
+	  WAITING = 3;
+	}
+	
+	optional Outcome outcome = 1;
+	optional SchemaDef schema = 2;
+	optional bool is_last_chunk = 3;
+	optional int32 row_count = 4;
+	optional int64 records_scan = 5;
+	optional int64 records_error = 6;
+	optional int64 submission_time = 7;
+	repeated NodeStatus node_status = 8;	
+	repeated Error error = 9;
+}
+
+message TextErrorLocation{
+    optional int32 start_column = 2;
+    optional int32 start_row = 3;
+    optional int32 end_column = 4;
+    optional int32 end_row = 5;
+}
+
+message Error{
+    optional int64 error_id = 1; // for debug tracing purposes
+    optional string host = 2;
+    optional int32 error_type = 3; 
+    optional string message = 4;
+    optional TextErrorLocation error = 5; //optional, used when providing location of error within a piece of text.
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..ad18d6e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
@@ -0,0 +1,28 @@
+//  This file tells Drill to consider this module when class path scanning.  
+//  This file can also include any supplementary configuration information.  
+//  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.exec: {
+  cluster-id: "drillbits1"
+  rpc: {
+  	user.port : 31010,
+  	bit.port : 31011
+  },
+  optimizer: {
+    implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
+  },
+  
+  zk: {
+	connect: "localhost:2181",
+	root: "/drill",
+	refresh: 500,
+	timeout: 1000,
+	retry: {
+	  count: 7200,
+	  delay: 500
+	}    
+  }
+
+  network: {
+    start: 35000
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/test/java/BBOutputStream.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/BBOutputStream.java b/sandbox/prototype/exec/java-exec/src/test/java/BBOutputStream.java
new file mode 100644
index 0000000..7f03dfa
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/BBOutputStream.java
@@ -0,0 +1,38 @@
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+public class BBOutputStream extends OutputStream{ // OutputStream adapter that appends each written byte to a ByteBuffer
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BBOutputStream.class);
+  /** Destination buffer supplied by the caller; this class only ever calls put() on it. */
+  private ByteBuffer buf;
+  
+  public BBOutputStream(ByteBuffer buf) {
+    this.buf = buf;
+  }
+  /** Stores the low-order byte of b at the buffer's position; a full buffer raises the unchecked BufferOverflowException. */
+  @Override
+  public void write(int b) throws IOException {
+    buf.put((byte) b);
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/test/java/CompressingBytesColumn.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/CompressingBytesColumn.java b/sandbox/prototype/exec/java-exec/src/test/java/CompressingBytesColumn.java
new file mode 100644
index 0000000..4fb67ed
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/CompressingBytesColumn.java
@@ -0,0 +1,46 @@
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+public class CompressingBytesColumn {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompressingBytesColumn.class);
+  // Number of bytes in one mebibyte; used only to size the buffers below.
+  int mb = 1024*1024;
+  // Every instance allocates three direct buffers up front: 20 MiB, 1 MiB and 1 MiB.
+  ByteBuffer values = ByteBuffer.allocateDirect(20*mb);
+  ByteBuffer fromCompressBuffer = ByteBuffer.allocateDirect(mb);
+  ByteBuffer toCompressBuffer = ByteBuffer.allocateDirect(mb);
+
+  
+  public CompressingBytesColumn(){
+  }
+  
+  public void add(byte[] bytes, int start, int length){
+    // Not implemented: the body is empty, so the arguments are ignored.
+  }
+  
+  public void add(ByteBuffer buffer){
+    // Not implemented: the body is empty, so the argument is ignored.
+  }
+  public void write(OutputStream stream){
+    // Not implemented: nothing is written to the stream.
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/test/java/ExternalSort.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/ExternalSort.java b/sandbox/prototype/exec/java-exec/src/test/java/ExternalSort.java
new file mode 100644
index 0000000..c6233ae
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/ExternalSort.java
@@ -0,0 +1,21 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+public class ExternalSort { // placeholder: declares only a logger, no sort logic
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSort.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/test/java/GenerateExternalSortData.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/GenerateExternalSortData.java b/sandbox/prototype/exec/java-exec/src/test/java/GenerateExternalSortData.java
new file mode 100644
index 0000000..dca7d27
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/GenerateExternalSortData.java
@@ -0,0 +1,124 @@
+import java.io.BufferedReader;
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+
+import org.apache.hadoop.conf.Configuration;
+import org.xerial.snappy.Snappy;
+
+import com.google.common.base.Charsets;
+import com.google.protobuf.CodedOutputStream;
+
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+public class GenerateExternalSortData {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(GenerateExternalSortData.class);
+  
+  /** Reads a text file line by line, Snappy-compresses each line's value and prints size/timing totals.
+   * The first 10 bytes of every line are its key, so each line must be at least 10 bytes long.
+   * @param args ignored
+   * @throws Exception if the input file cannot be read or compression fails
+   */
+  public static void main(String[] args) throws Exception{
+    int mb = 1024*1024;
+    final int blockSize = 1024;
+    ByteBuffer keys = ByteBuffer.allocateDirect(2*mb);
+    ByteBuffer values = ByteBuffer.allocateDirect(20*mb);
+    ByteBuffer fromCompressBuffer = ByteBuffer.allocateDirect(mb);
+    ByteBuffer toCompressBuffer = ByteBuffer.allocateDirect(mb);
+    
+    ByteBuffer valueLengthB = ByteBuffer.allocateDirect(blockSize*4);
+    IntBuffer valueLengths = valueLengthB.asIntBuffer();
+    //Opaque value stored as len,data.
+    
+    //
+    //Snappy.compress(uncompressed, compressed);
+    String file = "/opt/data/tera1gb/part-00000";
+    Configuration config = new Configuration();
+    //SequenceFile.Reader sf = new SequenceFile.Reader(FileSystem.getLocal(config), new Path(file), config);
+    
+    BufferedReader reader = Files.newBufferedReader(FileSystems.getDefault().getPath(file), Charsets.UTF_8);
+    
+    CodedOutputStream cos = CodedOutputStream.newInstance(new BBOutputStream(values));
+    
+    long originalBytes = 0;
+    long compressedBytes = 0;
+    String l;
+    int round = 0;
+    long nanos = 0;
+    long x1 = System.nanoTime();
+    while((l = reader.readLine()) != null){
+      // Encode with the charset the reader decoded with, so byte offsets do not depend on the platform default.
+      byte[] bytes = l.getBytes(Charsets.UTF_8);
+      keys.put(bytes, 0, 10);
+      int len = bytes.length - 10;
+      originalBytes += len;
+      // Snappy.compress leaves the output buffer as [position, limit) == compressed bytes, so it must NOT be
+      // flipped afterwards: flip() would set limit to position (0) and the put() below would copy nothing.
+      // Compress the value.
+      long n1 = System.nanoTime();
+      fromCompressBuffer.put(bytes, 10, len);
+      fromCompressBuffer.flip();
+      int newLen = Snappy.compress(fromCompressBuffer, toCompressBuffer);
+      cos.writeRawVarint32(newLen);
+      cos.flush(); // push the buffered length prefix into 'values' before the payload follows it
+      values.put(toCompressBuffer);
+      fromCompressBuffer.clear();
+      toCompressBuffer.clear();
+      nanos += (System.nanoTime() - n1);
+
+      compressedBytes += newLen;
+      //valueLengths.put(newLen);
+      
+      round++;
+      
+      if(round >= blockSize){
+        // flush (not implemented yet: the block is discarded rather than written anywhere)
+        keys.clear();
+        values.clear();
+        round = 0;
+        
+      }
+      
+      
+    }
+    reader.close(); // release the input file once every line has been consumed
+    System.out.println("Uncompressed: " + originalBytes);
+    System.out.println("Compressed: " + compressedBytes);
+    System.out.println("CompressionTime: " + nanos/1000/1000);
+    System.out.println("Total Time: " + (System.nanoTime() - x1)/1000/1000);
+    
+  }
+  /** Flips b, then rewrites every element in [0, limit) as its difference from the smallest element. */
+  private static void convertToDeltas(IntBuffer b){
+    b.flip();
+    int min = Integer.MAX_VALUE;
+    for(int i =0; i < b.limit(); i++){
+      min = Math.min(b.get(i), min);
+    }
+    
+    for(int i =0; i < b.limit(); i++){
+      int cur = b.get(i);
+      b.put(i, cur - min);
+    }
+    
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/column/SimpleExec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/column/SimpleExec.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/column/SimpleExec.java
new file mode 100644
index 0000000..e3747e1
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/column/SimpleExec.java
@@ -0,0 +1,30 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.column;
+
+import org.junit.Test;
+
+public class SimpleExec {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleExec.class);
+  
+  @Test
+  public void columnarAnd() throws Exception{
+    // Empty body: nothing is executed or asserted, so this test always passes.
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestOpenBitSet.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestOpenBitSet.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestOpenBitSet.java
new file mode 100644
index 0000000..66f69de
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestOpenBitSet.java
@@ -0,0 +1,361 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+import java.util.BitSet;
+import java.util.Random;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore
+public class TestOpenBitSet {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestOpenBitSet.class);
+
+  Random random = new Random();
+  ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT;
+  
+  public int atLeast(int val){ // returns val plus a random offset in [0, val)
+    return val + random.nextInt(val); // Random.nextInt rejects a bound <= 0
+  }
+  
+  
+  public Random random() { // source of randomness for the helpers in this class
+    return random; // reuse the shared generator instead of allocating a new Random on every call
+  }
+
+  void doGet(BitSet a, BufBitSet b) { // fails on the first index below a.size() where b disagrees with a
+    int max = a.size();
+    for (int i = 0; i < max; i++) {
+      if (a.get(i) != b.get(i)) { // int-index overload
+        fail("mismatch: BitSet=[" + i + "]=" + a.get(i));
+      }
+      if (a.get(i) != b.get((long) i)) { // long-index overload
+        fail("mismatch: BitSet=[" + i + "]=" + a.get(i));
+      }
+    }
+  }
+
+  void doGetFast(BitSet a, BufBitSet b, int max) { // compares a.get(i) with b.fastGet(i) for every i in [0, max)
+    for (int i = 0; i < max; i++) {
+      if (a.get(i) != b.fastGet(i)) { // int-index overload
+        fail("mismatch: BitSet=[" + i + "]=" + a.get(i));
+      }
+      if (a.get(i) != b.fastGet((long) i)) { // long-index overload
+        fail("mismatch: BitSet=[" + i + "]=" + a.get(i));
+      }
+    }
+  }
+
+  void doNextSetBit(BitSet a, BufBitSet b) { // walks both sets with nextSetBit and asserts each successive result matches
+    int aa = -1, bb = -1;
+    do {
+      aa = a.nextSetBit(aa + 1);
+      bb = b.nextSetBit(bb + 1);
+      assertEquals(aa, bb);
+    } while (aa >= 0); // BitSet.nextSetBit returns -1 once no set bit remains
+  }
+
+  void doNextSetBitLong(BitSet a, BufBitSet b) { // same walk as the int variant, but through b's long-index nextSetBit overload
+    int aa = -1, bb = -1;
+    do {
+      aa = a.nextSetBit(aa + 1);
+      bb = (int) b.nextSetBit((long) (bb + 1)); // result narrowed to int for comparison with aa
+      assertEquals(aa, bb);
+    } while (aa >= 0);
+  }
+
+  void doPrevSetBit(BitSet a, BufBitSet b) { // walks downward, comparing b.prevSetBit with a manual backward scan of a
+    int aa = a.size() + random().nextInt(100); // start at or beyond a.size()
+    int bb = aa;
+    do {
+      // aa = a.prevSetBit(aa-1);
+      aa--;
+      while ((aa >= 0) && (!a.get(aa))) { // hand-rolled scan for the previous set bit of a
+        aa--;
+      }
+      bb = b.prevSetBit(bb - 1);
+      assertEquals(aa, bb);
+    } while (aa >= 0);
+  }
+
+  void doPrevSetBitLong(BitSet a, BufBitSet b) { // backward walk through b's long-index prevSetBit overload
+    int aa = a.size() + random().nextInt(100); // start at or beyond a.size()
+    int bb = aa;
+    do {
+      // aa = a.prevSetBit(aa-1);
+      aa--;
+      while ((aa >= 0) && (!a.get(aa))) { // hand-rolled scan for the previous set bit of a
+        aa--;
+      }
+      bb = (int) b.prevSetBit((long) (bb - 1)); // result narrowed to int for comparison with aa
+      assertEquals(aa, bb);
+    } while (aa >= 0);
+  }
+
+  // test interleaving different OpenBitSetIterator.next()/skipTo()
+  void doIterate(BitSet a, BufBitSet b, int mode) { // currently a no-op: both iteration checks are commented out
+    // if (mode == 1) doIterate1(a, b);
+    // if (mode == 2) doIterate2(a, b);
+  }
+
+  //
+  // void doIterate1(BitSet a, OpenBitSet b) {
+  // int aa = -1, bb = -1;
+  // OpenBitSetIterator iterator = new OpenBitSetIterator(b);
+  // do {
+  // aa = a.nextSetBit(aa + 1);
+  // bb = random().nextBoolean() ? iterator.nextDoc() : iterator.advance(bb + 1);
+  // assertEquals(aa == -1 ? DocIdSetIterator.NO_MORE_DOCS : aa, bb);
+  // } while (aa >= 0);
+  // }
+  //
+  // void doIterate2(BitSet a, OpenBitSet b) {
+  // int aa = -1, bb = -1;
+  // OpenBitSetIterator iterator = new OpenBitSetIterator(b);
+  // do {
+  // aa = a.nextSetBit(aa + 1);
+  // bb = random().nextBoolean() ? iterator.nextDoc() : iterator.advance(bb + 1);
+  // assertEquals(aa == -1 ? DocIdSetIterator.NO_MORE_DOCS : aa, bb);
+  // } while (aa >= 0);
+  // }
+
+  void doRandomSets(int maxSize, int iter, int mode) { // checks BufBitSet against java.util.BitSet over iter random sets of size below maxSize
+    BitSet a0 = null; // sets from the previous iteration, used for the pairwise checks below
+    BufBitSet b0 = null;
+
+    for (int i = 0; i < iter; i++) {
+      int sz = random().nextInt(maxSize);
+      BitSet a = new BitSet(sz);
+      BufBitSet b = new BufBitSet(sz, allocator);
+
+      // test the various ways of setting bits
+      if (sz > 0) {
+        int nOper = random().nextInt(sz);
+        for (int j = 0; j < nOper; j++) {
+          int idx;
+
+          idx = random().nextInt(sz);
+          a.set(idx);
+          b.fastSet(idx);
+
+          idx = random().nextInt(sz);
+          a.set(idx);
+          b.fastSet((long) idx);
+
+          idx = random().nextInt(sz);
+          a.clear(idx);
+          b.fastClear(idx);
+
+          idx = random().nextInt(sz);
+          a.clear(idx);
+          b.fastClear((long) idx);
+
+          idx = random().nextInt(sz);
+          a.flip(idx);
+          b.fastFlip(idx);
+
+          boolean val = b.flipAndGet(idx); // flipping twice leaves b unchanged but must report opposite values
+          boolean val2 = b.flipAndGet(idx);
+          assertTrue(val != val2);
+
+          idx = random().nextInt(sz);
+          a.flip(idx);
+          b.fastFlip((long) idx);
+
+          val = b.flipAndGet((long) idx);
+          val2 = b.flipAndGet((long) idx);
+          assertTrue(val != val2);
+
+          val = b.getAndSet(idx);
+          assertTrue(val2 == val);
+          assertTrue(b.get(idx));
+
+          if (!val) b.fastClear(idx); // undo the set performed by getAndSet when the bit was previously clear
+          assertTrue(b.get(idx) == val);
+        }
+      }
+
+      // test that the various ways of accessing the bits are equivalent
+      doGet(a, b);
+      doGetFast(a, b, sz);
+
+      // test ranges, including possible extension
+      int fromIndex, toIndex;
+      fromIndex = random().nextInt(sz + 80);
+      toIndex = fromIndex + random().nextInt((sz >> 1) + 1);
+      BitSet aa = (BitSet) a.clone();
+      aa.flip(fromIndex, toIndex);
+      BufBitSet bb = b.cloneTest();
+      bb.flip(fromIndex, toIndex);
+
+      doIterate(aa, bb, mode); // a problem here is from flip or doIterate
+
+      fromIndex = random().nextInt(sz + 80);
+      toIndex = fromIndex + random().nextInt((sz >> 1) + 1);
+      aa = (BitSet) a.clone();
+      aa.clear(fromIndex, toIndex);
+      bb = b.cloneTest();
+      bb.clear(fromIndex, toIndex);
+
+      doNextSetBit(aa, bb); // a problem here is from clear() or nextSetBit
+      doNextSetBitLong(aa, bb);
+
+      doPrevSetBit(aa, bb);
+      doPrevSetBitLong(aa, bb);
+
+      fromIndex = random().nextInt(sz + 80);
+      toIndex = fromIndex + random().nextInt((sz >> 1) + 1);
+      aa = (BitSet) a.clone();
+      aa.set(fromIndex, toIndex);
+      bb = b.cloneTest();
+      bb.set(fromIndex, toIndex);
+
+      doNextSetBit(aa, bb); // a problem here is from set() or nextSetBit
+      doNextSetBitLong(aa, bb);
+
+      doPrevSetBit(aa, bb);
+      doPrevSetBitLong(aa, bb);
+
+      if (a0 != null) { // skipped on the first iteration, when there is no previous pair
+        assertEquals(a.equals(a0), b.equals(b0));
+
+        assertEquals(a.cardinality(), b.cardinality());
+
+        BitSet a_and = (BitSet) a.clone();
+        a_and.and(a0);
+        BitSet a_or = (BitSet) a.clone();
+        a_or.or(a0);
+        BitSet a_xor = (BitSet) a.clone();
+        a_xor.xor(a0);
+        BitSet a_andn = (BitSet) a.clone();
+        a_andn.andNot(a0);
+
+        BufBitSet b_and = b.cloneTest();
+        assertEquals(b, b_and);
+        b_and.and(b0);
+        BufBitSet b_or = b.cloneTest();
+        b_or.or(b0);
+        BufBitSet b_xor = b.cloneTest();
+        b_xor.xor(b0);
+        BufBitSet b_andn = b.cloneTest();
+        b_andn.andNot(b0);
+
+        doIterate(a_and, b_and, mode);
+        doIterate(a_or, b_or, mode);
+        doIterate(a_xor, b_xor, mode);
+        doIterate(a_andn, b_andn, mode);
+
+        assertEquals(a_and.cardinality(), b_and.cardinality());
+        assertEquals(a_or.cardinality(), b_or.cardinality());
+        assertEquals(a_xor.cardinality(), b_xor.cardinality());
+        assertEquals(a_andn.cardinality(), b_andn.cardinality());
+
+        // test non-mutating popcounts
+        assertEquals(b_and.cardinality(), BufBitSet.intersectionCount(b, b0));
+        assertEquals(b_or.cardinality(), BufBitSet.unionCount(b, b0));
+        assertEquals(b_xor.cardinality(), BufBitSet.xorCount(b, b0));
+        assertEquals(b_andn.cardinality(), BufBitSet.andNotCount(b, b0));
+      }
+
+      a0 = a; // remember this pair for the next iteration's pairwise checks
+      b0 = b;
+    }
+  }
+
+  // large enough to flush obvious bugs, small enough to run in <.5 sec as part of a
+  // larger testsuite.
+  @Test
+  public void testSmall() { // runs the randomized comparison twice with randomized size and iteration bounds
+    doRandomSets(atLeast(1200), atLeast(1000), 1); // the last argument is only forwarded to doIterate
+    doRandomSets(atLeast(1200), atLeast(1000), 2);
+  }
+
+  // uncomment to run a bigger test (~2 minutes).
+  /*
+   * public void testBig() { doRandomSets(2000,200000, 1); doRandomSets(2000,200000, 2); }
+   */
+
+  @Test
+  public void testEquals() { // equality is expected to depend on the set bits only, not on the constructed size
+    BufBitSet b1 = new BufBitSet(1111, allocator);
+    BufBitSet b2 = new BufBitSet(2222, allocator); // deliberately a different constructed size from b1
+    assertTrue(b1.equals(b2)); // both empty
+    assertTrue(b2.equals(b1));
+    b1.set(10);
+    assertFalse(b1.equals(b2));
+    assertFalse(b2.equals(b1));
+    b2.set(10);
+    assertTrue(b1.equals(b2));
+    assertTrue(b2.equals(b1));
+    b2.set(2221); // index beyond b1's constructed size of 1111
+    assertFalse(b1.equals(b2));
+    assertFalse(b2.equals(b1));
+    b1.set(2221);
+    assertTrue(b1.equals(b2));
+    assertTrue(b2.equals(b1));
+
+    // try different type of object
+    assertFalse(b1.equals(new Object()));
+  }
+
+  @Test
+  public void testHashCodeEquals() { // equal sets of different constructed sizes must also share a hash code
+    BufBitSet bs1 = new BufBitSet(200, allocator);
+    BufBitSet bs2 = new BufBitSet(64, allocator);
+    bs1.set(3);
+    bs2.set(3);
+    assertEquals(bs1, bs2);
+    assertEquals(bs1.hashCode(), bs2.hashCode());
+  }
+
+  private BufBitSet makeOpenBitSet(int[] a) { // builds a BufBitSet with every index listed in a set
+    BufBitSet bs = new BufBitSet(64, allocator); // constructed with size 64
+    for (int e : a) {
+      bs.set(e);
+    }
+    return bs;
+  }
+
+  private BitSet makeBitSet(int[] a) { // builds a java.util.BitSet with every index listed in a set
+    BitSet bs = new BitSet();
+    for (int e : a) {
+      bs.set(e);
+    }
+    return bs;
+  }
+
+  private void checkPrevSetBitArray(int[] a) { // builds both set types from the same indices and compares their backward walks
+    BufBitSet obs = makeOpenBitSet(a);
+    BitSet bs = makeBitSet(a);
+    doPrevSetBit(bs, obs);
+  }
+
+  @Test public void testPrevSetBit() { // annotated so JUnit 4 actually runs it; exercises empty, single-bit and two-bit sets
+    checkPrevSetBitArray(new int[] {});
+    checkPrevSetBitArray(new int[] { 0 });
+    checkPrevSetBitArray(new int[] { 0, 2 });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/UserRpcTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/UserRpcTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/UserRpcTest.java
new file mode 100644
index 0000000..c8ce877
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/UserRpcTest.java
@@ -0,0 +1,107 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.user;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.exec.proto.UserProtos.QueryHandle;
+import org.apache.drill.exec.proto.UserProtos.QueryResultsMode;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.junit.Test;
+
+public class UserRpcTest {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserRpcTest.class);
+  /**
+   * Starts a UserServer and a UserClient in-process on port 31515, submits batchCount batches of batchSize
+   * queries (each carrying a bufferSize-byte body), waits for every handle, and logs throughput figures.
+   */
+  @Test
+  public void doBasicRpcTest() throws Exception {
+    final int bufferSize = 25000;
+    final int batchSize = 1000;
+    final int batchCount = 100;
+
+    
+    int sends = 0;
+    int receives = 0;
+    long nanoSend = 0;
+    long nanoReceive = 0;
+
+    
+    try {
+      ByteBufAllocator bb = new PooledByteBufAllocator(true);
+//      ByteBufAllocator bb = UnpooledByteBufAllocator.DEFAULT;
+      UserServer s = new UserServer(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Server-")), null);
+      s.bind(31515);
+
+      logger.debug("Starting user client.");
+      UserClient c = new UserClient(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
+
+      logger.debug("Connecting as client to server.");
+      c.connectAsClient("localhost", 31515);
+
+      
+      @SuppressWarnings("unchecked")
+      DrillRpcFuture<QueryHandle>[] handles = new DrillRpcFuture[batchSize];
+
+      for (int x = 0; x < batchCount; x++) {
+        long s1 = System.nanoTime();
+        for (int i = 0; i < batchSize; i++) {
+          sends++;
+          ByteBuf rawBody = bb.buffer(bufferSize);
+          rawBody.writerIndex(bufferSize);
+          if(rawBody.readableBytes() != bufferSize) throw new RuntimeException();
+          handles[i] = c.submitQuery(RunQuery.newBuilder().setMode(QueryResultsMode.QUERY_FOR_STATUS).build(), rawBody);
+        }
+        
+        long s2 = System.nanoTime();
+
+        for (int i = 0; i < batchSize; i++) {
+          handles[i].checkedGet(2, TimeUnit.SECONDS).getQueryId();
+          receives++;
+        }
+
+        long s3 = System.nanoTime();
+        nanoSend += (s2-s1);
+        nanoReceive += (s3-s2);
+        logger.debug("Submission time {}ms, return time {}ms", (s2 - s1) / 1000 / 1000, (s3 - s2) / 1000 / 1000);
+      }
+      // logger.debug("Submitting query.");
+      // DrillRpcFuture<QueryHandle> handleFuture =
+      // c.submitQuery(RunQuery.newBuilder().setMode(QueryResultsMode.QUERY_FOR_STATUS).build());
+      //
+      // logger.debug("Got query id handle of {}", handleFuture.get(2, TimeUnit.SECONDS).getQueryId());
+    } catch (Exception e) {
+      logger.error("Exception of type {} occurred while doing test.", e.getClass().getCanonicalName());
+      throw e;
+    } finally{
+      long mbsTransferred = (1L * bufferSize * sends)/1024/1024; // based on sends actually attempted, so a failed run does not report the full planned volume
+      double sSend = nanoSend*1.0d/1000/1000/1000;
+      double sReceive = nanoReceive*1.0d/1000/1000/1000;
+      logger.info(String.format("Completed %d sends and %d receives.  Total data transferred was %d.  Send bw: %f, Receive bw: %f.", sends, receives, mbsTransferred, mbsTransferred*1.0/sSend, mbsTransferred*1.0/sReceive));
+      logger.info("Completed {} sends and {} receives.", sends, receives);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/StartDrillbit.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/StartDrillbit.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/StartDrillbit.java
new file mode 100644
index 0000000..7b353df
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/StartDrillbit.java
@@ -0,0 +1,31 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.server;
+
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.junit.Test;
+
+public class StartDrillbit {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StartDrillbit.class);
+  
+  /** Invokes Drillbit.main with an empty argument array; the test itself makes no assertions. */
+  @Test public void startDrillbit() throws DrillbitStartupException, InterruptedException{
+    Drillbit.main(new String[0]);
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/test/resources/logback.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/logback.xml b/sandbox/prototype/exec/java-exec/src/test/resources/logback.xml
new file mode 100644
index 0000000..b79b811
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/logback.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<configuration>
+  <!-- Lilith (de.huxhorn.lilith) socket appender with RemoteHosts=localhost; presumably feeds a local Lilith viewer - TODO confirm. -->
+  <appender name="SOCKET" class="de.huxhorn.lilith.logback.appender.ClassicMultiplexSocketAppender">
+    <Compressing>true</Compressing> 
+    <ReconnectionDelay>10000</ReconnectionDelay>
+    <IncludeCallerData>true</IncludeCallerData>
+    <RemoteHosts>localhost</RemoteHosts>
+  </appender>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <!-- encoders are assigned the type
+         ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+<!-- 
+  <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+    <file>/logs/test-common.log</file>
+    <encoder>
+      <pattern>%date %level [%thread] %logger{10} [%file:%line] %msg%n</pattern>
+    </encoder>
+    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+	    <fileNamePattern>/logs/test-common.%d{yyyy-MM-dd}.log</fileNamePattern>
+	    <maxHistory>30</maxHistory>
+    </rollingPolicy>
+  </appender>
+  --> 
+  <logger name="org.apache.drill" additivity="false">
+    <level value="debug" />
+    <appender-ref ref="SOCKET" />
+    <appender-ref ref="STDOUT" />
+<!--     <appender-ref ref="FILE" /> -->
+  </logger>
+  <!-- org.apache.drill logs at debug and, with additivity="false", does not also pass events to root; all other loggers fall through to root at error. -->
+  <root>
+    <level value="error" />
+    <appender-ref ref="SOCKET" />
+    <appender-ref ref="STDOUT" />
+<!--     <appender-ref ref="FILE" /> -->
+  </root>
+
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/ref/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/pom.xml b/sandbox/prototype/exec/ref/pom.xml
index b253f6b..045a36b 100644
--- a/sandbox/prototype/exec/ref/pom.xml
+++ b/sandbox/prototype/exec/ref/pom.xml
@@ -8,9 +8,9 @@
 		<groupId>org.apache.drill.exec</groupId>
 		<version>1.0-SNAPSHOT</version>
 	</parent>
-	
+
 	<artifactId>ref</artifactId>
-	
+
 	<name>Logical Plan Execution Reference Implementation</name>
 
 	<dependencies>
@@ -36,8 +36,9 @@
 				</exclusion>
 			</exclusions>
 		</dependency>
-		
 
+
+		
 		<dependency>
 			<groupId>com.carrotsearch</groupId>
 			<artifactId>hppc</artifactId>