You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:57:42 UTC

[21/53] [abbrv] Clean up threading of client/server. Utilize command pattern for BitCom stuff to abstract away connection failures. Works on one bit single exchange remote query now. Next up, two bit single exchange query.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
index 4ba99a1..82a6aa6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
@@ -22,57 +22,54 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.GenericFutureListener;
 
-import java.util.concurrent.ConcurrentMap;
-
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
 import org.apache.drill.exec.proto.ExecProtos.RpcType;
 import org.apache.drill.exec.rpc.BasicClient;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitConnectionManager.CloseHandlerCreator;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.work.batch.BitComHandler;
 
 import com.google.protobuf.MessageLite;
 
-public class BitClient  extends BasicClient<RpcType, BitConnection>{
+public class BitClient  extends BasicClient<RpcType, BitConnection, BitHandshake, BitHandshake>{
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitClient.class);
 
   private final BitComHandler handler;
-  private final DrillbitEndpoint endpoint;
-  private BitConnection connection;
-  private final AvailabilityListener openListener;
-  private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
+  private final DrillbitEndpoint remoteEndpoint;
+  private volatile BitConnection connection;
   private final ListenerPool listeners;
+  private final CloseHandlerCreator closeHandlerFactory;
+  private final DrillbitEndpoint localIdentity;
   
-  public BitClient(DrillbitEndpoint endpoint, AvailabilityListener openListener, BitComHandler handler, BootStrapContext context, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners) {
-    super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
-
-    this.endpoint = endpoint;
+  public BitClient(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localEndpoint, BitComHandler handler, BootStrapContext context, CloseHandlerCreator closeHandlerFactory, ListenerPool listeners) {
+    super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup(), RpcType.HANDSHAKE, BitHandshake.class, BitHandshake.PARSER);
+    this.localIdentity = localEndpoint;
+    this.remoteEndpoint = remoteEndpoint;
     this.handler = handler;
-    this.openListener = openListener;
-    this.registry = registry;
     this.listeners = listeners;
+    this.closeHandlerFactory = closeHandlerFactory;
   }
   
-  public BitHandshake connect() throws RpcException, InterruptedException{
-    BitHandshake bs = connectAsClient(RpcType.HANDSHAKE, BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).build(), endpoint.getAddress(), endpoint.getBitPort(), BitHandshake.class);
-    connection.setEndpoint(endpoint);
-    return bs;
+  public void connect(RpcConnectionHandler<BitConnection> connectionHandler) {
+    connectAsClient(connectionHandler, BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).setEndpoint(localIdentity).build(), remoteEndpoint.getAddress(), remoteEndpoint.getBitPort());
   }
 
   @SuppressWarnings("unchecked")
   @Override
   public BitConnection initRemoteConnection(Channel channel) {
-    this.connection = new BitConnection(openListener, channel, (RpcBus<RpcType, BitConnection>) (RpcBus<?, ?>) this, registry, listeners);
+    this.connection = new BitConnection(channel, (RpcBus<RpcType, BitConnection>) (RpcBus<?, ?>) this, listeners);
     return connection;
   }
 
   @Override
   protected GenericFutureListener<ChannelFuture> getCloseHandler(BitConnection clientConnection) {
-    return clientConnection.getCloseHandler(super.getCloseHandler(clientConnection));
+    return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(clientConnection));
   }
 
   @Override
@@ -86,18 +83,15 @@ public class BitClient  extends BasicClient<RpcType, BitConnection>{
   }
 
   @Override
