You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/04/14 04:35:10 UTC

[6/9] basic framework for physical plan. abstraction of graph classes.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ValueVector.java
new file mode 100644
index 0000000..97a9b3b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ValueVector.java
@@ -0,0 +1,81 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.Closeable;
+
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * A vector of values.  Acts a containing instance that may rotate its internal buffer depending on what it needs to hold.  Should be language agnostic so that it can be passed between Java and JNI without modification.
+ */
+public interface ValueVector<T extends ValueVector<T>> extends Closeable {
+
+  /**
+   * Copies the data from this vector into its pair.
+   * 
+   * @param vector
+   */
+  public abstract void cloneInto(T vector);
+
+  /**
+   * Allocate a new memory space for this vector.
+   * 
+   * @param valueCount
+   *          The number of possible values which should be contained in this vector.
+   */
+  public abstract void allocateNew(int valueCount);
+
+  /**
+   * Zero copy move of data from this vector to the target vector. Any future access to this vector without being
+   * populated by a new vector will cause problems.
+   * 
+   * @param vector
+   */
+  public abstract void transferTo(T vector);
+
+  /**
+   * Return the underlying buffer. Note that this doesn't impact the reference counts for this buffer so it only should be
+   * used for in context access. Also note that this buffer changes regularly thus external classes shouldn't hold a
+   * reference to it.
+   * 
+   * @return The underlying ByteBuf.
+   */
+  public abstract ByteBuf getBuffer();
+
+  /**
+   * Returns the number of value contained within this vector.
+   * @return Vector size
+   */
+  public abstract int size();
+
+  /**
+   * Release supporting resources.
+   */
+  public abstract void close();
+
+  /**
+   * Get information about how this field is materialized.
+   * 
+   * @return
+   */
+  public abstract MaterializedField getField();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VariableVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VariableVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VariableVector.java
new file mode 100644
index 0000000..bae45dc
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VariableVector.java
@@ -0,0 +1,78 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.BufferAllocator;
+import org.apache.drill.exec.record.DeadBuf;
+
+/** 
+ * A vector of variable length bytes.  Constructed as a vector of lengths or positions and a vector of values.  Random access is only possible if the variable vector stores positions as opposed to lengths.
+ */
+public abstract class VariableVector<T extends VariableVector<T, E>, E extends BaseValueVector<E>> extends BaseValueVector<T>{
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VariableVector.class);
+  
+  protected E lengthVector;
+  private ByteBuf values = DeadBuf.DEAD_BUFFER;
+  protected int expectedValueLength;
+  private final boolean hasPositions;
+  
+  public VariableVector(int fieldId, BufferAllocator allocator, boolean hasPositions) {
+    super(fieldId, allocator);
+    this.lengthVector = getNewLengthVector(fieldId, allocator);
+    this.hasPositions = hasPositions;
+  }
+  
+  protected abstract E getNewLengthVector(int fieldId, BufferAllocator allocator);
+  
+  @Override
+  protected int getAllocationSize(int valueCount) {
+    return lengthVector.getAllocationSize(valueCount) + (expectedValueLength * valueCount);
+  }
+  
+  @Override
+  protected void childResetAllocation(int valueCount, ByteBuf buf) {
+    int firstSize = lengthVector.getAllocationSize(valueCount);
+    lengthVector.resetAllocation(valueCount, buf.slice(0, firstSize));
+    values = buf.slice(firstSize, expectedValueLength * valueCount);
+  }
+
+  @Override
+  protected void childCloneMetadata(T other) {
+    lengthVector.cloneMetadata(other.lengthVector);
+    other.expectedValueLength = expectedValueLength;
+  }
+
+  @Override
+  protected void childClear() {
+    lengthVector.clear();
+    if(values != DeadBuf.DEAD_BUFFER){
+      values.release();
+      values = DeadBuf.DEAD_BUFFER;
+    }
+  }  
+  
+  public boolean hasPositions(){
+    return hasPositions;
+  }
+  
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
new file mode 100644
index 0000000..aa42fc1
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -0,0 +1,81 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public abstract class BasicClient<T extends Enum<T>> extends RpcBus<T> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
+
+  private Bootstrap b;
+  private volatile boolean connect = false;
+
+  public BasicClient(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+    b = new Bootstrap() //
+        .group(eventLoopGroup) //
+        .channel(NioSocketChannel.class) //
+        .option(ChannelOption.ALLOCATOR, alloc) //
+        .option(ChannelOption.SO_RCVBUF, 1 << 17) //
+        .option(ChannelOption.SO_SNDBUF, 1 << 17) //
+        .handler(new ChannelInitializer<SocketChannel>() {
+          
+          @Override
+          protected void initChannel(SocketChannel ch) throws Exception {
+            ch.closeFuture().addListener(getCloseHandler(ch));
+            
+            ch.pipeline().addLast( //
+                new ZeroCopyProtobufLengthDecoder(), //
+                new RpcDecoder(), //
+                new RpcEncoder(), //
+                new InboundHandler(ch), //
+                new RpcExceptionHandler() //
+                );
+            channel = ch;
+            connect = true;
+          }
+        }) //
+        
+        ;
+  }
+
+  @Override
+  public boolean isClient() {
+    return true;
+  }
+  
+  public ChannelFuture connectAsClient(String host, int port) throws InterruptedException {
+    ChannelFuture f = b.connect(host, port).sync();
+    connect = !connect;
+    return f;
+  }
+
+  public void close() {
+    logger.debug("Closing client");
+    b.shutdown();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
new file mode 100644
index 0000000..acf1822
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -0,0 +1,106 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInboundMessageHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.exception.DrillbitStartupException;
+
+/**
+ * A server is bound to a port and is responsible for responding to various type of requests. In some cases, the inbound
+ * requests will generate more than one outbound request.
+ */
+public abstract class BasicServer<T extends Enum<T>> extends RpcBus<T>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicServer.class);
+
+  private ServerBootstrap b;
+  private volatile boolean connect = false;
+
+  public BasicServer(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+
+    b = new ServerBootstrap() //
+        .channel(NioServerSocketChannel.class) //
+        .option(ChannelOption.SO_BACKLOG, 100) //
+        .option(ChannelOption.SO_RCVBUF, 1 << 17) //
+        .option(ChannelOption.SO_SNDBUF, 1 << 17) //
+        .group(eventLoopGroup) //
+        .childOption(ChannelOption.ALLOCATOR, alloc) //
+        .handler(new LoggingHandler(LogLevel.INFO)) //
+        .childHandler(new ChannelInitializer<SocketChannel>() {
+          @Override
+          protected void initChannel(SocketChannel ch) throws Exception {
+            
+            ch.closeFuture().addListener(getCloseHandler(ch));
+
+            ch.pipeline().addLast( //
+                new ZeroCopyProtobufLengthDecoder(), //
+                new RpcDecoder(), //
+                new RpcEncoder(), //
+                new InboundHandler(ch), //
+                new RpcExceptionHandler() //
+                );            
+            channel = ch;
+            connect = true;
+          }
+        });
+  }
+ 
+  @Override
+  public boolean isClient() {
+    return false;
+  }
+
+
+  public int bind(final int initialPort) throws InterruptedException, DrillbitStartupException{
+    boolean ok = false;
+    int port = initialPort;
+    for(; port < Character.MAX_VALUE; port++){
+      if(b.bind(port).sync().isSuccess()){
+        ok = true;
+        break;
+      }
+    }
+    if(!ok){
+      throw new DrillbitStartupException(String.format("Unable to find available port for Drillbit server starting at port %d.", initialPort));
+    }
+    
+    connect = !connect;
+    return port;    
+  }
+
+  @Override
+  public void close() throws IOException {
+    if(b != null) b.shutdown();
+  }
+  
+  
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelClosedException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelClosedException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelClosedException.java
new file mode 100644
index 0000000..e80292f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelClosedException.java
@@ -0,0 +1,39 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+public class ChannelClosedException extends RpcException{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ChannelClosedException.class);
+
+  public ChannelClosedException() {
+    super();
+  }
+
+  public ChannelClosedException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public ChannelClosedException(String message) {
+    super(message);
+  }
+
+  public ChannelClosedException(Throwable cause) {
+    super(cause);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
new file mode 100644
index 0000000..a924359
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
@@ -0,0 +1,87 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure;
+
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * Manages the creation of rpc futures for a particular socket.
+ */
+public class CoordinationQueue{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CoordinationQueue.class);
+  
+  private final PositiveAtomicInteger circularInt = new PositiveAtomicInteger();
+  private final Map<Integer, DrillRpcFuture<?>> map;
+  
+  
+  public CoordinationQueue(int segmentSize, int segmentCount){
+    map = new ConcurrentHashMap<Integer, DrillRpcFuture<?>>(segmentSize, 0.75f, segmentCount);
+  }
+  
+  void channelClosed(Exception ex){
+    for(DrillRpcFuture<?> f : map.values()){
+      f.setException(ex);
+    }
+  }
+  
+  public <V> DrillRpcFuture<V> getNewFuture(Class<V> clazz){
+    int i = circularInt.getNext();
+    DrillRpcFuture<V> future = DrillRpcFuture.getNewFuture(i, clazz);
+//    logger.debug("Writing to map coord {}, future {}", i, future);
+    Object old = map.put(i,  future);
+    if(old != null) throw new IllegalStateException("You attempted to reuse a coordination id when the previous coordination id has not been removed.  This is likely rpc future callback memory leak.");
+    return future;
+  }
+
+  private DrillRpcFuture<?> removeFromMap(int coordinationId){
+    DrillRpcFuture<?> rpc = map.remove(coordinationId);
+    if(rpc == null){
+      logger.error("Rpc is null.");
+      throw new IllegalStateException("Attempting to retrieve an rpc that wasn't first stored in the rpc coordination queue.  This would most likely happen if you're opposite endpoint sent the multiple messages on the same coordination id.");
+    }
+    return rpc;
+  }
+  
+  public <V> DrillRpcFuture<V> getFuture(int coordinationId, Class<V> clazz){
+//    logger.debug("Getting future for coordinationId {} and class {}", coordinationId, clazz);
+    DrillRpcFuture<?> rpc = removeFromMap(coordinationId);
+//    logger.debug("Got rpc from map {}", rpc);
+    
+    if(rpc.clazz != clazz){
+      logger.error("Rpc class is not expected class {}", rpc.clazz, clazz);
+      throw new IllegalStateException("You attempted to request a future for a coordination id that has a different value class than was used when you initially created the coordination id.  This shouldn't happen.");
+    }
+    
+    @SuppressWarnings("unchecked")
+    DrillRpcFuture<V> crpc = (DrillRpcFuture<V>) rpc; 
+    
+//    logger.debug("Returning casted future");
+    return crpc;
+  }
+  
+  public void updateFailedFuture(int coordinationId, RpcFailure failure){
+//    logger.debug("Updating failed future.");
+    DrillRpcFuture<?> rpc = removeFromMap(coordinationId);
+    rpc.setException(new RemoteRpcException(failure));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
new file mode 100644
index 0000000..5a2fd93
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
@@ -0,0 +1,92 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.util.concurrent.AbstractCheckedFuture;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class DrillRpcFuture<V> extends AbstractCheckedFuture<V, RpcException> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRpcFuture.class);
+
+  final int coordinationId;
+  final Class<V> clazz;
+
+  public DrillRpcFuture(ListenableFuture<V> delegate, int coordinationId, Class<V> clazz) {
+    super(delegate);
+    this.coordinationId = coordinationId;
+    this.clazz = clazz;
+  }
+
+  /**
+   * Drill doesn't currently support rpc cancellations since nearly all requests should be either instance of
+   * asynchronous. Business level cancellation is managed a separate call (e.g. canceling a query.). Calling this method
+   * will result in an UnsupportedOperationException.
+   */
+  @Override
+  public boolean cancel(boolean mayInterruptIfRunning) {
+    throw new UnsupportedOperationException(
+        "Drill doesn't currently support rpc cancellations. See javadocs for more detail.");
+  }
+
+  @Override
+  protected RpcException mapException(Exception ex) {
+    if (ex instanceof RpcException)  return (RpcException) ex;
+    
+    if (ex instanceof ExecutionException) {
+      Throwable e2 = ex.getCause();
+      
+      if (e2 instanceof RpcException) {
+        return (RpcException) e2;
+      }
+    }
+    return new RpcException(ex);
+
+  }
+
+  @SuppressWarnings("unchecked")
+  void setValue(Object value) {
+    assert clazz.isAssignableFrom(value.getClass());
+    ((InnerFuture<V>) super.delegate()).setValue((V) value);
+  }
+
+  boolean setException(Throwable t) {
+    return ((InnerFuture<V>) super.delegate()).setException(t);
+  }
+
+  public static class InnerFuture<T> extends AbstractFuture<T> {
+    // we rewrite these so that the parent can see them
+
+    void setValue(T value) {
+      super.set(value);
+    }
+
+    protected boolean setException(Throwable t) {
+      return super.setException(t);
+    }
+  }
+
+  public static <V> DrillRpcFuture<V> getNewFuture(int coordinationId, Class<V> clazz) {
+    InnerFuture<V> f = new InnerFuture<V>();
+    return new DrillRpcFuture<V>(f, coordinationId, clazz);
+  }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java
new file mode 100644
index 0000000..ab977db
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java
@@ -0,0 +1,50 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
+
+public class InboundRpcMessage extends RpcMessage{
+  public ByteBuf pBody;
+  
+  public InboundRpcMessage(RpcMode mode, int rpcType, int coordinationId, ByteBuf pBody, ByteBuf dBody) {
+    super(mode, rpcType, coordinationId, dBody);
+    this.pBody = pBody;
+  }
+  
+  public int getBodySize(){
+    int len = pBody.capacity();
+    if(dBody != null) len += dBody.capacity();
+    return len;
+  }
+  
+  void release(){
+    pBody.release();
+    super.release();
+  }
+
+  @Override
+  public String toString() {
+    return "InboundRpcMessage [pBody=" + pBody + ", mode=" + mode + ", rpcType=" + rpcType + ", coordinationId="
+        + coordinationId + ", dBody=" + dBody + "]";
+  }
+  
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java
new file mode 100644
index 0000000..0df7719
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java
@@ -0,0 +1,48 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class NamedThreadFactory implements ThreadFactory {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NamedThreadFactory.class);
+  private final AtomicInteger nextId = new AtomicInteger();
+  private final String prefix;
+
+  public NamedThreadFactory(String prefix) {
+    this.prefix = prefix;
+  }
+
+  @Override
+  public Thread newThread(Runnable r) {
+    Thread t = new Thread(r, prefix + nextId.incrementAndGet());
+    try {
+      if (t.isDaemon()) {
+        t.setDaemon(true);
+      }
+      if (t.getPriority() != Thread.MAX_PRIORITY) {
+        t.setPriority(Thread.MAX_PRIORITY);
+      }
+    } catch (Exception ignored) {
+      // Doesn't matter even if failed to set.
+    }
+    return t;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
new file mode 100644
index 0000000..bb7644e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutboundRpcMessage.java
@@ -0,0 +1,50 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
+
+import com.google.protobuf.MessageLite;
+
+class OutboundRpcMessage extends RpcMessage{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutboundRpcMessage.class);
+
+  final MessageLite pBody;
+  
+  public OutboundRpcMessage(RpcMode mode, Enum<?> rpcType, int coordinationId, MessageLite pBody, ByteBuf dBody) {
+    super(mode, rpcType.ordinal(), coordinationId, dBody);
+    this.pBody = pBody;
+  }
+  
+  public int getBodySize(){
+    int len = pBody.getSerializedSize();
+    len += RpcEncoder.getRawVarintSize(len);
+    if(dBody != null) len += dBody.capacity();
+    return len;
+  }
+
+  @Override
+  public String toString() {
+    return "OutboundRpcMessage [pBody=" + pBody + ", mode=" + mode + ", rpcType=" + rpcType + ", coordinationId="
+        + coordinationId + ", dBody=" + dBody + "]";
+  }
+
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java
new file mode 100644
index 0000000..7408516
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java
@@ -0,0 +1,39 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/*
+ * An atomic integer that only ever returns 0 > MAX_INT and then starts over.  Should never has a negative overflow.
+ */
+public class PositiveAtomicInteger {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PositiveAtomicInteger.class);
+  
+  private final AtomicInteger internal = new AtomicInteger(Integer.MIN_VALUE);
+  
+  public int getNext(){
+    int i = internal.addAndGet(1);
+    if(i < 0){
+      return i + (-Integer.MIN_VALUE);
+    }else{
+      return i;
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java
new file mode 100644
index 0000000..2a535a7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteRpcException.java
@@ -0,0 +1,38 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure;
+
+public class RemoteRpcException extends RpcException{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteRpcException.class);
+  
+  private final RpcFailure failure;
+
+  public RemoteRpcException(RpcFailure failure) {
+    super(String.format("Failure while executing rpc.  Remote failure message: [%s].  Error Code: [%d].  Remote Error Id: [%d]", failure.getShortError(), failure.getErrorId(), failure.getErrorCode()));
+    this.failure = failure;
+  }
+
+  public RpcFailure getFailure() {
+    return failure;
+  }
+  
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
new file mode 100644
index 0000000..8a2f48d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
@@ -0,0 +1,41 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+
+import com.google.protobuf.MessageLite;
+
+public class Response {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Response.class);
+  
+  public Enum<?> rpcType;
+  public MessageLite pBody;
+  public ByteBuf dBody;
+  
+  public Response(Enum<?> rpcType, MessageLite pBody, ByteBuf dBody) {
+    super();
+    this.rpcType = rpcType;
+    this.pBody = pBody;
+    this.dBody = dBody;
+  }
+  
+  
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
new file mode 100644
index 0000000..760bd30
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -0,0 +1,172 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundMessageHandlerAdapter;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.io.Closeable;
+import java.util.concurrent.CancellationException;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure;
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+/**
+ * The Rpc Bus deals with incoming and outgoing communication and is used on both the server and the client side of a system.
+ * @param <T>
+ */
+public abstract class RpcBus<T extends Enum<T>> implements Closeable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcBus.class);
+  
+  private CoordinationQueue queue = new CoordinationQueue(16, 16);
+  protected Channel channel;
+
+  protected abstract MessageLite getResponseDefaultInstance(int rpcType) throws RpcException;
+  protected abstract Response handle(SocketChannel channel, int RpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException;
+  public abstract boolean isClient(); 
+
+  
+  protected <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(T rpcType,
+      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf dataBody) throws RpcException {
+    ByteBuf pBuffer = null;
+    boolean completed = false;
+
+    try {
+//      logger.debug("Seding message");
+      Preconditions.checkNotNull(protobufBody);
+      DrillRpcFuture<RECEIVE> rpcFuture = queue.getNewFuture(clazz);
+      OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, rpcFuture.coordinationId, protobufBody, dataBody);
+      ChannelFuture channelFuture = channel.write(m);
+      channelFuture.addListener(new Listener(rpcFuture.coordinationId, clazz));
+      completed = true;
+      return rpcFuture;
+    } finally {
+      if (!completed) {
+        if (pBuffer != null) pBuffer.release();
+        if (dataBody != null) dataBody.release();
+      }
+      ;
+    }
+  }
+
+  
+  public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture>{
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      logger.info("Channel closed between local {} and remote {}", future.channel().localAddress(), future.channel().remoteAddress());
+      queue.channelClosed(new ChannelClosedException());
+    }
+  }
+  
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch){
+    return new ChannelClosedHandler();
+  }
+  
+  protected class InboundHandler extends ChannelInboundMessageHandlerAdapter<InboundRpcMessage> {
+
+    private final SocketChannel channel;
+    
+    
+    public InboundHandler(SocketChannel channel) {
+      super();
+      this.channel = channel;
+    }
+
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, InboundRpcMessage msg) throws Exception {
+      if(!ctx.channel().isOpen()) return;
+
+      if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Received message {}", msg);
+      switch(msg.mode){
+      case REQUEST:
+        // handle message and ack.
+        Response r = handle(channel, msg.rpcType, msg.pBody, msg.dBody);
+        OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE, r.rpcType, msg.coordinationId, r.pBody, r.dBody);
+        if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Adding message to outbound buffer. {}", outMessage);
+        ctx.write(outMessage);
+        break;
+        
+      case RESPONSE:
+        MessageLite m = getResponseDefaultInstance(msg.rpcType);
+        DrillRpcFuture<?> rpcFuture = queue.getFuture(msg.coordinationId, m.getClass());
+        Parser<?> parser = m.getParserForType();
+        Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
+        rpcFuture.setValue(value);
+        if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
+        break;
+        
+      case RESPONSE_FAILURE:
+        RpcFailure failure = RpcFailure.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
+        queue.updateFailedFuture(msg.coordinationId, failure);
+        if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Updated rpc future with coordinationId {} with failure ", msg.coordinationId, failure);
+        break;
+        
+      default:
+        throw new UnsupportedOperationException(); 
+      }
+    }
+
+  }
+
+  private class Listener implements GenericFutureListener<ChannelFuture> {
+
+    private int coordinationId;
+    private Class<?> clazz;
+
+    public Listener(int coordinationId, Class<?> clazz) {
+      this.coordinationId = coordinationId;
+      this.clazz = clazz;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture channelFuture) throws Exception {
+//      logger.debug("Completed channel write.");
+      
+      if (channelFuture.isCancelled()) {
+        DrillRpcFuture<?> rpcFuture = queue.getFuture(coordinationId, clazz);
+        rpcFuture.setException(new CancellationException("Socket operation was canceled."));
+      } else if (!channelFuture.isSuccess()) {
+        try {
+          channelFuture.get();
+          throw new IllegalStateException(
+              "Future was described as completed and not succesful but did not throw an exception.");
+        } catch (Exception e) {
+          DrillRpcFuture<?> rpcFuture = queue.getFuture(coordinationId, clazz);
+          rpcFuture.setException(e);
+        }
+      } else {
+        // send was successful. No need to modify DrillRpcFuture.
+        return;
+      }
+    }
+
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
new file mode 100644
index 0000000..7753e07
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
@@ -0,0 +1,26 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+public class RpcConstants {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConstants.class);
+  
+  private RpcConstants(){}
+  
+  public static final boolean EXTRA_DEBUGGING = false;
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
new file mode 100644
index 0000000..134e54b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
@@ -0,0 +1,142 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.CorruptedFrameException;
+import io.netty.handler.codec.MessageToMessageDecoder;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcHeader;
+
+/**
+ * Converts a previously length adjusted buffer into an RpcMessage.
+ */
+class RpcDecoder extends MessageToMessageDecoder<ByteBuf> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcDecoder.class);
+  
+  private final AtomicLong messageCounter = new AtomicLong();
+  
+  @Override
+  protected InboundRpcMessage decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
+    if(!ctx.channel().isOpen()){
+      return null;
+    }
+    
+    if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Inbound rpc message received.");
+
+    // now, we know the entire message is in the buffer and the buffer is constrained to this message. Additionally,
+    // this process should avoid reading beyond the end of this buffer so we inform the ByteBufInputStream to throw an
+    // exception if be go beyond readable bytes (as opposed to blocking).
+    final ByteBufInputStream is = new ByteBufInputStream(buffer, buffer.readableBytes());
+
+    // read the rpc header, saved in delimited format.
+    checkTag(is, RpcEncoder.HEADER_TAG);
+    final RpcHeader header = RpcHeader.parseDelimitedFrom(is);
+    if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Read header. {}", header);
+
+    if(RpcConstants.EXTRA_DEBUGGING) logger.debug(" post header read index {}", buffer.readerIndex());
+    
+    // read the protobuf body into a buffer.
+    checkTag(is, RpcEncoder.PROTOBUF_BODY_TAG);
+    final int pBodyLength = readRawVarint32(is);
+    final ByteBuf pBody = buffer.slice(buffer.readerIndex(), pBodyLength);
+    buffer.skipBytes(pBodyLength);
+    buffer.retain();
+    if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Read protobuf body of length {} into buffer {}.", pBodyLength, pBody);
+
+    if(RpcConstants.EXTRA_DEBUGGING) logger.debug("post protobufbody read index {}", buffer.readerIndex());
+    
+    ByteBuf dBody = null;
+    int dBodyLength = 0;
+
+    // read the data body.
+    if (buffer.readableBytes() > 0) {
+      
+      if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Reading raw body, buffer has {} bytes available, is available {}.", buffer.readableBytes(), is.available());
+      checkTag(is, RpcEncoder.RAW_BODY_TAG);
+      if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Reading length.");
+      dBodyLength = readRawVarint32(is);
+      if(buffer.readableBytes() != dBodyLength) throw new CorruptedFrameException(String.format("Expected to receive a raw body of %d bytes but received a buffer with %d bytes.", dBodyLength, buffer.readableBytes()));
+      dBody = buffer.slice();
+      buffer.retain();
+      if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Read raw body of {}", dBody);
+      
+    }else{
+      if(RpcConstants.EXTRA_DEBUGGING) logger.debug("No need to read raw body, no readable bytes left.");
+    }
+
+
+    // return the rpc message.
+    InboundRpcMessage m = new InboundRpcMessage(header.getMode(), header.getRpcType(), header.getCoordinationId(),
+        pBody, dBody);
+
+    // move the reader index forward so the next rpc call won't try to work with it.
+    buffer.skipBytes(dBodyLength);
+    messageCounter.incrementAndGet();
+    if (RpcConstants.EXTRA_DEBUGGING) logger.trace("Inbound Rpc Message Decoded {}.", m);
+    return m;
+
+  }
+
+  private void checkTag(ByteBufInputStream is, int expectedTag) throws IOException {
+    int actualTag = readRawVarint32(is);
+    if (actualTag != expectedTag){
+      throw new CorruptedFrameException(String.format("Expected to read a tag of %d but actually received a value of %d.  Happened after reading %d message.", expectedTag, actualTag, messageCounter.get()));
+    }
+  }
+
+  // Taken from CodedInputStream and modified to enable ByteBufInterface.
+  public static int readRawVarint32(ByteBufInputStream is) throws IOException {
+    byte tmp = is.readByte();
+    if (tmp >= 0) {
+      return tmp;
+    }
+    int result = tmp & 0x7f;
+    if ((tmp = is.readByte()) >= 0) {
+      result |= tmp << 7;
+    } else {
+      result |= (tmp & 0x7f) << 7;
+      if ((tmp = is.readByte()) >= 0) {
+        result |= tmp << 14;
+      } else {
+        result |= (tmp & 0x7f) << 14;
+        if ((tmp = is.readByte()) >= 0) {
+          result |= tmp << 21;
+        } else {
+          result |= (tmp & 0x7f) << 21;
+          result |= (tmp = is.readByte()) << 28;
+          if (tmp < 0) {
+            // Discard upper 32 bits.
+            for (int i = 0; i < 5; i++) {
+              if (is.readByte() >= 0) {
+                return result;
+              }
+            }
+            throw new CorruptedFrameException("Encountered a malformed varint.");
+          }
+        }
+      }
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
new file mode 100644
index 0000000..8d3d97c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
@@ -0,0 +1,127 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
+
+import java.io.OutputStream;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.CompleteRpcMessage;
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcHeader;
+
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.WireFormat;
+
+/**
+ * Converts an RPCMessage into wire format.
+ */
+class RpcEncoder extends ChannelOutboundMessageHandlerAdapter<OutboundRpcMessage>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcEncoder.class);
+  
+  static final int HEADER_TAG = makeTag(CompleteRpcMessage.HEADER_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
+  static final int PROTOBUF_BODY_TAG = makeTag(CompleteRpcMessage.PROTOBUF_BODY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
+  static final int RAW_BODY_TAG = makeTag(CompleteRpcMessage.RAW_BODY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
+  static final int HEADER_TAG_LENGTH = getRawVarintSize(HEADER_TAG);
+  static final int PROTOBUF_BODY_TAG_LENGTH = getRawVarintSize(PROTOBUF_BODY_TAG);
+  static final int RAW_BODY_TAG_LENGTH = getRawVarintSize(RAW_BODY_TAG);
+  
+  
+  @Override
+  public void flush(ChannelHandlerContext ctx, OutboundRpcMessage msg) throws Exception {
+    if(!ctx.channel().isOpen()){
+      return;
+    }
+    
+    try{
+      if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Encoding outbound message {}", msg);
+      // first we build the RpcHeader 
+      RpcHeader header = RpcHeader.newBuilder() //
+          .setMode(msg.mode) //
+          .setCoordinationId(msg.coordinationId) //
+          .setRpcType(msg.rpcType).build();
+      
+      // figure out the full length
+      int headerLength = header.getSerializedSize();
+      int protoBodyLength = msg.pBody.getSerializedSize();
+      int rawBodyLength = msg.dBody == null ? 0 : msg.dBody.readableBytes();
+      int fullLength = //
+          HEADER_TAG_LENGTH + getRawVarintSize(headerLength) + headerLength +   //
+          PROTOBUF_BODY_TAG_LENGTH + getRawVarintSize(protoBodyLength) + protoBodyLength; //
+      
+      if(rawBodyLength > 0){
+        fullLength += (RAW_BODY_TAG_LENGTH + getRawVarintSize(rawBodyLength) + rawBodyLength);
+      }
+
+      // set up buffers.
+      ByteBuf buf = ctx.nextOutboundByteBuffer();
+      OutputStream os = new ByteBufOutputStream(buf);
+      CodedOutputStream cos = CodedOutputStream.newInstance(os);
+
+      // write full length first (this is length delimited stream).
+      cos.writeRawVarint32(fullLength);
+      
+      // write header
+      cos.writeRawVarint32(HEADER_TAG);
+      cos.writeRawVarint32(headerLength);
+      header.writeTo(cos);
+
+      // write protobuf body length and body
+      cos.writeRawVarint32(PROTOBUF_BODY_TAG);
+      cos.writeRawVarint32(protoBodyLength);
+      msg.pBody.writeTo(cos);
+
+      // if exists, write data body and tag.
+      if(msg.dBody != null && msg.dBody.readableBytes() > 0){
+        cos.writeRawVarint32(RAW_BODY_TAG);
+        cos.writeRawVarint32(rawBodyLength);
+        cos.flush(); // need to flush so that dbody goes after if cos is caching.
+        buf.writeBytes(msg.dBody);
+      }else{
+        cos.flush();
+      }
+      if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Wrote message with length header of {} bytes and body of {} bytes.", getRawVarintSize(fullLength), fullLength);
+      if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Sent message.  Ending writer index was {}.", buf.writerIndex());
+    
+    }finally{
+      // make sure to release Rpc Messages unerlying byte buffers.
+      msg.release();
+    }
+  }
+  
+  /** Makes a tag value given a field number and wire type, copied from WireFormat since it isn't public.  */
+  static int makeTag(final int fieldNumber, final int wireType) {
+    return (fieldNumber << 3) | wireType;
+  }
+  
+  public static int getRawVarintSize(int value) {
+    int count = 0;
+    while (true) {
+      if ((value & ~0x7F) == 0) {
+        count++;
+        return count;
+      } else {
+        count++;
+        value >>>= 7;
+      }
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
new file mode 100644
index 0000000..ca66481
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import org.apache.drill.common.exceptions.DrillIOException;
+
+/**
+ * Parent class for all rpc exceptions.
+ */
+public class RpcException extends DrillIOException{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcException.class);
+
+  public RpcException() {
+    super();
+  }
+
+  public RpcException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public RpcException(String message) {
+    super(message);
+  }
+
+  public RpcException(Throwable cause) {
+    super(cause);
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
new file mode 100644
index 0000000..ef1b88f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+
+public class RpcExceptionHandler implements ChannelHandler{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcExceptionHandler.class);
+  
+  public RpcExceptionHandler(){
+  }
+  
+  @Override
+  public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
+  }
+
+  @Override
+  public void afterAdd(ChannelHandlerContext ctx) throws Exception {
+  }
+
+  @Override
+  public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
+  }
+
+  @Override
+  public void afterRemove(ChannelHandlerContext ctx) throws Exception {
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+    if(!ctx.channel().isOpen()) return;
+    logger.info("Exception in pipeline.  Closing channel between local " + ctx.channel().localAddress() + " and remote " + ctx.channel().remoteAddress(), cause);
+    ctx.close();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java
new file mode 100644
index 0000000..fd1938d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcMessage.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
+
+public abstract class RpcMessage {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcMessage.class);
+  
+  public RpcMode mode;
+  public int rpcType;
+  public int coordinationId;
+  public ByteBuf dBody;
+  
+  public RpcMessage(RpcMode mode, int rpcType, int coordinationId, ByteBuf dBody) {
+    this.mode = mode;
+    this.rpcType = rpcType;
+    this.coordinationId = coordinationId;
+    this.dBody = dBody;
+  }
+  
+  public abstract int getBodySize();
+
+  void release(){
+    if(dBody != null) dBody.release();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
new file mode 100644
index 0000000..462bc52
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
@@ -0,0 +1,80 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.CorruptedFrameException;
+
+import com.google.protobuf.CodedInputStream;
+
+/**
+ * Modified version of ProtobufVarint32FrameDecoder that avoids bytebuf copy.
+ */
+public class ZeroCopyProtobufLengthDecoder extends ByteToMessageDecoder {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZeroCopyProtobufLengthDecoder.class);
+
+  @Override
+  protected ByteBuf decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+    if(!ctx.channel().isOpen()){
+      logger.info("Channel is closed, discarding remaining {} byte(s) in buffer.", in.readableBytes());
+      in.skipBytes(in.readableBytes());
+      return null;
+    }
+    
+    in.markReaderIndex();
+    final byte[] buf = new byte[5];
+    for (int i = 0; i < buf.length; i ++) {
+        if (!in.isReadable()) {
+            in.resetReaderIndex();
+            return null;
+        }
+
+        buf[i] = in.readByte();
+        if (buf[i] >= 0) {
+          
+            int length = CodedInputStream.newInstance(buf, 0, i + 1).readRawVarint32();
+            
+            if (length < 0) {
+                throw new CorruptedFrameException("negative length: " + length);
+            }
+            if (length == 0){
+                throw new CorruptedFrameException("Received a message of length 0.");
+            }
+
+            if (in.readableBytes() < length) {
+                in.resetReaderIndex();
+                return null;
+            } else {
+                ByteBuf out = in.slice(in.readerIndex(), length);
+                in.retain();
+                in.skipBytes(length);
+                if(RpcConstants.EXTRA_DEBUGGING) logger.debug(String.format("ReaderIndex is %d after length header of %d bytes and frame body of length %d bytes.", in.readerIndex(), i+1, length));
+                return out;
+            }
+        }
+    }
+
+    // Couldn't find the byte whose MSB is off.
+    throw new CorruptedFrameException("length wider than 32-bit");
+    
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
new file mode 100644
index 0000000..b16c6cb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
@@ -0,0 +1,62 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
+import org.apache.drill.exec.rpc.BasicClient;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import com.google.protobuf.MessageLite;
+
+public class BitClient  extends BasicClient<RpcType>{
+  
+  private final DrillbitContext context;
+  private final BitComHandler handler;
+  
+  public BitClient(BitComHandler handler, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, DrillbitContext context) {
+    super(alloc, eventLoopGroup);
+    this.context = context;
+    this.handler = handler;
+  }
+  
+  @Override
+  protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
+    return handler.getResponseDefaultInstance(rpcType);
+  }
+
+  @Override
+  protected Response handle(SocketChannel ch, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+    return handler.handle(context, rpcType, pBody, dBody);
+  }
+
+  @Override
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch) {
+    return super.getCloseHandler(ch);
+  }
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitClient.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
new file mode 100644
index 0000000..5c1bf21
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.io.Closeable;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcBus;
+
+/**
+ * Service that allows one Drillbit to communicate with another. Internally manages whether each particular bit is a server
+ * or a client depending on who initially made the connection. If no connection exists, BitCom is
+ * responsible for making a connection.  BitCom should automatically straight route local BitCommunication rather than connecting to its self.
+ */
+public interface BitCom extends Closeable{
+
+  /**
+   * Send a record batch to another node.  
+   * @param node The node id to send the record batch to.
+   * @param batch The record batch to send.
+   * @return A Future<Ack> object that can be used to determine the outcome of sending.
+   */
+  public abstract DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, DrillbitEndpoint node, RecordBatch batch);
+
+  /**
+   * Send a query PlanFragment to another bit.   
+   * @param context
+   * @param node
+   * @param fragment
+   * @return
+   */
+  public abstract DrillRpcFuture<FragmentHandle> sendFragment(FragmentContext context, DrillbitEndpoint node, PlanFragment fragment);
+  
+  public abstract DrillRpcFuture<Ack> cancelFragment(FragmentContext context, DrillbitEndpoint node, FragmentHandle handle);
+  
+  public abstract DrillRpcFuture<FragmentStatus> getFragmentStatus(FragmentContext context, DrillbitEndpoint node, FragmentHandle handle);
+  
+  
+  public interface TunnelListener extends GenericFutureListener<ChannelFuture> {
+    public void connectionEstablished(SocketChannel channel, DrillbitEndpoint endpoint, RpcBus<?> bus);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComHandler.java
new file mode 100644
index 0000000..94e3eff
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComHandler.java
@@ -0,0 +1,136 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.socket.SocketChannel;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.BitBatchChunk;
+import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
+import org.apache.drill.exec.proto.ExecProtos.BitStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitCom.TunnelListener;
+import org.apache.drill.exec.rpc.bit.BitComImpl.TunnelModifier;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import com.google.protobuf.MessageLite;
+
+public class BitComHandler {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitComHandler.class);
+  
+  private final TunnelModifier modifier;
+  
+  public BitComHandler(TunnelModifier modifier){
+    this.modifier = modifier;
+  }
+  
+  public TunnelListener getTunnelListener(RpcBus<?>.ChannelClosedHandler internalHandler){
+    return new Listener(internalHandler);
+  }
+  
+  public class Listener implements TunnelListener {
+    final RpcBus<?>.ChannelClosedHandler internalHandler;
+
+    public Listener(RpcBus<?>.ChannelClosedHandler internalHandler) {
+      this.internalHandler = internalHandler;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      logger.debug("BitTunnel closed, removing from BitCom.");
+      internalHandler.operationComplete(future);
+      BitTunnel t = modifier.remove(future.channel());
+      if(t != null) t.shutdownIfClient();
+    }
+
+    @Override
+    public void connectionEstablished(SocketChannel channel, DrillbitEndpoint endpoint, RpcBus<?> bus) {
+      modifier.create(channel, endpoint, bus);
+    }
+
+  }
+
+  
+
+
+  public MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
+    switch (rpcType) {
+    case RpcType.ACK_VALUE:
+      return Ack.getDefaultInstance();
+    case RpcType.HANDSHAKE_VALUE:
+      return BitHandshake.getDefaultInstance();
+    case RpcType.RESP_FRAGMENT_HANDLE_VALUE:
+      return FragmentHandle.getDefaultInstance();
+    case RpcType.RESP_FRAGMENT_STATUS_VALUE:
+      return FragmentStatus.getDefaultInstance();
+    case RpcType.RESP_BIT_STATUS_VALUE:
+      return BitStatus.getDefaultInstance();
+    case RpcType.RESP_BATCH_CHUNK_VALUE:
+      return BitBatchChunk.getDefaultInstance();
+      
+    default:
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  protected Response handle(DrillbitContext context, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+    switch (rpcType) {
+    
+    case RpcType.HANDSHAKE_VALUE:
+      // parse incoming handshake.
+      // get endpoint information.
+      // record endpoint information in registry.
+      // respond with our handshake info.
+      return new Response(RpcType.HANDSHAKE, BitHandshake.getDefaultInstance(), null);
+      
+    case RpcType.REQ_BATCH_CHUNK_VALUE:
+      return new Response(RpcType.RESP_BATCH_CHUNK, BitBatchChunk.getDefaultInstance(), null);
+      
+    case RpcType.REQ_BIT_STATUS_VALUE:
+      return new Response(RpcType.RESP_BIT_STATUS, BitStatus.getDefaultInstance(), null);
+      
+    case RpcType.REQ_CANCEL_FRAGMENT_VALUE:
+      return new Response(RpcType.ACK, Ack.getDefaultInstance(), null);
+
+    case RpcType.REQ_FRAGMENT_STATUS_VALUE:
+      return new Response(RpcType.RESP_FRAGMENT_STATUS, FragmentStatus.getDefaultInstance(), null);
+      
+    case RpcType.REQ_INIATILIZE_FRAGMENT_VALUE:
+      return new Response(RpcType.ACK, Ack.getDefaultInstance(), null);
+      
+    case RpcType.REQ_RECORD_BATCH_VALUE:
+      return new Response(RpcType.RESP_BATCH_CHUNK, BitBatchChunk.getDefaultInstance(), null);
+      
+    default:
+      throw new UnsupportedOperationException();
+    }
+
+  }
+  
+
+  
+  
+}