-  protected ClientHandshakeHandler<BitHandshake> getHandshakeHandler() {
-    return new ClientHandshakeHandler<BitHandshake>(RpcType.HANDSHAKE, BitHandshake.class, BitHandshake.PARSER){
-
-      @Override
-      protected void validateHandshake(BitHandshake inbound) throws Exception {
-        logger.debug("Handling handshake from bit server to bit client. {}", inbound);
-        if(inbound.getRpcVersion() != BitRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", inbound.getRpcVersion(), BitRpcConfig.RPC_VERSION));
-      }
+  protected void validateHandshake(BitHandshake handshake) throws RpcException {
+    if(handshake.getRpcVersion() != BitRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", handshake.getRpcVersion(), BitRpcConfig.RPC_VERSION));
+  }
 
-    };
+  @Override
+  protected void finalizeConnection(BitHandshake handshake, BitConnection connection) {
+    connection.setEndpoint(handshake.getEndpoint());
   }
-  
+
   public BitConnection getConnection(){
     return this.connection;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
index c60d36b..f7f508e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
@@ -40,11 +40,17 @@ public interface BitCom extends Closeable {
    */
   public BitTunnel getTunnel(DrillbitEndpoint node) ;
 
-  public int start() throws InterruptedException, DrillbitStartupException;
+  public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException;
 
   /**
    * Register an incoming batch handler for a local foreman.  
    * @param handler
    */
   public void registerIncomingBatchHandler(IncomingFragmentHandler handler);
+  
+  /**
+   * Get the ListenerPool.
+   * @return the ListenerPool
+   */
+  public ListenerPool getListeners();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
index c98be44..d1cadc7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
@@ -18,157 +18,68 @@
 package org.apache.drill.exec.rpc.bit;
 
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
-import org.apache.drill.exec.proto.ExecProtos.RpcType;
-import org.apache.drill.exec.rpc.NamedThreadFactory;
-import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.work.batch.BitComHandler;
 import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
 
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
-import com.google.common.util.concurrent.AbstractCheckedFuture;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 
 /**
- * Manages communication tunnels between nodes.   
+ * Manages communication tunnels between nodes.
  */
 public class BitComImpl implements BitCom {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitComImpl.class);
 
-  private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry = Maps.newConcurrentMap();
   private final ListenerPool listeners;
   private volatile BitServer server;
   private final BitComHandler handler;
   private final BootStrapContext context;
-  
-  // TODO: this executor should be removed.
-  private final Executor exec = Executors.newCachedThreadPool(new NamedThreadFactory("BitComImpl execution pool: "));
+  private final ConnectionManagerRegistry connectionRegistry;
 
   public BitComImpl(BootStrapContext context, BitComHandler handler) {
     super();
     this.handler = handler;
     this.context = context;
     this.listeners = new ListenerPool(8);
+    this.connectionRegistry = new ConnectionManagerRegistry(handler, context, listeners);
   }
 
-  public int start() throws InterruptedException, DrillbitStartupException {
-    server = new BitServer(handler, context, registry, listeners);
+  @Override
+  public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException {
+    server = new BitServer(handler, context, connectionRegistry, listeners);
     int port = context.getConfig().getInt(ExecConstants.INITIAL_BIT_PORT);
-    return server.bind(port);
-  }
-
-  private CheckedFuture<BitConnection, RpcException> getNode(final DrillbitEndpoint endpoint, boolean check) {
-    
-    
-    SettableFuture<BitConnection> future = SettableFuture.create();
-    BitComFuture<BitConnection> checkedFuture = new BitComFuture<BitConnection>(future);
-    BitConnection t = null;
-
-    if (check) {
-      t = registry.get(endpoint);
-
-      if (t != null) {
-        future.set(t);
-        return checkedFuture;
-      }
-    }
-    
-    try {
-      AvailWatcher watcher = new AvailWatcher(future);
-      BitClient c = new BitClient(endpoint, watcher, handler, context, registry, listeners);
-      c.connect();
-      return checkedFuture;
-    } catch (InterruptedException | RpcException e) {
-      future.setException(new FragmentSetupException("Unable to open connection"));
-      return checkedFuture;
-    }
-
+    port = server.bind(port);
+    DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setBitPort(port).build();
+    connectionRegistry.setEndpoint(completeEndpoint);
+    return completeEndpoint;
   }
 
-  private class AvailWatcher implements AvailabilityListener{
-    final SettableFuture<BitConnection> future;
-    
-    public AvailWatcher(SettableFuture<BitConnection> future) {
-      super();
-      this.future = future;
-    }
-
-    @Override
-    public void isAvailable(BitConnection connection) {
-      future.set(connection);
-    }
-    
-  }
   
-  BitConnection getConnection(DrillbitEndpoint endpoint) throws RpcException {
-    BitConnection t = registry.get(endpoint);
-    if(t != null) return t;
-    return this.getNode(endpoint, false).checkedGet();
+   
+  public ListenerPool getListeners() {
+    return listeners;
   }
 
-  
-  CheckedFuture<BitConnection, RpcException> getConnectionAsync(DrillbitEndpoint endpoint) {
-    return this.getNode(endpoint, true);
-  }
-
-  
   @Override
-  public BitTunnel getTunnel(DrillbitEndpoint endpoint){
-    BitConnection t = registry.get(endpoint);
-    if(t == null){
-      return new BitTunnel(exec, endpoint, this, t);
-    }else{
-      return new BitTunnel(exec, endpoint, this,  this.getNode(endpoint, false));
-    }
+  public BitTunnel getTunnel(DrillbitEndpoint endpoint) {
+    return new BitTunnel(endpoint, connectionRegistry.getConnectionManager(endpoint));
   }
 
-
-  /**
-   * A future which remaps exceptions to a BitComException.
-   * @param <T>
-   */
-  private class BitComFuture<T> extends AbstractCheckedFuture<T, RpcException>{
-
-    protected BitComFuture(ListenableFuture<T> delegate) {
-      super(delegate);
-    }
-
-    @Override
-    protected RpcException mapException(Exception e) {
-      Throwable t = e;
-      if(e instanceof ExecutionException){
-        t = e.getCause();
-      }
-      
-      if(t instanceof RpcException) return (RpcException) t;
-      return new RpcException(t);
-    }
+  @Override
+  public void registerIncomingBatchHandler(IncomingFragmentHandler handler) {
+    this.handler.registerIncomingFragmentHandler(handler);
   }
 
   public void close() {
     Closeables.closeQuietly(server);
-    for (BitConnection bt : registry.values()) {
-      bt.shutdownIfClient();
+    for (BitConnectionManager bt : connectionRegistry) {
+      bt.close();
     }
   }
 
-  @Override
-  public void registerIncomingBatchHandler(IncomingFragmentHandler handler) {
-    this.handler.registerIncomingFragmentHandler(handler);
-  }
-  
-  
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java
new file mode 100644
index 0000000..692c63e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+
+import com.google.protobuf.MessageLite;
+
+public interface BitCommand<T extends MessageLite> extends RpcConnectionHandler<BitConnection>{
+
+  public abstract void connectionAvailable(BitConnection connection);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
index 73980f9..f85ea74 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
@@ -17,6 +17,7 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc.bit;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.GenericFutureListener;
@@ -35,31 +36,35 @@ import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.rpc.DrillRpcFuture;
 import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
 
 import com.google.common.base.Preconditions;
 import com.google.common.io.Closeables;
+import com.google.protobuf.MessageLite;
 
 public class BitConnection extends RemoteConnection{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitConnection.class); 
   
   private final RpcBus<RpcType, BitConnection> bus;
-  private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
   private final ListenerPool listeners;
-
-  private final AvailabilityListener listener;
   private volatile DrillbitEndpoint endpoint;
   private volatile boolean active = false;
   private final UUID id;
   
-  public BitConnection(AvailabilityListener listener, Channel channel, RpcBus<RpcType, BitConnection> bus, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners){
+  public BitConnection(Channel channel, RpcBus<RpcType, BitConnection> bus, ListenerPool listeners){
     super(channel);
     this.bus = bus;
-    this.registry = registry;
     // we use a local listener pool unless a global one is provided.
     this.listeners = listeners != null ? listeners : new ListenerPool(2);
-    this.listener = listener;
     this.id = UUID.randomUUID();
   }
+  
+  void setEndpoint(DrillbitEndpoint endpoint){
+    assert this.endpoint == null : "Endpoint should only be set once (only in the case in incoming server requests).";
+    this.endpoint = endpoint;
+    active = true;
+  }
 
   protected DrillbitEndpoint getEndpoint() {
     return endpoint;
@@ -69,48 +74,12 @@ public class BitConnection extends RemoteConnection{
     return listeners;
   }
   
-  protected void setEndpoint(DrillbitEndpoint endpoint) {
-    Preconditions.checkNotNull(endpoint);
-    Preconditions.checkArgument(this.endpoint == null);
-    
-    this.endpoint = endpoint;
-    BitServer.logger.debug("Adding new endpoint to available BitServer connections.  Endpoint: {}.", endpoint);
-    synchronized(this){
-      BitConnection c = registry.putIfAbsent(endpoint, this);
-      
-      if(c != null){ // the registry already has a connection like this
-        
-        // give the awaiting future an alternative connection.
-        if(listener != null){
-          listener.isAvailable(c);
-        }
-        
-        // shut this down if this is a client as it won't be available in the registry.
-        // otherwise we'll leave as, possibly allowing to bit coms to use different tunnels to talk to each other.  This shouldn't cause a problem.
-        logger.debug("Shutting down connection to {} since the registry already has an active connection that endpoint.", endpoint);
-        shutdownIfClient();
-        
-      }
-      active = true;
-      if(listener != null) listener.isAvailable(this);
-    }
-  }
-
-  public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch){
-    return bus.send(this, RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers());
-  }
-  
-  public DrillRpcFuture<Ack> sendFragment(PlanFragment fragment){
-    return bus.send(this, RpcType.REQ_INIATILIZE_FRAGMENT, fragment, Ack.class);
-  }
   
-  public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle){
-    return bus.send(this,  RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class);
+  public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> outcomeListener, RpcType rpcType,
+      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies){
+    bus.send(outcomeListener, this, rpcType, protobufBody, clazz, dataBodies);
   }
   
-  public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
-    return bus.send(this,  RpcType.REQ_FRAGMENT_STATUS, status, Ack.class);
-  }
 
   public void disable(){
     active = false;
@@ -140,27 +109,7 @@ public class BitConnection extends RemoteConnection{
     return true;
   }
 
-  public GenericFutureListener<ChannelFuture> getCloseHandler(GenericFutureListener<ChannelFuture> parent){
-    return new CloseHandler(this, parent);
-  }
-  
-  private class CloseHandler implements GenericFutureListener<ChannelFuture>{
-    private BitConnection connection;
-    private GenericFutureListener<ChannelFuture> parent;
-    
-    public CloseHandler(BitConnection connection, GenericFutureListener<ChannelFuture> parent) {
-      super();
-      this.connection = connection;
-      this.parent = parent;
-    }
 
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      if(connection.getEndpoint() != null) registry.remove(connection.getEndpoint(), connection);
-      parent.operationComplete(future);
-    }
-    
-  }
   
   public void shutdownIfClient(){
     if(bus.isClient()) Closeables.closeQuietly(bus);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
index 0160d24..d99bb22 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
@@ -17,58 +17,152 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc.bit;
 
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.io.Closeable;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
 
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.protobuf.MessageLite;
 
-public class BitConnectionManager {
+/**
+ * Manages all connections between two particular bits.
+ */
+public class BitConnectionManager implements Closeable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitConnectionManager.class);
   
-  private final int maxAttempts;
-  private final BitComImpl com;
   private final DrillbitEndpoint endpoint;
-  private final AtomicReference<BitConnection> connection;
-  private final AtomicReference<CheckedFuture<BitConnection, RpcException>> future;
+  private final AtomicReference<BitConnection> connectionHolder;
+  private final BitComHandler handler;
+  private final BootStrapContext context;
+  private final ListenerPool listenerPool;
+  private final DrillbitEndpoint localIdentity;
+  
+  public BitConnectionManager(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localIdentity, BitComHandler handler, BootStrapContext context, ListenerPool listenerPool) {
+    assert remoteEndpoint != null : "Endpoint cannot be null.";
+    assert remoteEndpoint.getAddress() != null && !remoteEndpoint.getAddress().isEmpty(): "Endpoint address cannot be null.";
+    assert remoteEndpoint.getBitPort() > 0 : String.format("Bit Port must be set to a port between 1 and 65k.  Was set to %d.", remoteEndpoint.getBitPort());
+    
+    this.connectionHolder =  new AtomicReference<BitConnection>();
+    this.endpoint = remoteEndpoint;
+    this.localIdentity = localIdentity;
+    this.handler = handler;
+    this.context = context;
+    this.listenerPool = listenerPool;
+  }
+  
+  public <R extends MessageLite> BitCommand<R> runCommand(BitCommand<R> cmd){
+    logger.debug("Running command {}", cmd);
+    BitConnection connection = connectionHolder.get();
+    if(connection != null){
+      if(connection.isActive()){
+        cmd.connectionAvailable(connection);
+        return cmd;
+      }else{
+        // remove the old connection (don't worry if we fail, since someone else should have done it).
+        connectionHolder.compareAndSet(connection, null);
+      }
+    }
+    
+    /** We've arrived here without a connection; let's make sure only one of us makes a connection (FYI, another endpoint could create a reverse connection). **/
+    synchronized(this){
+      connection = connectionHolder.get();
+      if(connection != null){
+        cmd.connectionAvailable(connection);
+      }else{
+        BitClient client = new BitClient(endpoint, localIdentity, handler, context, new CloseHandlerCreator(), listenerPool);
+        
+        client.connect(new ConnectionListeningDecorator(cmd, !endpoint.equals(localIdentity)));
+      }
+      return cmd;
+      
+    }
+  }
+  
+  CloseHandlerCreator getCloseHandlerCreator(){
+    return new CloseHandlerCreator();
+  }
 
-  BitConnectionManager(DrillbitEndpoint endpoint, BitComImpl com, BitConnection connection, CheckedFuture<BitConnection, RpcException> future, int maxAttempts) {
-    assert endpoint != null && endpoint.getAddress() != null && endpoint.getBitPort() > 0;
-    this.com = com;
-    this.connection =  new AtomicReference<BitConnection>(connection);
-    this.future = new AtomicReference<CheckedFuture<BitConnection, RpcException>>(future);
-    this.endpoint = endpoint;
-    this.maxAttempts = maxAttempts;
+  /** Factory for close handlers **/
+  class CloseHandlerCreator{
+    public GenericFutureListener<ChannelFuture> getHandler(BitConnection connection, GenericFutureListener<ChannelFuture> parent){
+      return new CloseHandler(connection, parent);
+    }
   }
   
-  BitConnection getConnection(int attempt) throws RpcException{
-    BitConnection con = connection.get();
+  
+  
+  /**
+   * Listens for connection closes and clears connection holder.
+   */
+  private class CloseHandler implements GenericFutureListener<ChannelFuture>{
+    private BitConnection connection;
+    private GenericFutureListener<ChannelFuture> parent;
     
-    if(con != null){
-      if(con.isActive()) return con;
-      connection.compareAndSet(con, null);
+    public CloseHandler(BitConnection connection, GenericFutureListener<ChannelFuture> parent) {
+      super();
+      this.connection = connection;
+      this.parent = parent;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      connectionHolder.compareAndSet(connection, null);
+      parent.operationComplete(future);
     }
     
-    CheckedFuture<BitConnection, RpcException> fut = future.get();
+  } 
+  
+  /**
+   * Decorates a connection creation so that we capture a success and keep it available for future requests.  If we have raced and another connection is already available, we return that one and close this one (unless this is a self connection).
+   */
+  private class ConnectionListeningDecorator implements RpcConnectionHandler<BitConnection>{
+
+    private final RpcConnectionHandler<BitConnection> delegate;
+    private final boolean closeOnDupe;
+    
+    public ConnectionListeningDecorator(RpcConnectionHandler<BitConnection> delegate,  boolean closeOnDupe) {
+      this.delegate = delegate;
+      this.closeOnDupe = closeOnDupe;
+    }
 
-    if(fut != null){
-      try{
-        return fut.checkedGet();
-      }catch(RpcException ex){
-        future.compareAndSet(fut, null);
-        if(attempt < maxAttempts){
-          return getConnection(attempt + 1);
-        }else{
-          throw ex;
+    @Override
+    public void connectionSucceeded(BitConnection incoming) {
+      BitConnection connection = connectionHolder.get();
+      while(true){
+        boolean setted = connectionHolder.compareAndSet(null, incoming);
+        if(setted){
+          connection = incoming;
+          break;
         }
+        connection = connectionHolder.get();
+        if(connection != null) break; 
+      }
+      
+      
+      if(connection == incoming){
+        delegate.connectionSucceeded(connection);
+      }else{
+
+        if(closeOnDupe){
+          // close the incoming connection because another channel was created in the meantime (unless this is a self connection).
+          logger.debug("Closing incoming connection because a connection was already set.");
+          incoming.getChannel().close();
+        }
+        delegate.connectionSucceeded(connection);
       }
     }
-    
-    // no checked future, let's make one.
-    fut = com.getConnectionAsync(endpoint);
-    future.compareAndSet(null, fut);
-    return getConnection(attempt);
+
+    @Override
+    public void connectionFailed(org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType type, Throwable t) {
+      delegate.connectionFailed(type, t);
+    }
     
   }
 
@@ -76,5 +170,20 @@ public class BitConnectionManager {
     return endpoint;
   }
   
+  public void addServerConnection(BitConnection connection){
+    // if the connection holder is not set, set it to this incoming connection.
+    logger.debug("Setting server connection.");
+    this.connectionHolder.compareAndSet(null, connection);
+  }
+
+  @Override
+  public void close() {
+    BitConnection c = connectionHolder.getAndSet(null);
+    if(c != null){
+      c.getChannel().close();
+    }
+  }
+  
+  
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
index 88ac6cc..d4665a8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
@@ -22,18 +22,13 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.GenericFutureListener;
 
-import java.util.concurrent.ConcurrentMap;
-
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
-import org.apache.drill.exec.proto.ExecProtos.BitStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
 import org.apache.drill.exec.proto.ExecProtos.RpcType;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.rpc.BasicServer;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitConnectionManager.CloseHandlerCreator;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.work.batch.BitComHandler;
 
@@ -43,13 +38,14 @@ public class BitServer extends BasicServer<RpcType, BitConnection>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitServer.class);
   
   private final BitComHandler handler;
-  private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
   private final ListenerPool listeners;
+  private final ConnectionManagerRegistry connectionRegistry;
+  private volatile ProxyCloseHandler proxyCloseHandler;
   
-  public BitServer(BitComHandler handler, BootStrapContext context, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners) {
+  public BitServer(BitComHandler handler, BootStrapContext context, ConnectionManagerRegistry connectionRegistry, ListenerPool listeners) {
     super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
     this.handler = handler;
-    this.registry = registry;
+    this.connectionRegistry = connectionRegistry;
     this.listeners = listeners;
   }
   
@@ -65,23 +61,36 @@ public class BitServer extends BasicServer<RpcType, BitConnection>{
 
   @Override
   protected GenericFutureListener<ChannelFuture> getCloseHandler(BitConnection connection) {
-    return connection.getCloseHandler(super.getCloseHandler(connection));
+    this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(connection));
+    return proxyCloseHandler;
   }
 
   @Override
   public BitConnection initRemoteConnection(Channel channel) {
-    return new BitConnection(null, channel, this, registry, listeners);
+    return new BitConnection(channel, this, listeners);
   }
   
   
   @Override
-  protected ServerHandshakeHandler<BitHandshake> getHandshakeHandler() {
+  protected ServerHandshakeHandler<BitHandshake> getHandshakeHandler(final BitConnection connection) {
     return new ServerHandshakeHandler<BitHandshake>(RpcType.HANDSHAKE, BitHandshake.PARSER){
       
       @Override
       public MessageLite getHandshakeResponse(BitHandshake inbound) throws Exception {
-        logger.debug("Handling handshake from other bit. {}", inbound);
+//        logger.debug("Handling handshake from other bit. {}", inbound);
         if(inbound.getRpcVersion() != BitRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", inbound.getRpcVersion(), BitRpcConfig.RPC_VERSION));
+        if(!inbound.hasEndpoint() || inbound.getEndpoint().getAddress().isEmpty() || inbound.getEndpoint().getBitPort() < 1) throw new RpcException(String.format("RPC didn't provide valid counter endpoint information.  Received %s.", inbound.getEndpoint()));
+        connection.setEndpoint(inbound.getEndpoint());
+
+        // get (or create) the connection manager for the remote endpoint.
+        BitConnectionManager manager = connectionRegistry.getConnectionManager(inbound.getEndpoint());
+        
+        // update the close handler.
+        proxyCloseHandler.setHandler(manager.getCloseHandlerCreator().getHandler(connection, proxyCloseHandler.getHandler()));
+        
+        // add to the connection manager. 
+        manager.addServerConnection(connection);
+
         return BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).build();
       }
 
@@ -89,5 +98,30 @@ public class BitServer extends BasicServer<RpcType, BitConnection>{
   }
 
 
+  private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> {
+
+    private volatile GenericFutureListener<ChannelFuture>  handler;
+    
+    public ProxyCloseHandler(GenericFutureListener<ChannelFuture> handler) {
+      super();
+      this.handler = handler;
+    }
+
+
+    public GenericFutureListener<ChannelFuture> getHandler() {
+      return handler;
+    }
+
+
+    public void setHandler(GenericFutureListener<ChannelFuture> handler) {
+      this.handler = handler;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      handler.operationComplete(future);
+    }
+    
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
index 652fa52..83b7959 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
@@ -17,95 +17,79 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc.bit;
 
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
 import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.record.FragmentWritableBatch;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 
-import com.google.common.util.concurrent.AbstractCheckedFuture;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
 
-/**
- * Interface provided for communication between two bits. Underlying connection may be server or client based. Resilient
- * to connection loss. Right now, this has to jump through some hoops and bridge futures between the connection creation
- * and action. A better approach should be done.
- */
 public class BitTunnel {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitTunnel.class);
 
-  private static final int MAX_ATTEMPTS = 3;
-
   private final BitConnectionManager manager;
-  private final Executor exec;
-  
+  private final DrillbitEndpoint endpoint;
 
-  public BitTunnel(Executor exec, DrillbitEndpoint endpoint, BitComImpl com, BitConnection connection) {
-    this.manager = new BitConnectionManager(endpoint, com, connection, null, MAX_ATTEMPTS);
-    this.exec = exec;
-  }
-
-  public BitTunnel(Executor exec, DrillbitEndpoint endpoint, BitComImpl com,
-      CheckedFuture<BitConnection, RpcException> future) {
-    this.manager = new BitConnectionManager(endpoint, com, (BitConnection) null, future, MAX_ATTEMPTS);
-    this.exec = exec;
+  public BitTunnel(DrillbitEndpoint endpoint, BitConnectionManager manager) {
+    this.manager = manager;
+    this.endpoint = endpoint;
   }
   
   public DrillbitEndpoint getEndpoint(){
     return manager.getEndpoint();
   }
 
-  private <T> DrillRpcFuture<T> submit(BitCommand<T> command) {
-    exec.execute(command);
-    return command;
-  }
-
-  public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) {
-    return submit(new SendBatch(batch, context));
+  public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentContext context, FragmentWritableBatch batch) {
+    SendBatch b = new SendBatch(outcomeListener, batch, context);
+    manager.runCommand(b);
   }
 
-  public DrillRpcFuture<Ack> sendFragment(PlanFragment fragment) {
-    return submit(new SendFragment(fragment));
+  public void sendFragment(RpcOutcomeListener<Ack> outcomeListener, PlanFragment fragment){
+    SendFragment b = new SendFragment(outcomeListener, fragment);
+    manager.runCommand(b);
   }
-
-  public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle) {
-    return submit(new CancelFragment(handle));
+  
+  public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle){
+    CancelFragment b = new CancelFragment(handle);
+    manager.runCommand(b);
+    return b.getFuture();
   }
-
+  
   public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
-    return submit(new SendFragmentStatus(status));
+    SendFragmentStatus b = new SendFragmentStatus(status);
+    manager.runCommand(b);
+    return b.getFuture();
   }
 
-  public class SendBatch extends BitCommand<Ack> {
+  public static class SendBatch extends ListeningBitCommand<Ack> {
     final FragmentWritableBatch batch;
     final FragmentContext context;
 
-    public SendBatch(FragmentWritableBatch batch, FragmentContext context) {
-      super();
+    public SendBatch(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch, FragmentContext context) {
+      super(listener);
       this.batch = batch;
       this.context = context;
     }
 
     @Override
-    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
-      logger.debug("Sending record batch. {}", batch);
-      return connection.sendRecordBatch(context, batch);
+    public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+      connection.send(outcomeListener, RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers());
     }
 
+    @Override
+    public String toString() {
+      return "SendBatch [batch.header=" + batch.getHeader() + "]";
+    }
+    
+    
   }
 
-  public class SendFragmentStatus extends BitCommand<Ack> {
+  public static class SendFragmentStatus extends FutureBitCommand<Ack> {
     final FragmentStatus status;
 
     public SendFragmentStatus(FragmentStatus status) {
@@ -114,12 +98,13 @@ public class BitTunnel {
     }
 
     @Override
-    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
-      return connection.sendFragmentStatus(status);
+    public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+      connection.send(outcomeListener, RpcType.REQ_FRAGMENT_STATUS, status, Ack.class);
     }
+
   }
 
-  public class CancelFragment extends BitCommand<Ack> {
+  public static class CancelFragment extends FutureBitCommand<Ack> {
     final FragmentHandle handle;
 
     public CancelFragment(FragmentHandle handle) {
@@ -128,109 +113,23 @@ public class BitTunnel {
     }
 
     @Override
-    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
-      return connection.cancelFragment(handle);
+    public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+      connection.send(outcomeListener, RpcType.REQ_CANCEL_FRAGMENT, handle,  Ack.class);
     }
 
   }
 
-  public class SendFragment extends BitCommand<Ack> {
+  public static class SendFragment extends ListeningBitCommand<Ack> {
     final PlanFragment fragment;
 
-    public SendFragment(PlanFragment fragment) {
-      super();
+    public SendFragment(RpcOutcomeListener<Ack> listener, PlanFragment fragment) {
+      super(listener);
       this.fragment = fragment;
     }
 
     @Override
-    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
-      return connection.sendFragment(fragment);
-    }
-
-  }
-
-
-  
-
-  private abstract class BitCommand<T> extends AbstractCheckedFuture<T, RpcException> implements Runnable, DrillRpcFuture<T> {
-
-    public void addLightListener(RpcOutcomeListener<T> outcomeListener){
-      this.addListener(new RpcOutcomeListenerWrapper(outcomeListener), MoreExecutors.sameThreadExecutor());
-    }
-
-    public BitCommand() {
-      super(SettableFuture.<T> create());
-    }
-
-    public abstract CheckedFuture<T, RpcException> doRpcCall(BitConnection connection);
-
-    public final void run() {
-      
-      try {
-        
-        BitConnection connection = manager.getConnection(0);
-        assert connection != null : "The connection manager should never return a null connection.  Worse case, it should throw an exception.";
-        CheckedFuture<T, RpcException> rpc = doRpcCall(connection);
-        rpc.addListener(new FutureBridge<T>((SettableFuture<T>) delegate(), rpc), MoreExecutors.sameThreadExecutor());
-      } catch (RpcException ex) {
-        ((SettableFuture<T>) delegate()).setException(ex);
-      }
-
-    }
-
-    @Override
-    protected RpcException mapException(Exception e) {
-      Throwable t = e;
-      if (e instanceof ExecutionException) {
-        t = e.getCause();
-      }
-      if (t instanceof RpcException) return (RpcException) t;
-      return new RpcException(t);
-    }
-
-    public class RpcOutcomeListenerWrapper implements Runnable{
-      final RpcOutcomeListener<T> inner;
-      
-      public RpcOutcomeListenerWrapper(RpcOutcomeListener<T> inner) {
-        this.inner = inner;
-      }
-
-      @Override
-      public void run() {
-        try{
-          inner.success(BitCommand.this.checkedGet());
-        }catch(RpcException e){
-          inner.failed(e);
-        }
-      }
-    }
-
-    @Override
-    public String toString() {
-      return "BitCommand ["+this.getClass().getSimpleName()+"]";
-    }
-    
-    
-    
-  }
-
-  private class FutureBridge<T> implements Runnable {
-    final SettableFuture<T> out;
-    final CheckedFuture<T, RpcException> in;
-
-    public FutureBridge(SettableFuture<T> out, CheckedFuture<T, RpcException> in) {
-      super();
-      this.out = out;
-      this.in = in;
-    }
-
-    @Override
-    public void run() {
-      try {
-        out.set(in.checkedGet());
-      } catch (RpcException ex) {
-        out.setException(ex);
-      }
+    public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+      connection.send(outcomeListener, RpcType.REQ_INIATILIZE_FRAGMENT, fragment, Ack.class);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java
new file mode 100644
index 0000000..8afbc33
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import io.netty.channel.Channel;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.CheckedFuture;
+
+public class ConnectionManagerRegistry implements Iterable<BitConnectionManager>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConnectionManagerRegistry.class);
+  
+  private final ConcurrentMap<DrillbitEndpoint, BitConnectionManager> registry = Maps.newConcurrentMap();
+  
+  private final BitComHandler handler;
+  private final BootStrapContext context;
+  private final ListenerPool listenerPool;
+  private volatile DrillbitEndpoint localEndpoint;
+  
+  public ConnectionManagerRegistry(BitComHandler handler, BootStrapContext context, ListenerPool listenerPool) {
+    super();
+    this.handler = handler;
+    this.context = context;
+    this.listenerPool = listenerPool;
+  }
+
+  public BitConnectionManager getConnectionManager(DrillbitEndpoint endpoint){
+    assert localEndpoint != null : "DrillbitEndpoint must be set before a connection manager can be retrieved";
+    BitConnectionManager m = registry.get(endpoint);
+    if(m == null){
+      m = new BitConnectionManager(endpoint, localEndpoint, handler, context, listenerPool);
+      BitConnectionManager m2 = registry.putIfAbsent(endpoint, m);
+      if(m2 != null) m = m2;
+    }
+    
+    return m;
+  }
+
+  @Override
+  public Iterator<BitConnectionManager> iterator() {
+    return registry.values().iterator();
+  }
+  
+  public void setEndpoint(DrillbitEndpoint endpoint){
+    this.localEndpoint = endpoint;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
new file mode 100644
index 0000000..fa3b518
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
@@ -0,0 +1,78 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcCheckedFuture;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.MessageLite;
+
+public abstract class FutureBitCommand<T extends MessageLite> implements BitCommand<T> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FutureBitCommand.class);
+
+  protected final SettableFuture<T> settableFuture;
+  private final RpcCheckedFuture<T> parentFuture;
+
+  public FutureBitCommand() {
+    this.settableFuture = SettableFuture.create();
+    this.parentFuture = new RpcCheckedFuture<T>(settableFuture);
+  }
+
+  public abstract void doRpcCall(RpcOutcomeListener<T> outcomeListener, BitConnection connection);
+
+  @Override
+  public void connectionAvailable(BitConnection connection) {
+    
+    doRpcCall(new DeferredRpcOutcome(), connection);
+  }
+
+  @Override
+  public void connectionSucceeded(BitConnection connection) {
+    connectionAvailable(connection);
+  }
+
+  private class DeferredRpcOutcome implements RpcOutcomeListener<T> {
+
+    @Override
+    public void failed(RpcException ex) {
+      settableFuture.setException(ex);
+    }
+
+    @Override
+    public void success(T value) {
+      settableFuture.set(value);
+    }
+
+  }
+
+  public DrillRpcFuture<T> getFuture() {
+    return parentFuture;
+  }
+
+  @Override
+  public void connectionFailed(FailureType type, Throwable t) {
+    settableFuture.setException(RpcException.mapException(
+        String.format("Command failed while establishing connection.  Failure type %s.", type), t));
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
index 8f299d2..84dba85 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
@@ -22,32 +22,35 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.work.foreman.FragmentStatusListener;
-import org.apache.drill.exec.work.fragment.RemoteFragmentHandler;
 
 public class ListenerPool {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ListenerPool.class);
   
-  private final ConcurrentMap<FragmentHandle, FragmentStatusListener> listeners;
+  private final ConcurrentMap<QueryId, FragmentStatusListener> listeners;
   
   public ListenerPool(int par){
-    listeners = new ConcurrentHashMap<FragmentHandle, FragmentStatusListener>(16, 0.75f, par);
+    listeners = new ConcurrentHashMap<QueryId, FragmentStatusListener>(16, 0.75f, par);
   }
   
   public void removeFragmentStatusListener(FragmentHandle handle) throws RpcException{
+    logger.debug("Removing framgent status listener for handle {}.", handle);
     listeners.remove(handle);
   }
   
   public void addFragmentStatusListener(FragmentHandle handle, FragmentStatusListener listener) throws RpcException{
-    FragmentStatusListener old = listeners.putIfAbsent(handle, listener);
+    logger.debug("Adding framgent status listener for handle {}.", handle);
+    FragmentStatusListener old = listeners.putIfAbsent(handle.getQueryId(), listener);
     if(old != null) throw new RpcException("Failure.  The provided handle already exists in the listener pool.  You need to remove one listener before adding another.");
   }
   
   public void status(FragmentStatus status){
-    FragmentStatusListener l = listeners.get(status.getHandle());
+    FragmentStatusListener l = listeners.get(status.getHandle().getQueryId());
     if(l == null){
-      logger.info("A fragment message arrived but there was no registered listener for that message.");
+      
+      logger.error("A fragment message arrived but there was no registered listener for that message for handle {}.", status.getHandle());
       return;
     }else{
       l.statusUpdate(status);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
new file mode 100644
index 0000000..90db6a6
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcCheckedFuture;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.MessageLite;
+
+public abstract class ListeningBitCommand<T extends MessageLite> implements BitCommand<T> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ListeningBitCommand.class);
+
+  private final RpcOutcomeListener<T> listener;
+
+  public ListeningBitCommand(RpcOutcomeListener<T> listener) {
+    this.listener = listener;
+  }
+
+  public abstract void doRpcCall(RpcOutcomeListener<T> outcomeListener, BitConnection connection);
+
+  @Override
+  public void connectionAvailable(BitConnection connection) {
+    
+    doRpcCall(new DeferredRpcOutcome(), connection);
+  }
+
+  @Override
+  public void connectionSucceeded(BitConnection connection) {
+    connectionAvailable(connection);
+  }
+
+  private class DeferredRpcOutcome implements RpcOutcomeListener<T> {
+
+    @Override
+    public void failed(RpcException ex) {
+      listener.failed(ex);
+    }
+
+    @Override
+    public void success(T value) {
+      listener.success(value);
+    }
+
+  }
+
+
+  @Override
+  public void connectionFailed(FailureType type, Throwable t) {
+    listener.failed(RpcException.mapException(
+        String.format("Command failed while establishing connection.  Failure type %s.", type), t));
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
index 3df88b7..779085c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
@@ -45,5 +45,12 @@ public class QueryResultBatch {
   public boolean hasData(){
     return data != null;
   }
+
+  @Override
+  public String toString() {
+    return "QueryResultBatch [header=" + header + ", data=" + data + "]";
+  }
+  
+  
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
new file mode 100644
index 0000000..0aa7c86
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -0,0 +1,175 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.user;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+
+/**
+ * Routes incoming query result batches to the {@link UserResultsListener} registered for their query.
+ *
+ * <p>The normal ordering is: 1. submit the query, 2. receive the {@link QueryId} for it, 3. start receiving result
+ * batches. However, 3 can occur before 2. Batches that arrive before the query id is known are held by a
+ * {@link BufferingListener}; once the id arrives, the buffered batches are replayed to the user's listener, which then
+ * takes the buffering listener's place in the map.
+ */
+public class QueryResultHandler {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryResultHandler.class);
+
+  /** Listener currently registered for each in-flight query: the user's listener or a temporary buffering one. */
+  private final ConcurrentMap<QueryId, UserResultsListener> resultsListener = Maps.newConcurrentMap();
+
+  /**
+   * Wraps a user listener so that it is registered for results once the query submission returns its id.
+   *
+   * @param listener receives the result batches, or the submission failure
+   * @return the outcome listener to attach to the query submission RPC
+   */
+  public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener listener) {
+    return new SubmissionListener(listener);
+  }
+
+  /**
+   * Delivers one result batch to the listener registered for its query, creating a buffering listener when none is
+   * registered yet.
+   *
+   * @param pBody serialized {@link QueryResult} header
+   * @param dBody data buffer that accompanies the header
+   * @throws RpcException if decoding the header fails
+   */
+  public void batchArrived(ByteBuf pBody, ByteBuf dBody) throws RpcException {
+    final QueryResult result = RpcBus.get(pBody, QueryResult.PARSER);
+    final QueryResultBatch batch = new QueryResultBatch(result, dBody);
+    final QueryId queryId = result.getQueryId();
+
+    UserResultsListener l = resultsListener.get(queryId);
+    if (l == null) {
+      logger.debug("Results listener not available, creating a buffering listener.");
+      // manage race condition where we start getting results before we receive the queryid back.
+      BufferingListener bl = new BufferingListener();
+      l = resultsListener.putIfAbsent(queryId, bl);
+      if (l == null) {
+        l = bl;
+      }
+    }
+    l.resultArrived(batch);
+
+    if (result.getIsLastChunk()) {
+      // A buffering listener must stay registered until SubmissionListener hands its batches over, so it is never
+      // removed here. If it has already been handed over, it forwarded this batch to the user's listener, and that
+      // is the entry to drop.
+      UserResultsListener registered = (l instanceof BufferingListener) ? ((BufferingListener) l).output : l;
+      if (registered != null) {
+        resultsListener.remove(queryId, registered);
+      }
+    }
+  }
+
+  /** Holds batches that arrive before the query id is known, then forwards to the user's listener. */
+  private class BufferingListener implements UserResultsListener {
+
+    private final ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
+    private volatile UserResultsListener output;
+
+    /**
+     * Replays the buffered batches to {@code l} and forwards every later batch to it.
+     *
+     * @return true if the last chunk of the query was among the buffered batches
+     */
+    public synchronized boolean transferTo(UserResultsListener l) {
+      output = l;
+      boolean last = false;
+      for (QueryResultBatch r : results) {
+        l.resultArrived(r);
+        last |= r.getHeader().getIsLastChunk();
+      }
+      return last;
+    }
+
+    @Override
+    public synchronized void resultArrived(QueryResultBatch result) {
+      if (output == null) {
+        results.add(result);
+      } else {
+        output.resultArrived(result);
+      }
+    }
+
+    @Override
+    public void submissionFailed(RpcException ex) {
+      throw new UnsupportedOperationException("You cannot report failed submissions to a buffering listener.");
+    }
+
+  }
+
+  /** Registers the user's listener once the submission RPC returns the query id. */
+  private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> {
+    private final UserResultsListener listener;
+
+    public SubmissionListener(UserResultsListener listener) {
+      super();
+      this.listener = listener;
+    }
+
+    @Override
+    public void failed(RpcException ex) {
+      listener.submissionFailed(ex);
+    }
+
+    @Override
+    public void success(QueryId queryId) {
+      logger.debug("Received QueryId {} succesfully.  Adding listener {}", queryId, listener);
+      UserResultsListener oldListener = resultsListener.putIfAbsent(queryId, listener);
+      if (oldListener == null) {
+        return;
+      }
+
+      // we need to deal with the situation where we already received results by the time we got the query id back. In
+      // that case, we'll need to transfer the buffering listener over, grabbing a lock against reception of additional
+      // results during the transition
+      logger.debug("Unable to place user results listener, buffering listener was already in place.");
+      if (!(oldListener instanceof BufferingListener)) {
+        throw new IllegalStateException("Trying to replace a non-buffering User Results listener.");
+      }
+
+      BufferingListener buffer = (BufferingListener) oldListener;
+      synchronized (buffer) {
+        if (buffer.transferTo(listener)) {
+          // the last response was already buffered; nothing more will arrive, so just drop the entry. The map is
+          // keyed by query id, so the removal has to name the key: remove(oldListener) alone matches nothing.
+          resultsListener.remove(queryId, buffer);
+        } else if (!resultsListener.replace(queryId, buffer, listener)) {
+          throw new IllegalStateException("Buffering listener for query was replaced during hand-over.");
+        }
+      }
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 5d2e799..ad44ff2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -21,11 +21,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.EventLoopGroup;
 
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -36,115 +31,27 @@ import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
 import org.apache.drill.exec.rpc.BasicClientWithConnection;
 import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
 import com.google.protobuf.MessageLite;
 
-public class UserClient extends BasicClientWithConnection<RpcType> {
+public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHandshake, BitToUserHandshake> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserClient.class);
 
-  private ConcurrentMap<QueryId, UserResultsListener> resultsListener = Maps.newConcurrentMap();
+  private final QueryResultHandler queryResultHandler = new QueryResultHandler();
 
   public UserClient(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
-    super(UserRpcConfig.MAPPING, alloc, eventLoopGroup);
-  }
-
-  public Future<Void> submitQuery(RunQuery query, UserResultsListener resultsListener) throws RpcException {
-    this.send(RpcType.RUN_QUERY, query, QueryId.class).addLightListener(new SubmissionListener(resultsListener));
-    return resultsListener.getFuture();
+    super(UserRpcConfig.MAPPING, alloc, eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER);
   }
 
-  public BitToUserHandshake connect(DrillbitEndpoint endpoint) throws RpcException, InterruptedException{
-    return this.connectAsClient(RpcType.HANDSHAKE, UserToBitHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).build(), endpoint.getAddress(), endpoint.getUserPort(), BitToUserHandshake.class);
-  }
-  
-  private class BufferingListener extends UserResultsListener {
-
-    private ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
-    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-    private volatile UserResultsListener output;
-
-    public boolean transferTo(UserResultsListener l) {
-      lock.writeLock().lock();
-      output = l;
-      boolean last = false;
-      for (QueryResultBatch r : results) {
-        l.resultArrived(r);
-        last = r.getHeader().getIsLastChunk();
-      }
-      if (future.isDone()) {
-        l.set();
-      }
-      return last;
-    }
-
-    @Override
-    public void resultArrived(QueryResultBatch result) {
-      logger.debug("Result arrvied.");
-      lock.readLock().lock();
-      try {
-        if (output == null) {
-          this.results.add(result);
-        } else {
-          output.resultArrived(result);
-        }
-
-      } finally {
-        lock.readLock().unlock();
-      }
-
-    }
-
-    @Override
-    public void submissionFailed(RpcException ex) {
-      throw new UnsupportedOperationException("You cannot report failed submissions to a buffering listener.");
-    }
-
+  public void submitQuery(UserResultsListener resultsListener, RunQuery query) throws RpcException {
+    send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
   }
 
-  private class SubmissionListener extends RpcOutcomeListener<QueryId> {
-    private UserResultsListener listener;
-
-    public SubmissionListener(UserResultsListener listener) {
-      super();
-      this.listener = listener;
-    }
-
-    @Override
-    public void failed(RpcException ex) {
-      listener.submissionFailed(ex);
-    }
-
-    @Override
-    public void success(QueryId queryId) {
-      logger.debug("Received QueryId {} succesfully.  Adding listener {}", queryId, listener);
-      UserResultsListener oldListener = resultsListener.putIfAbsent(queryId, listener);
-
-      // we need to deal with the situation where we already received results by the time we got the query id back. In
-      // that case, we'll need to transfer the buffering listener over, grabbing a lock against reception of additional
-      // results during the transition
-      if (oldListener != null) {
-        logger.debug("Unable to place user results listener, buffering listener was already in place.");
-        if (oldListener instanceof BufferingListener) {
-          resultsListener.remove(oldListener);
-          boolean all = ((BufferingListener) oldListener).transferTo(this.listener);
-          // simply remove the buffering listener if we already have the last response.
-          if (all) {
-            resultsListener.remove(oldListener);
-          } else {
-            boolean replaced = resultsListener.replace(queryId, oldListener, listener);
-            if (!replaced) throw new IllegalStateException();
-          }
-        } else {
-          throw new IllegalStateException("Trying to replace a non-buffering User Results listener.");
-        }
-      }
-
-    }
-
+  public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint) throws RpcException, InterruptedException {
+    UserToBitHandshake hs = UserToBitHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).setSupportListening(true).build();
+    this.connectAsClient(handler, hs, endpoint.getAddress(), endpoint.getUserPort());
   }
 
   @Override
@@ -165,29 +72,7 @@ public class UserClient extends BasicClientWithConnection<RpcType> {
   protected Response handle(int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
     switch (rpcType) {
     case RpcType.QUERY_RESULT_VALUE:
-      final QueryResult result = get(pBody, QueryResult.PARSER);
-      final QueryResultBatch batch = new QueryResultBatch(result, dBody);
-      UserResultsListener l = resultsListener.get(result.getQueryId());
-//      logger.debug("For QueryId [{}], retrieved result listener {}", result.getQueryId(), l);
-      if (l != null) {
-//        logger.debug("Results listener available, using existing.");
-        l.resultArrived(batch);
-        if (result.getIsLastChunk()) {
-          resultsListener.remove(result.getQueryId(), l);
-          l.set();
-        }
-      } else {
-        logger.debug("Results listener not available, creating a buffering listener.");
-        // manage race condition where we start getting results before we receive the queryid back.
-        BufferingListener bl = new BufferingListener();
-        l = resultsListener.putIfAbsent(result.getQueryId(), bl);
-        if (l != null) {
-          l.resultArrived(batch);
-        } else {
-          bl.resultArrived(batch);
-        }
-      }
-
+      queryResultHandler.batchArrived(pBody, dBody);
       return new Response(RpcType.ACK, Ack.getDefaultInstance());
     default:
       throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType));
@@ -196,18 +81,21 @@ public class UserClient extends BasicClientWithConnection<RpcType> {
   }
 
+  /**
+   * Checks that the bit answered the handshake with the rpc version this client speaks.
+   *
+   * @throws RpcException if the bit's rpc version differs from {@code UserRpcConfig.RPC_VERSION}
+   */
   @Override
-  protected ClientHandshakeHandler<BitToUserHandshake> getHandshakeHandler() {
-    return new ClientHandshakeHandler<BitToUserHandshake>(RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER) {
+  protected void validateHandshake(BitToUserHandshake inbound) throws RpcException {
+    logger.debug("Handling handshake from bit to user. {}", inbound);
+    if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION)
+      throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", UserRpcConfig.RPC_VERSION,
+          inbound.getRpcVersion()));
 
-      @Override
-      protected void validateHandshake(BitToUserHandshake inbound) throws Exception {
-        logger.debug("Handling handshake from bit to user. {}", inbound);
-        if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION)
-          throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.",
-              inbound.getRpcVersion(), UserRpcConfig.RPC_VERSION));
-      }
+  }
 
-    };
+  @Override
+  protected void finalizeConnection(BitToUserHandshake handshake, BasicClientWithConnection.ServerConnection connection) {
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
index 3ce14f0..b1dbfe8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -24,17 +24,8 @@ import org.apache.drill.exec.rpc.RpcException;
 
 import com.google.common.util.concurrent.SettableFuture;
 
-public abstract class UserResultsListener {
-  SettableFuture<Void> future = SettableFuture.create();
+public interface UserResultsListener {
   
-  final void set(){
-    future.set(null);
-  }
-  
-  Future<Void> getFuture(){
-    return future;
-  }
-
   public abstract void submissionFailed(RpcException ex); 
   public abstract void resultArrived(QueryResultBatch result);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 406afc4..908af61 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -26,16 +26,15 @@ import io.netty.channel.EventLoopGroup;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
-import org.apache.drill.exec.proto.UserProtos.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.RequestResults;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
 import org.apache.drill.exec.rpc.BasicServer;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
 import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.work.user.UserWorker;
 
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -100,8 +99,9 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
       super(channel);
     }
 
-    public DrillRpcFuture<Ack> sendResult(QueryWritableBatch result){
-      return send(this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, result.getBuffers());
+    public void sendResult(RpcOutcomeListener<Ack> listener, QueryWritableBatch result){
+      logger.debug("Sending result to client with {}", result);
+      send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, result.getBuffers());
     }
 
   }
@@ -112,7 +112,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
   }
   
   @Override
-  protected ServerHandshakeHandler<UserToBitHandshake> getHandshakeHandler() {
+  protected ServerHandshakeHandler<UserToBitHandshake> getHandshakeHandler(UserClientConnection connection) {
     return new ServerHandshakeHandler<UserToBitHandshake>(RpcType.HANDSHAKE, UserToBitHandshake.PARSER){
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
index 3c4d9af..ed13748 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
@@ -39,7 +39,7 @@ public class BootStrapContext implements Closeable{
   public BootStrapContext(DrillConfig config) {
     super();
     this.config = config;
-    this.loop = new NioEventLoopGroup(4, new NamedThreadFactory("BitServer-"));
+    this.loop = new NioEventLoopGroup(1, new NamedThreadFactory("BitServer-"));
     this.metrics = new MetricRegistry(config.getString(ExecConstants.METRICS_CONTEXT_NAME));
     this.allocator = BufferAllocator.getAllocator(config);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
index 0337a68..199768f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.cache.LocalCache;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.LocalClusterCoordinator;
+import org.apache.drill.exec.exception.DrillbitStartupException;
 
 public class RemoteServiceSet implements Closeable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class);
@@ -37,6 +38,7 @@ public class RemoteServiceSet implements Closeable{
     this.coordinator = coordinator;
   }
 
+
   public DistributedCache getCache() {
     return cache;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index d6d3b9c..b07f274 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -53,12 +53,12 @@ public class ServiceEngine implements Closeable{
   
   public DrillbitEndpoint start() throws DrillbitStartupException, InterruptedException, UnknownHostException{
     int userPort = userServer.bind(config.getInt(ExecConstants.INITIAL_USER_PORT));
-    int bitPort = bitCom.start();
-    return DrillbitEndpoint.newBuilder()
+    DrillbitEndpoint partialEndpoint = DrillbitEndpoint.newBuilder()
         .setAddress(InetAddress.getLocalHost().getHostAddress())
-        .setBitPort(bitPort)
         .setUserPort(userPort)
         .build();
+
+    return bitCom.start(partialEndpoint);
   }
 
   public BitCom getBitCom(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
index f6a9786..9a72845 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
@@ -18,10 +18,9 @@
 package org.apache.drill.exec.work;
 
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 
-public abstract class EndpointListener<RET, V> extends RpcOutcomeListener<RET>{
+public abstract class EndpointListener<RET, V> extends BaseRpcOutcomeListener<RET>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EndpointListener.class);
 
   protected final DrillbitEndpoint endpoint;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
index 2900d99..554b398 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
@@ -65,6 +65,7 @@ public class FragmentRunner implements Runnable, CancelableQuery, StatusProvider
   
   @Override
   public void run() {
+    logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
     if(!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)){
       internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state.  FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get()))));
       return;
@@ -76,7 +77,12 @@ public class FragmentRunner implements Runnable, CancelableQuery, StatusProvider
     try{
       while(state.get() == FragmentState.RUNNING_VALUE){
         if(!root.next()){
-          updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
+          if(context.isFailed()){
+            updateState(FragmentState.RUNNING, FragmentState.FAILED, false);  
+          }else{
+            updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
+          }
+          
         }
       }
       
@@ -90,7 +96,7 @@ public class FragmentRunner implements Runnable, CancelableQuery, StatusProvider
     }finally{
       t.stop();
     }
-    
+    logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
   }
   
   private void internalFail(Throwable excep){