You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/05/22 03:39:05 UTC

[1/3] Clean up threading of client/server. Use the command pattern for BitCom communication to abstract away connection failures. A one-bit, single-exchange remote query now works. Next up: a two-bit, single-exchange query.

Updated Branches:
  refs/heads/execwork e57a8d6d4 -> b8db98ad7


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index d3664a0..1170a1e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
 import org.apache.drill.exec.rpc.bit.BitCom;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -40,7 +41,6 @@ import org.apache.drill.exec.work.batch.BitComHandler;
 import org.apache.drill.exec.work.batch.BitComHandlerImpl;
 import org.apache.drill.exec.work.foreman.Foreman;
 import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
-import org.apache.drill.exec.work.fragment.RemoteFragmentHandler;
 import org.apache.drill.exec.work.user.UserWorker;
 
 import com.google.common.collect.Maps;
@@ -63,7 +63,7 @@ public class WorkManager implements Closeable{
   private final BitComHandler bitComWorker;
   private final UserWorker userWorker;
   private final WorkerBee bee;
-  private Executor executor = Executors.newFixedThreadPool(4);
+  private Executor executor = Executors.newFixedThreadPool(4, new NamedThreadFactory("Working Thread - "));
   private final EventThread eventThread;
   
   public WorkManager(BootStrapContext context){
@@ -148,9 +148,10 @@ public class WorkManager implements Closeable{
   public void run() {
     try {
     while(true){
-      logger.debug("Checking for pending work tasks.");
+      logger.debug("Polling for pending work tasks.");
       Runnable r = pendingTasks.poll(10, TimeUnit.SECONDS);
       if(r != null){
+        logger.debug("Starting pending task {}", r);
         executor.execute(r);  
       }
       

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
index 5dacb71..ec03392 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
@@ -67,18 +67,23 @@ public abstract class AbstractFragmentCollector implements BatchCollector{
   
   public abstract void streamFinished(int minorFragmentId);
   
-  public void batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch) {
+  public boolean batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch) {
+    boolean decremented = false;
     if (remainders.compareAndSet(minorFragmentId, 0, 1)) {
       int rem = remainingRequired.decrementAndGet();
       if (rem == 0) {
         parentAccounter.decrementAndGet();
+        decremented = true;
       }
     }
     if(batch.getHeader().getIsLastBatch()){
       streamFinished(minorFragmentId);
     }
     getBuffer(minorFragmentId).enqueue(throttle, batch);
+    return decremented;
   }
 
+  
+
   protected abstract RawBatchBuffer getBuffer(int minorFragmentId);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
index ff091d7..b5a497e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
@@ -24,8 +24,7 @@ import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
 
 interface BatchCollector {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCollector.class);
-
-  public void batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch);
+  public boolean batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch);
   public int getOppositeMajorFragmentId();
   public RawBatchBuffer[] getBuffers();
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
index 9b227da..edda714 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
@@ -159,14 +159,11 @@ public class BitComHandlerImpl implements BitComHandler {
     // Create a handler if there isn't already one.
     if(handler == null){
       
-      
-      
       PlanFragment fragment = bee.getContext().getCache().getFragment(handle);
       if(fragment == null){
         logger.error("Received batch where fragment was not in cache.");
         return Acks.FAIL;
       }
-      
 
       IncomingFragmentHandler newHandler = new RemoteFragmentHandler(fragment, bee.getContext(), bee.getContext().getBitCom().getTunnel(fragment.getForeman()));
       
@@ -174,7 +171,7 @@ public class BitComHandlerImpl implements BitComHandler {
       handler = handlers.putIfAbsent(fragment.getHandle(), newHandler);
           
       if(handler == null){
-        // we added a handler, inform foreman that we did so.  This way, the foreman can track status.  We also tell foreman that we don't need inform ourself.
+        // we added a handler, inform the bee that we did so.  This way, the foreman can track status. 
         bee.addFragmentPendingRemote(newHandler);
         handler = newHandler;
       }
@@ -182,10 +179,12 @@ public class BitComHandlerImpl implements BitComHandler {
     
     boolean canRun = handler.handle(connection.getConnectionThrottle(), new RawFragmentBatch(fragmentBatch, body));
     if(canRun){
+      logger.debug("Arriving batch means local batch can run, starting local batch.");
       // if we've reached the canRun threshold, we'll proceed.  This expects handler.handle() to only return a single true.
       bee.startFragmentPendingRemote(handler);
     }
-    if(handler.isDone()){
+    if(fragmentBatch.getIsLastBatch() && !handler.isWaiting()){
+      logger.debug("Removing handler.  Is Last Batch {}.  Is Waiting for more {}", fragmentBatch.getIsLastBatch(), handler.isWaiting());
       handlers.remove(handler.getHandle());
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index 20775c5..264c4b9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -17,6 +17,7 @@
  ******************************************************************************/
 package org.apache.drill.exec.work.batch;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -28,6 +29,7 @@ import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
@@ -42,7 +44,10 @@ public class IncomingBuffers {
 
   public IncomingBuffers(PhysicalOperator root) {
     Map<Integer, BatchCollector> counts = Maps.newHashMap();
-    root.accept(new CountRequiredFragments(), counts);
+    CountRequiredFragments reqFrags = new CountRequiredFragments();
+    root.accept(reqFrags, counts);
+    
+    logger.debug("Came up with a list of {} required fragments.  Fragments {}", remainingRequired.get(), counts);
     streamsRemaining.set(remainingRequired.get());
     fragCounts = ImmutableMap.copyOf(counts);
   }
@@ -53,11 +58,13 @@ public class IncomingBuffers {
     if(batch.getHeader().getIsLastBatch()){
       streamsRemaining.decrementAndGet();
     }
+    int sendMajorFragmentId = batch.getHeader().getSendingMajorFragmentId();
+    BatchCollector fSet = fragCounts.get(sendMajorFragmentId);
+    if (fSet == null) throw new FragmentSetupException(String.format("We received a major fragment id that we were not expecting.  The id was %d.", sendMajorFragmentId));
+    boolean decremented = fSet.batchArrived(throttle, batch.getHeader().getSendingMinorFragmentId(), batch);
     
-    BatchCollector fSet = fragCounts.get(batch.getHeader().getSendingMajorFragmentId());
-    if (fSet == null) throw new FragmentSetupException("We received a major fragment id that we were not expecting.");
-    fSet.batchArrived(throttle, batch.getHeader().getSendingMinorFragmentId(), batch);
-    return remainingRequired.get() == 0;
+    // we should only return true if remaining required has been decremented and is currently equal to zero.
+    return decremented && remainingRequired.get() == 0;
   }
 
   public int getRemainingRequired() {
@@ -75,7 +82,7 @@ public class IncomingBuffers {
    * Designed to setup initial values for arriving fragment accounting.
    */
   public class CountRequiredFragments extends AbstractPhysicalVisitor<Void, Map<Integer, BatchCollector>, RuntimeException> {
-
+    
     @Override
     public Void visitReceiver(Receiver receiver, Map<Integer, BatchCollector> counts) throws RuntimeException {
       BatchCollector set;
@@ -84,7 +91,7 @@ public class IncomingBuffers {
       } else {
         set = new PartitionedCollector(remainingRequired, receiver);
       }
-
+      
       counts.put(set.getOppositeMajorFragmentId(), set);
       remainingRequired.incrementAndGet();
       return null;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
index e21d69a..93868a7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
@@ -27,7 +27,7 @@ public class MergingCollector extends AbstractFragmentCollector{
   
   public MergingCollector(AtomicInteger parentAccounter, Receiver receiver) {
     super(parentAccounter, receiver, 1);
-    streamsRunning = new AtomicInteger(parentAccounter.get());
+    streamsRunning = new AtomicInteger(receiver.getProvidingEndpoints().size());
   }
 
   @Override
@@ -35,10 +35,11 @@ public class MergingCollector extends AbstractFragmentCollector{
     return buffers[0];
   }
 
-  @Override
+  
   public void streamFinished(int minorFragmentId) {
     if(streamsRunning.decrementAndGet() == 0) buffers[0].finished();
   }
+
   
   
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
index 116ca26..25b5884 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
@@ -36,6 +36,7 @@ public class PartitionedCollector extends AbstractFragmentCollector{
   public void streamFinished(int minorFragmentId) {
     buffers[minorFragmentId].finished();
   }
+
   
   
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
index f97d878..71ae576 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
@@ -65,7 +65,7 @@ public class UnlmitedRawBatchBuffer implements RawBatchBuffer{
       }
     }
     
-    return null;
+    return b;
     
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index dea8282..f86c4fb 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -24,12 +24,9 @@ import java.util.List;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.impl.ImplCreator;
-import org.apache.drill.exec.physical.impl.RootExec;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
@@ -45,8 +42,8 @@ import org.apache.drill.exec.proto.UserProtos.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserProtos.RequestResults;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.util.AtomicState;
@@ -126,14 +123,13 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
   
   void cleanupAndSendResult(QueryResult result){
     bee.retireForeman(this);
-    initiatingClient.sendResult(new QueryWritableBatch(result)).addLightListener(new ResponseSendListener());
+    initiatingClient.sendResult(new ResponseSendListener(), new QueryWritableBatch(result));
   }
 
-  private class ResponseSendListener extends RpcOutcomeListener<Ack> {
+  private class ResponseSendListener extends BaseRpcOutcomeListener<Ack> {
     @Override
     public void failed(RpcException ex) {
-      logger
-          .info(
+      logger.info(
               "Failure while trying communicate query result to initating client.  This would happen if a client is disconnected before response notice can be sent.",
               ex);
     }
@@ -193,12 +189,17 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
       fail("Failure while fragmenting query.", e);
       return;
     }
+    
+    
+
+    
     PlanningSet planningSet = StatsCollector.collectStats(rootFragment);
     SimpleParallelizer parallelizer = new SimpleParallelizer();
 
     try {
       QueryWorkUnit work = parallelizer.getFragments(context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet, 10);
 
+      this.context.getBitCom().getListeners().addFragmentStatusListener(work.getRootFragment().getHandle(), fragmentManager);
       List<PlanFragment> leafFragments = Lists.newArrayList();
 
       // store fragments in distributed grid.
@@ -213,7 +214,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
       fragmentManager.runFragments(bee, work.getRootFragment(), work.getRootOperator(), initiatingClient, leafFragments);
 
     
-    } catch (ExecutionSetupException e) {
+    } catch (ExecutionSetupException | RpcException e) {
       fail("Failure while setting up query.", e);
     }
 
@@ -245,9 +246,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     return this.state.getState();
   }
 
-  public boolean rootCoorespondsTo(FragmentHandle handle){
-    throw new UnsupportedOperationException();
-  }
   
   class ForemanManagerListener{
     void fail(String message, Throwable t) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
index 20797b8..f069db7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
@@ -64,12 +64,13 @@ class RunningFragmentManager implements FragmentStatusListener{
     this.foreman = foreman;
     this.tun = tun;
     this.remainingFragmentCount = new AtomicInteger(0);
+    
   }
 
   public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator, UserClientConnection rootClient, List<PlanFragment> leafFragments) throws ExecutionSetupException{
     remainingFragmentCount.set(leafFragments.size()+1);
 
-    // set up the root framgnet first so we'll have incoming buffers available.
+    // set up the root fragment first so we'll have incoming buffers available.
     {
       IncomingBuffers buffers = new IncomingBuffers(rootOperator);
       
@@ -97,13 +98,13 @@ class RunningFragmentManager implements FragmentStatusListener{
   private void sendRemoteFragment(PlanFragment fragment){
     map.put(fragment.getHandle(), new FragmentData(fragment.getHandle(), fragment.getAssignment(), false));
     FragmentSubmitListener listener = new FragmentSubmitListener(fragment.getAssignment(), fragment);
-    tun.get(fragment.getAssignment()).sendFragment(fragment).addLightListener(listener);
+    tun.get(fragment.getAssignment()).sendFragment(listener, fragment);
   }
   
   
   @Override
   public void statusUpdate(FragmentStatus status) {
-    
+    logger.debug("New fragment status was provided to Foreman of {}", status);
     switch(status.getState()){
     case AWAITING_ALLOCATION:
       updateStatus(status);
@@ -205,6 +206,7 @@ class RunningFragmentManager implements FragmentStatusListener{
 
     @Override
     public void failed(RpcException ex) {
+      logger.debug("Failure while sending fragment.  Stopping query.", ex);
       stopQuery();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
index b4e9308..b23f003 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
@@ -44,6 +44,6 @@ public interface IncomingFragmentHandler {
   public abstract FragmentRunner getRunnable();
 
   public abstract void cancel();
-  public boolean isDone();
+  public boolean isWaiting();
   public abstract FragmentHandle getHandle();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
index 3f710ed..5ffd09a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
@@ -60,8 +60,8 @@ public class LocalFragmentHandler implements IncomingFragmentHandler{
   }
 
   @Override
-  public boolean isDone() {
-    return cancel || isDone();
+  public boolean isWaiting() {
+    return !buffers.isDone() && !cancel;
   }
   
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
index 70d7e93..4a5dbf2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
@@ -113,8 +113,8 @@ public class RemoteFragmentHandler implements IncomingFragmentHandler {
   }
 
   @Override
-  public boolean isDone() {
-    return cancel || buffers.isDone();
+  public boolean isWaiting() {
+    return !buffers.isDone() && !cancel;
   }
   
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
index 7c6bfe5..586ccf6 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
@@ -17,6 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl;
 
+import static org.junit.Assert.*;
+
 import java.util.List;
 
 import org.apache.drill.common.util.FileUtils;
@@ -26,26 +28,29 @@ import org.apache.drill.exec.proto.UserProtos.QueryType;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.RemoteServiceSet;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 
-@Ignore
+//@Ignore
 public class DistributedFragmentRun extends PopUnitTestBase{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedFragmentRun.class);
   
   
   @Test 
-  public void simpleDistributedQuery() throws Exception{
+  public void oneBitOneExchangeRun() throws Exception{
     RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
-    try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); Drillbit bit2 = new Drillbit(CONFIG, serviceSet); DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());){
+
+    try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());){
       bit1.run();
-      bit2.run();
       client.connect();
       List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/physical_single_exchange.json"), Charsets.UTF_8));
-      System.out.println(results);
+      int count = 0;
+      for(QueryResultBatch b : results){
+        count += b.getHeader().getRowCount();
+      }
+      assertEquals(100, count);
     }
     
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
index 7b7ab8e..1e0c5b6 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
@@ -23,28 +23,12 @@ import static org.junit.Assert.assertNull;
 
 import java.io.IOException;
 
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.physical.PhysicalPlan;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.planner.fragment.Fragment;
-import org.apache.drill.exec.planner.fragment.PlanningSet;
-import org.apache.drill.exec.planner.fragment.StatsCollector;
-import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
-import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
-import org.apache.drill.exec.planner.SimpleExecPlanner;
+import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.work.QueryWorkUnit;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-
 public class CheckFragmenter extends PopUnitTestBase {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CheckFragmenter.class);
 
@@ -77,10 +61,11 @@ public class CheckFragmenter extends PopUnitTestBase {
     assertNotNull(b.getSendingExchange());
   }
 
-  
 
 
   
+
+  
   
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
index 6f229a3..e1db639 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
@@ -17,16 +17,15 @@
  ******************************************************************************/
 package org.apache.drill.exec.pop;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
-import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
 
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.planner.fragment.PlanningSet;
-import org.apache.drill.exec.planner.fragment.StatsCollector;
 import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
+import org.apache.drill.exec.planner.fragment.StatsCollector;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -41,26 +40,46 @@ public class FragmentChecker extends PopUnitTestBase{
   
   @Test
   public void checkSimpleExchangePlan() throws Exception{
+    print("/physical_simpleexchange.json", 2, 3);
+
+  }
+  
+  
+  private void print(String fragmentFile, int bitCount, int exepectedFragmentCount) throws Exception{
     
+    System.out.println(String.format("=================Building plan fragments for [%s].  Allowing %d total Drillbits.==================", fragmentFile, bitCount));
     PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance());
-    Fragment fragmentRoot = getRootFragment(ppr, "/physical_simpleexchange.json");
+    Fragment fragmentRoot = getRootFragment(ppr, fragmentFile);
     PlanningSet planningSet = StatsCollector.collectStats(fragmentRoot);
     SimpleParallelizer par = new SimpleParallelizer();
+    List<DrillbitEndpoint> endpoints = Lists.newArrayList();
+    DrillbitEndpoint localBit = null;
+    for(int i =0; i < bitCount; i++){
+      DrillbitEndpoint b1 = DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(1234+i).build();
+      if(i ==0) localBit = b1; 
+      endpoints.add(b1);
+    }
     
-    DrillbitEndpoint b1 = DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(1234).build();
-    DrillbitEndpoint b2 = DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(2345).build();
     
-    QueryWorkUnit qwu = par.getFragments(b1, QueryId.getDefaultInstance(), Lists.newArrayList(b1, b2), ppr, fragmentRoot, planningSet, 10);
-    assertEquals(qwu.getFragments().size(), 3);
-    System.out.println("=========ROOT FRAGMENT=========");
+    QueryWorkUnit qwu = par.getFragments(localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet, 10);
+    System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(), qwu.getRootFragment().getHandle().getMinorFragmentId()));
+    
     System.out.print(qwu.getRootFragment().getFragmentJson());
     
     
     for(PlanFragment f : qwu.getFragments()){
-      System.out.println("=========");
+      System.out.println(String.format("=========Fragment [%d:%d]=====", f.getHandle().getMajorFragmentId(), f.getHandle().getMinorFragmentId()));
       System.out.print(f.getFragmentJson());
     }
+    
+    //assertEquals(exepectedFragmentCount, qwu.getFragments().size());
+
     logger.debug("Planning Set {}", planningSet);
+  }
+  
+  @Test
+  public void validateSingleExchangeFragment() throws Exception{
+    print("/physical_single_exchange.json", 1, 2);
 
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
index 9684e9f..038b093 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
@@ -19,27 +19,25 @@ package org.apache.drill.exec.server;
 
 import io.netty.buffer.ByteBuf;
 
-import java.util.concurrent.ConcurrentMap;
-
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
 import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.bit.BitClient;
-import org.apache.drill.exec.rpc.bit.BitComImpl;
 import org.apache.drill.exec.rpc.bit.BitConnection;
+import org.apache.drill.exec.rpc.bit.BitConnectionManager;
 import org.apache.drill.exec.rpc.bit.BitRpcConfig;
 import org.apache.drill.exec.rpc.bit.BitServer;
+import org.apache.drill.exec.rpc.bit.BitTunnel.SendFragmentStatus;
+import org.apache.drill.exec.rpc.bit.ConnectionManagerRegistry;
 import org.apache.drill.exec.rpc.bit.ListenerPool;
 import org.apache.drill.exec.work.batch.BitComHandler;
 import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
 import org.junit.Test;
 
-import com.google.common.collect.Maps;
-
 public class TestBitRpc {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBitRpc.class);
   
@@ -47,13 +45,19 @@ public class TestBitRpc {
   public void testBasicConnectionAndHandshake() throws Exception{
     int port = 1234;
     BootStrapContext c = new BootStrapContext(DrillConfig.create());
-    ConcurrentMap<DrillbitEndpoint, BitConnection> registry = Maps.newConcurrentMap();
-    BitServer server = new BitServer(new BitComTestHandler(), c, registry, new ListenerPool(2));
+    final BitComTestHandler handler = new BitComTestHandler();
+    final ListenerPool listeners = new ListenerPool(2);
+    ConnectionManagerRegistry registry = new ConnectionManagerRegistry(handler, c, listeners);
+    BitServer server = new BitServer(handler, c, registry, listeners);
     port = server.bind(port);
+    DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(port).build();
+    registry.setEndpoint(ep);
     for(int i =0; i < 10; i++){
-      BitClient client = new BitClient(DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(port).build(), null, new BitComTestHandler(), c, registry, new ListenerPool(2));
-      client.connect();
-      
+      try(BitConnectionManager cm = new BitConnectionManager(ep, ep, handler, c, listeners)){
+        SendFragmentStatus cmd = new SendFragmentStatus(FragmentStatus.getDefaultInstance());
+        cm.runCommand(cmd);
+        cmd.getFuture().checkedGet();
+      }
     }
     System.out.println("connected");
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json b/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
index 675ecfb..0e1921e 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
@@ -17,7 +17,6 @@
             	  {name: "red", type: "BIGINT", mode: "REQUIRED"},
             	  {name: "green", type: "INT", mode: "REQUIRED"}
             	]}
-            	
             ]
         },
         {


[2/3] Clean up threading of client/server. Utilize command pattern for BitCom stuff to abstract away connection failures. Works on one bit single exchange remote query now. Next up, two bit single exchange query.

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
index 4ba99a1..82a6aa6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
@@ -22,57 +22,54 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.GenericFutureListener;
 
-import java.util.concurrent.ConcurrentMap;
-
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
 import org.apache.drill.exec.proto.ExecProtos.RpcType;
 import org.apache.drill.exec.rpc.BasicClient;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitConnectionManager.CloseHandlerCreator;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.work.batch.BitComHandler;
 
 import com.google.protobuf.MessageLite;
 
-public class BitClient  extends BasicClient<RpcType, BitConnection>{
+public class BitClient  extends BasicClient<RpcType, BitConnection, BitHandshake, BitHandshake>{
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitClient.class);
 
   private final BitComHandler handler;
-  private final DrillbitEndpoint endpoint;
-  private BitConnection connection;
-  private final AvailabilityListener openListener;
-  private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
+  private final DrillbitEndpoint remoteEndpoint;
+  private volatile BitConnection connection;
   private final ListenerPool listeners;
+  private final CloseHandlerCreator closeHandlerFactory;
+  private final DrillbitEndpoint localIdentity;
   
-  public BitClient(DrillbitEndpoint endpoint, AvailabilityListener openListener, BitComHandler handler, BootStrapContext context, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners) {
-    super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
-
-    this.endpoint = endpoint;
+  public BitClient(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localEndpoint, BitComHandler handler, BootStrapContext context, CloseHandlerCreator closeHandlerFactory, ListenerPool listeners) {
+    super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup(), RpcType.HANDSHAKE, BitHandshake.class, BitHandshake.PARSER);
+    this.localIdentity = localEndpoint;
+    this.remoteEndpoint = remoteEndpoint;
     this.handler = handler;
-    this.openListener = openListener;
-    this.registry = registry;
     this.listeners = listeners;
+    this.closeHandlerFactory = closeHandlerFactory;
   }
   
-  public BitHandshake connect() throws RpcException, InterruptedException{
-    BitHandshake bs = connectAsClient(RpcType.HANDSHAKE, BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).build(), endpoint.getAddress(), endpoint.getBitPort(), BitHandshake.class);
-    connection.setEndpoint(endpoint);
-    return bs;
+  public void connect(RpcConnectionHandler<BitConnection> connectionHandler) {
+    connectAsClient(connectionHandler, BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).setEndpoint(localIdentity).build(), remoteEndpoint.getAddress(), remoteEndpoint.getBitPort());
   }
 
   @SuppressWarnings("unchecked")
   @Override
   public BitConnection initRemoteConnection(Channel channel) {
-    this.connection = new BitConnection(openListener, channel, (RpcBus<RpcType, BitConnection>) (RpcBus<?, ?>) this, registry, listeners);
+    this.connection = new BitConnection(channel, (RpcBus<RpcType, BitConnection>) (RpcBus<?, ?>) this, listeners);
     return connection;
   }
 
   @Override
   protected GenericFutureListener<ChannelFuture> getCloseHandler(BitConnection clientConnection) {
-    return clientConnection.getCloseHandler(super.getCloseHandler(clientConnection));
+    return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(clientConnection));
   }
 
   @Override
@@ -86,18 +83,15 @@ public class BitClient  extends BasicClient<RpcType, BitConnection>{
   }
 
   @Override
-  protected ClientHandshakeHandler<BitHandshake> getHandshakeHandler() {
-    return new ClientHandshakeHandler<BitHandshake>(RpcType.HANDSHAKE, BitHandshake.class, BitHandshake.PARSER){
-
-      @Override
-      protected void validateHandshake(BitHandshake inbound) throws Exception {
-        logger.debug("Handling handshake from bit server to bit client. {}", inbound);
-        if(inbound.getRpcVersion() != BitRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", inbound.getRpcVersion(), BitRpcConfig.RPC_VERSION));
-      }
+  protected void validateHandshake(BitHandshake handshake) throws RpcException { // Rejects a handshake whose RPC version differs from BitRpcConfig.RPC_VERSION.
+    if(handshake.getRpcVersion() != BitRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", BitRpcConfig.RPC_VERSION, handshake.getRpcVersion())); // expected = local version, actual = version sent by the peer
+  }
 
-    };
+  @Override
+  protected void finalizeConnection(BitHandshake handshake, BitConnection connection) {
+    connection.setEndpoint(handshake.getEndpoint());
   }
-  
+
   public BitConnection getConnection(){
     return this.connection;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
index c60d36b..f7f508e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCom.java
@@ -40,11 +40,17 @@ public interface BitCom extends Closeable {
    */
   public BitTunnel getTunnel(DrillbitEndpoint node) ;
 
-  public int start() throws InterruptedException, DrillbitStartupException;
+  public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException;
 
   /**
    * Register an incoming batch handler for a local foreman.  
    * @param handler
    */
   public void registerIncomingBatchHandler(IncomingFragmentHandler handler);
+  
+  /**
+   * Returns the ListenerPool associated with this BitCom.
+   * @return the listener pool
+   */
+  public ListenerPool getListeners();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
index c98be44..d1cadc7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
@@ -18,157 +18,68 @@
 package org.apache.drill.exec.rpc.bit;
 
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
-import org.apache.drill.exec.proto.ExecProtos.RpcType;
-import org.apache.drill.exec.rpc.NamedThreadFactory;
-import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.work.batch.BitComHandler;
 import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
 
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
-import com.google.common.util.concurrent.AbstractCheckedFuture;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 
 /**
- * Manages communication tunnels between nodes.   
+ * Manages communication tunnels between nodes.
  */
 public class BitComImpl implements BitCom {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitComImpl.class);
 
-  private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry = Maps.newConcurrentMap();
   private final ListenerPool listeners;
   private volatile BitServer server;
   private final BitComHandler handler;
   private final BootStrapContext context;
-  
-  // TODO: this executor should be removed.
-  private final Executor exec = Executors.newCachedThreadPool(new NamedThreadFactory("BitComImpl execution pool: "));
+  private final ConnectionManagerRegistry connectionRegistry;
 
   public BitComImpl(BootStrapContext context, BitComHandler handler) {
     super();
     this.handler = handler;
     this.context = context;
     this.listeners = new ListenerPool(8);
+    this.connectionRegistry = new ConnectionManagerRegistry(handler, context, listeners);
   }
 
-  public int start() throws InterruptedException, DrillbitStartupException {
-    server = new BitServer(handler, context, registry, listeners);
+  @Override
+  public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException {
+    server = new BitServer(handler, context, connectionRegistry, listeners);
     int port = context.getConfig().getInt(ExecConstants.INITIAL_BIT_PORT);
-    return server.bind(port);
-  }
-
-  private CheckedFuture<BitConnection, RpcException> getNode(final DrillbitEndpoint endpoint, boolean check) {
-    
-    
-    SettableFuture<BitConnection> future = SettableFuture.create();
-    BitComFuture<BitConnection> checkedFuture = new BitComFuture<BitConnection>(future);
-    BitConnection t = null;
-
-    if (check) {
-      t = registry.get(endpoint);
-
-      if (t != null) {
-        future.set(t);
-        return checkedFuture;
-      }
-    }
-    
-    try {
-      AvailWatcher watcher = new AvailWatcher(future);
-      BitClient c = new BitClient(endpoint, watcher, handler, context, registry, listeners);
-      c.connect();
-      return checkedFuture;
-    } catch (InterruptedException | RpcException e) {
-      future.setException(new FragmentSetupException("Unable to open connection"));
-      return checkedFuture;
-    }
-
+    port = server.bind(port);
+    DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setBitPort(port).build();
+    connectionRegistry.setEndpoint(completeEndpoint);
+    return completeEndpoint;
   }
 
-  private class AvailWatcher implements AvailabilityListener{
-    final SettableFuture<BitConnection> future;
-    
-    public AvailWatcher(SettableFuture<BitConnection> future) {
-      super();
-      this.future = future;
-    }
-
-    @Override
-    public void isAvailable(BitConnection connection) {
-      future.set(connection);
-    }
-    
-  }
   
-  BitConnection getConnection(DrillbitEndpoint endpoint) throws RpcException {
-    BitConnection t = registry.get(endpoint);
-    if(t != null) return t;
-    return this.getNode(endpoint, false).checkedGet();
+   
+  public ListenerPool getListeners() {
+    return listeners;
   }
 
-  
-  CheckedFuture<BitConnection, RpcException> getConnectionAsync(DrillbitEndpoint endpoint) {
-    return this.getNode(endpoint, true);
-  }
-
-  
   @Override
-  public BitTunnel getTunnel(DrillbitEndpoint endpoint){
-    BitConnection t = registry.get(endpoint);
-    if(t == null){
-      return new BitTunnel(exec, endpoint, this, t);
-    }else{
-      return new BitTunnel(exec, endpoint, this,  this.getNode(endpoint, false));
-    }
+  public BitTunnel getTunnel(DrillbitEndpoint endpoint) {
+    return new BitTunnel(endpoint, connectionRegistry.getConnectionManager(endpoint));
   }
 
-
-  /**
-   * A future which remaps exceptions to a BitComException.
-   * @param <T>
-   */
-  private class BitComFuture<T> extends AbstractCheckedFuture<T, RpcException>{
-
-    protected BitComFuture(ListenableFuture<T> delegate) {
-      super(delegate);
-    }
-
-    @Override
-    protected RpcException mapException(Exception e) {
-      Throwable t = e;
-      if(e instanceof ExecutionException){
-        t = e.getCause();
-      }
-      
-      if(t instanceof RpcException) return (RpcException) t;
-      return new RpcException(t);
-    }
+  @Override
+  public void registerIncomingBatchHandler(IncomingFragmentHandler handler) {
+    this.handler.registerIncomingFragmentHandler(handler);
   }
 
   public void close() {
     Closeables.closeQuietly(server);
-    for (BitConnection bt : registry.values()) {
-      bt.shutdownIfClient();
+    for (BitConnectionManager bt : connectionRegistry) {
+      bt.close();
     }
   }
 
-  @Override
-  public void registerIncomingBatchHandler(IncomingFragmentHandler handler) {
-    this.handler.registerIncomingFragmentHandler(handler);
-  }
-  
-  
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java
new file mode 100644
index 0000000..692c63e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitCommand.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+
+import com.google.protobuf.MessageLite;
+
+public interface BitCommand<T extends MessageLite> extends RpcConnectionHandler<BitConnection>{
+
+  public abstract void connectionAvailable(BitConnection connection);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
index 73980f9..f85ea74 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java
@@ -17,6 +17,7 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc.bit;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.GenericFutureListener;
@@ -35,31 +36,35 @@ import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.rpc.DrillRpcFuture;
 import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
 
 import com.google.common.base.Preconditions;
 import com.google.common.io.Closeables;
+import com.google.protobuf.MessageLite;
 
 public class BitConnection extends RemoteConnection{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitConnection.class); 
   
   private final RpcBus<RpcType, BitConnection> bus;
-  private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
   private final ListenerPool listeners;
-
-  private final AvailabilityListener listener;
   private volatile DrillbitEndpoint endpoint;
   private volatile boolean active = false;
   private final UUID id;
   
-  public BitConnection(AvailabilityListener listener, Channel channel, RpcBus<RpcType, BitConnection> bus, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners){
+  public BitConnection(Channel channel, RpcBus<RpcType, BitConnection> bus, ListenerPool listeners){
     super(channel);
     this.bus = bus;
-    this.registry = registry;
     // we use a local listener pool unless a global one is provided.
     this.listeners = listeners != null ? listeners : new ListenerPool(2);
-    this.listener = listener;
     this.id = UUID.randomUUID();
   }
+  
+  void setEndpoint(DrillbitEndpoint endpoint){
+    assert this.endpoint == null : "Endpoint should only be set once (only in the case in incoming server requests).";
+    this.endpoint = endpoint;
+    active = true;
+  }
 
   protected DrillbitEndpoint getEndpoint() {
     return endpoint;
@@ -69,48 +74,12 @@ public class BitConnection extends RemoteConnection{
     return listeners;
   }
   
-  protected void setEndpoint(DrillbitEndpoint endpoint) {
-    Preconditions.checkNotNull(endpoint);
-    Preconditions.checkArgument(this.endpoint == null);
-    
-    this.endpoint = endpoint;
-    BitServer.logger.debug("Adding new endpoint to available BitServer connections.  Endpoint: {}.", endpoint);
-    synchronized(this){
-      BitConnection c = registry.putIfAbsent(endpoint, this);
-      
-      if(c != null){ // the registry already has a connection like this
-        
-        // give the awaiting future an alternative connection.
-        if(listener != null){
-          listener.isAvailable(c);
-        }
-        
-        // shut this down if this is a client as it won't be available in the registry.
-        // otherwise we'll leave as, possibly allowing to bit coms to use different tunnels to talk to each other.  This shouldn't cause a problem.
-        logger.debug("Shutting down connection to {} since the registry already has an active connection that endpoint.", endpoint);
-        shutdownIfClient();
-        
-      }
-      active = true;
-      if(listener != null) listener.isAvailable(this);
-    }
-  }
-
-  public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch){
-    return bus.send(this, RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers());
-  }
-  
-  public DrillRpcFuture<Ack> sendFragment(PlanFragment fragment){
-    return bus.send(this, RpcType.REQ_INIATILIZE_FRAGMENT, fragment, Ack.class);
-  }
   
-  public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle){
-    return bus.send(this,  RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class);
+  public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> outcomeListener, RpcType rpcType,
+      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies){
+    bus.send(outcomeListener, this, rpcType, protobufBody, clazz, dataBodies);
   }
   
-  public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
-    return bus.send(this,  RpcType.REQ_FRAGMENT_STATUS, status, Ack.class);
-  }
 
   public void disable(){
     active = false;
@@ -140,27 +109,7 @@ public class BitConnection extends RemoteConnection{
     return true;
   }
 
-  public GenericFutureListener<ChannelFuture> getCloseHandler(GenericFutureListener<ChannelFuture> parent){
-    return new CloseHandler(this, parent);
-  }
-  
-  private class CloseHandler implements GenericFutureListener<ChannelFuture>{
-    private BitConnection connection;
-    private GenericFutureListener<ChannelFuture> parent;
-    
-    public CloseHandler(BitConnection connection, GenericFutureListener<ChannelFuture> parent) {
-      super();
-      this.connection = connection;
-      this.parent = parent;
-    }
 
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      if(connection.getEndpoint() != null) registry.remove(connection.getEndpoint(), connection);
-      parent.operationComplete(future);
-    }
-    
-  }
   
   public void shutdownIfClient(){
     if(bus.isClient()) Closeables.closeQuietly(bus);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
index 0160d24..d99bb22 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
@@ -17,58 +17,152 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc.bit;
 
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import java.io.Closeable;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
 
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.protobuf.MessageLite;
 
-public class BitConnectionManager {
+/**
+ * Manages all connections between two particular bits.
+ */
+public class BitConnectionManager implements Closeable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitConnectionManager.class);
   
-  private final int maxAttempts;
-  private final BitComImpl com;
   private final DrillbitEndpoint endpoint;
-  private final AtomicReference<BitConnection> connection;
-  private final AtomicReference<CheckedFuture<BitConnection, RpcException>> future;
+  private final AtomicReference<BitConnection> connectionHolder;
+  private final BitComHandler handler;
+  private final BootStrapContext context;
+  private final ListenerPool listenerPool;
+  private final DrillbitEndpoint localIdentity;
+  
+  public BitConnectionManager(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localIdentity, BitComHandler handler, BootStrapContext context, ListenerPool listenerPool) {
+    assert remoteEndpoint != null : "Endpoint cannot be null.";
+    assert remoteEndpoint.getAddress() != null && !remoteEndpoint.getAddress().isEmpty(): "Endpoint address cannot be null.";
+    assert remoteEndpoint.getBitPort() > 0 : String.format("Bit Port must be set to a port between 1 and 65k.  Was set to %d.", remoteEndpoint.getBitPort());
+    
+    this.connectionHolder =  new AtomicReference<BitConnection>();
+    this.endpoint = remoteEndpoint;
+    this.localIdentity = localIdentity;
+    this.handler = handler;
+    this.context = context;
+    this.listenerPool = listenerPool;
+  }
+  
+  public <R extends MessageLite> BitCommand<R> runCommand(BitCommand<R> cmd){
+    logger.debug("Running command {}", cmd);
+    BitConnection connection = connectionHolder.get();
+    if(connection != null){
+      if(connection.isActive()){
+        cmd.connectionAvailable(connection);
+        return cmd;
+      }else{
+        // remove the old connection. (don't worry if we fail, since someone else should have done it.)
+        connectionHolder.compareAndSet(connection, null);
+      }
+    }
+    
+    /** We've arrived here without a connection; let's make sure only one of us makes a connection. (FYI, another endpoint could create a reverse connection.) **/
+    synchronized(this){
+      connection = connectionHolder.get();
+      if(connection != null){
+        cmd.connectionAvailable(connection);
+      }else{
+        BitClient client = new BitClient(endpoint, localIdentity, handler, context, new CloseHandlerCreator(), listenerPool);
+        
+        client.connect(new ConnectionListeningDecorator(cmd, !endpoint.equals(localIdentity)));
+      }
+      return cmd;
+      
+    }
+  }
+  
+  CloseHandlerCreator getCloseHandlerCreator(){
+    return new CloseHandlerCreator();
+  }
 
-  BitConnectionManager(DrillbitEndpoint endpoint, BitComImpl com, BitConnection connection, CheckedFuture<BitConnection, RpcException> future, int maxAttempts) {
-    assert endpoint != null && endpoint.getAddress() != null && endpoint.getBitPort() > 0;
-    this.com = com;
-    this.connection =  new AtomicReference<BitConnection>(connection);
-    this.future = new AtomicReference<CheckedFuture<BitConnection, RpcException>>(future);
-    this.endpoint = endpoint;
-    this.maxAttempts = maxAttempts;
+  /** Factory for close handlers **/
+  class CloseHandlerCreator{
+    public GenericFutureListener<ChannelFuture> getHandler(BitConnection connection, GenericFutureListener<ChannelFuture> parent){
+      return new CloseHandler(connection, parent);
+    }
   }
   
-  BitConnection getConnection(int attempt) throws RpcException{
-    BitConnection con = connection.get();
+  
+  
+  /**
+   * Listens for connection closes and clears connection holder.
+   */
+  private class CloseHandler implements GenericFutureListener<ChannelFuture>{
+    private BitConnection connection;
+    private GenericFutureListener<ChannelFuture> parent;
     
-    if(con != null){
-      if(con.isActive()) return con;
-      connection.compareAndSet(con, null);
+    public CloseHandler(BitConnection connection, GenericFutureListener<ChannelFuture> parent) {
+      super();
+      this.connection = connection;
+      this.parent = parent;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      connectionHolder.compareAndSet(connection, null);
+      parent.operationComplete(future);
     }
     
-    CheckedFuture<BitConnection, RpcException> fut = future.get();
+  } 
+  
+  /**
+   * Decorates a connection creation so that a successful connection is captured and kept available for future requests.  If we have raced and another connection is already set, the delegate is handed that one and, when closeOnDupe is true, the incoming connection's channel is closed.
+   */
+  private class ConnectionListeningDecorator implements RpcConnectionHandler<BitConnection>{
+
+    private final RpcConnectionHandler<BitConnection> delegate;
+    private final boolean closeOnDupe;
+    
+    public ConnectionListeningDecorator(RpcConnectionHandler<BitConnection> delegate,  boolean closeOnDupe) {
+      this.delegate = delegate;
+      this.closeOnDupe = closeOnDupe;
+    }
 
-    if(fut != null){
-      try{
-        return fut.checkedGet();
-      }catch(RpcException ex){
-        future.compareAndSet(fut, null);
-        if(attempt < maxAttempts){
-          return getConnection(attempt + 1);
-        }else{
-          throw ex;
+    @Override
+    public void connectionSucceeded(BitConnection incoming) {
+      BitConnection connection = connectionHolder.get();
+      while(true){
+        boolean setted = connectionHolder.compareAndSet(null, incoming);
+        if(setted){
+          connection = incoming;
+          break;
         }
+        connection = connectionHolder.get();
+        if(connection != null) break; 
+      }
+      
+      
+      if(connection == incoming){
+        delegate.connectionSucceeded(connection);
+      }else{
+
+        if(closeOnDupe){
+          // close the incoming because another channel was created in the mean time (unless this is a self connection).
+          logger.debug("Closing incoming connection because a connection was already set.");
+          incoming.getChannel().close();
+        }
+        delegate.connectionSucceeded(connection);
       }
     }
-    
-    // no checked future, let's make one.
-    fut = com.getConnectionAsync(endpoint);
-    future.compareAndSet(null, fut);
-    return getConnection(attempt);
+
+    @Override
+    public void connectionFailed(org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType type, Throwable t) {
+      delegate.connectionFailed(type, t);
+    }
     
   }
 
@@ -76,5 +170,20 @@ public class BitConnectionManager {
     return endpoint;
   }
   
+  public void addServerConnection(BitConnection connection){
+    // if the connection holder is not set, set it to this incoming connection.
+    logger.debug("Setting server connection.");
+    this.connectionHolder.compareAndSet(null, connection);
+  }
+
+  @Override
+  public void close() {
+    BitConnection c = connectionHolder.getAndSet(null);
+    if(c != null){
+      c.getChannel().close();
+    }
+  }
+  
+  
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
index 88ac6cc..d4665a8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
@@ -22,18 +22,13 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.GenericFutureListener;
 
-import java.util.concurrent.ConcurrentMap;
-
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
-import org.apache.drill.exec.proto.ExecProtos.BitStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
 import org.apache.drill.exec.proto.ExecProtos.RpcType;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.rpc.BasicServer;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitConnectionManager.CloseHandlerCreator;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.work.batch.BitComHandler;
 
@@ -43,13 +38,14 @@ public class BitServer extends BasicServer<RpcType, BitConnection>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitServer.class);
   
   private final BitComHandler handler;
-  private final ConcurrentMap<DrillbitEndpoint, BitConnection> registry;
   private final ListenerPool listeners;
+  private final ConnectionManagerRegistry connectionRegistry;
+  private volatile ProxyCloseHandler proxyCloseHandler;
   
-  public BitServer(BitComHandler handler, BootStrapContext context, ConcurrentMap<DrillbitEndpoint, BitConnection> registry, ListenerPool listeners) {
+  public BitServer(BitComHandler handler, BootStrapContext context, ConnectionManagerRegistry connectionRegistry, ListenerPool listeners) {
     super(BitRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
     this.handler = handler;
-    this.registry = registry;
+    this.connectionRegistry = connectionRegistry;
     this.listeners = listeners;
   }
   
@@ -65,23 +61,36 @@ public class BitServer extends BasicServer<RpcType, BitConnection>{
 
   @Override
   protected GenericFutureListener<ChannelFuture> getCloseHandler(BitConnection connection) {
-    return connection.getCloseHandler(super.getCloseHandler(connection));
+    this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(connection));
+    return proxyCloseHandler;
   }
 
   @Override
   public BitConnection initRemoteConnection(Channel channel) {
-    return new BitConnection(null, channel, this, registry, listeners);
+    return new BitConnection(channel, this, listeners);
   }
   
   
   @Override
-  protected ServerHandshakeHandler<BitHandshake> getHandshakeHandler() {
+  protected ServerHandshakeHandler<BitHandshake> getHandshakeHandler(final BitConnection connection) {
     return new ServerHandshakeHandler<BitHandshake>(RpcType.HANDSHAKE, BitHandshake.PARSER){
       
       @Override
       public MessageLite getHandshakeResponse(BitHandshake inbound) throws Exception {
-        logger.debug("Handling handshake from other bit. {}", inbound);
+//        logger.debug("Handling handshake from other bit. {}", inbound);
         if(inbound.getRpcVersion() != BitRpcConfig.RPC_VERSION) throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", inbound.getRpcVersion(), BitRpcConfig.RPC_VERSION));
+        if(!inbound.hasEndpoint() || inbound.getEndpoint().getAddress().isEmpty() || inbound.getEndpoint().getBitPort() < 1) throw new RpcException(String.format("RPC didn't provide valid counter endpoint information.  Received %s.", inbound.getEndpoint()));
+        connection.setEndpoint(inbound.getEndpoint());
+
+        // look up the connection manager for the remote endpoint.
+        BitConnectionManager manager = connectionRegistry.getConnectionManager(inbound.getEndpoint());
+        
+        // update the close handler.
+        proxyCloseHandler.setHandler(manager.getCloseHandlerCreator().getHandler(connection, proxyCloseHandler.getHandler()));
+        
+        // add to the connection manager. 
+        manager.addServerConnection(connection);
+
         return BitHandshake.newBuilder().setRpcVersion(BitRpcConfig.RPC_VERSION).build();
       }
 
@@ -89,5 +98,30 @@ public class BitServer extends BasicServer<RpcType, BitConnection>{
   }
 
 
+  private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> {
+
+    private volatile GenericFutureListener<ChannelFuture>  handler;
+    
+    public ProxyCloseHandler(GenericFutureListener<ChannelFuture> handler) {
+      super();
+      this.handler = handler;
+    }
+
+
+    public GenericFutureListener<ChannelFuture> getHandler() {
+      return handler;
+    }
+
+
+    public void setHandler(GenericFutureListener<ChannelFuture> handler) {
+      this.handler = handler;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      handler.operationComplete(future);
+    }
+    
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
index 652fa52..83b7959 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
@@ -17,95 +17,79 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc.bit;
 
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
 import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.record.FragmentWritableBatch;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 
-import com.google.common.util.concurrent.AbstractCheckedFuture;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
 
-/**
- * Interface provided for communication between two bits. Underlying connection may be server or client based. Resilient
- * to connection loss. Right now, this has to jump through some hoops and bridge futures between the connection creation
- * and action. A better approach should be done.
- */
 public class BitTunnel {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitTunnel.class);
 
-  private static final int MAX_ATTEMPTS = 3;
-
   private final BitConnectionManager manager;
-  private final Executor exec;
-  
+  private final DrillbitEndpoint endpoint;
 
-  public BitTunnel(Executor exec, DrillbitEndpoint endpoint, BitComImpl com, BitConnection connection) {
-    this.manager = new BitConnectionManager(endpoint, com, connection, null, MAX_ATTEMPTS);
-    this.exec = exec;
-  }
-
-  public BitTunnel(Executor exec, DrillbitEndpoint endpoint, BitComImpl com,
-      CheckedFuture<BitConnection, RpcException> future) {
-    this.manager = new BitConnectionManager(endpoint, com, (BitConnection) null, future, MAX_ATTEMPTS);
-    this.exec = exec;
+  public BitTunnel(DrillbitEndpoint endpoint, BitConnectionManager manager) {
+    this.manager = manager;
+    this.endpoint = endpoint;
   }
   
   public DrillbitEndpoint getEndpoint(){
     return manager.getEndpoint();
   }
 
-  private <T> DrillRpcFuture<T> submit(BitCommand<T> command) {
-    exec.execute(command);
-    return command;
-  }
-
-  public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) {
-    return submit(new SendBatch(batch, context));
+  public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentContext context, FragmentWritableBatch batch) {
+    SendBatch b = new SendBatch(outcomeListener, batch, context);
+    manager.runCommand(b);
   }
 
-  public DrillRpcFuture<Ack> sendFragment(PlanFragment fragment) {
-    return submit(new SendFragment(fragment));
+  public void sendFragment(RpcOutcomeListener<Ack> outcomeListener, PlanFragment fragment){
+    SendFragment b = new SendFragment(outcomeListener, fragment);
+    manager.runCommand(b);
   }
-
-  public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle) {
-    return submit(new CancelFragment(handle));
+  
+  public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle){
+    CancelFragment b = new CancelFragment(handle);
+    manager.runCommand(b);
+    return b.getFuture();
   }
-
+  
   public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
-    return submit(new SendFragmentStatus(status));
+    SendFragmentStatus b = new SendFragmentStatus(status);
+    manager.runCommand(b);
+    return b.getFuture();
   }
 
-  public class SendBatch extends BitCommand<Ack> {
+  public static class SendBatch extends ListeningBitCommand<Ack> {
     final FragmentWritableBatch batch;
     final FragmentContext context;
 
-    public SendBatch(FragmentWritableBatch batch, FragmentContext context) {
-      super();
+    public SendBatch(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch, FragmentContext context) {
+      super(listener);
       this.batch = batch;
       this.context = context;
     }
 
     @Override
-    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
-      logger.debug("Sending record batch. {}", batch);
-      return connection.sendRecordBatch(context, batch);
+    public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+      connection.send(outcomeListener, RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers());
     }
 
+    @Override
+    public String toString() {
+      return "SendBatch [batch.header=" + batch.getHeader() + "]";
+    }
+    
+    
   }
 
-  public class SendFragmentStatus extends BitCommand<Ack> {
+  public static class SendFragmentStatus extends FutureBitCommand<Ack> {
     final FragmentStatus status;
 
     public SendFragmentStatus(FragmentStatus status) {
@@ -114,12 +98,13 @@ public class BitTunnel {
     }
 
     @Override
-    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
-      return connection.sendFragmentStatus(status);
+    public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+      connection.send(outcomeListener, RpcType.REQ_FRAGMENT_STATUS, status, Ack.class);
     }
+
   }
 
-  public class CancelFragment extends BitCommand<Ack> {
+  public static class CancelFragment extends FutureBitCommand<Ack> {
     final FragmentHandle handle;
 
     public CancelFragment(FragmentHandle handle) {
@@ -128,109 +113,23 @@ public class BitTunnel {
     }
 
     @Override
-    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
-      return connection.cancelFragment(handle);
+    public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+      connection.send(outcomeListener, RpcType.REQ_CANCEL_FRAGMENT, handle,  Ack.class);
     }
 
   }
 
-  public class SendFragment extends BitCommand<Ack> {
+  public static class SendFragment extends ListeningBitCommand<Ack> {
     final PlanFragment fragment;
 
-    public SendFragment(PlanFragment fragment) {
-      super();
+    public SendFragment(RpcOutcomeListener<Ack> listener, PlanFragment fragment) {
+      super(listener);
       this.fragment = fragment;
     }
 
     @Override
-    public CheckedFuture<Ack, RpcException> doRpcCall(BitConnection connection) {
-      return connection.sendFragment(fragment);
-    }
-
-  }
-
-
-  
-
-  private abstract class BitCommand<T> extends AbstractCheckedFuture<T, RpcException> implements Runnable, DrillRpcFuture<T> {
-
-    public void addLightListener(RpcOutcomeListener<T> outcomeListener){
-      this.addListener(new RpcOutcomeListenerWrapper(outcomeListener), MoreExecutors.sameThreadExecutor());
-    }
-
-    public BitCommand() {
-      super(SettableFuture.<T> create());
-    }
-
-    public abstract CheckedFuture<T, RpcException> doRpcCall(BitConnection connection);
-
-    public final void run() {
-      
-      try {
-        
-        BitConnection connection = manager.getConnection(0);
-        assert connection != null : "The connection manager should never return a null connection.  Worse case, it should throw an exception.";
-        CheckedFuture<T, RpcException> rpc = doRpcCall(connection);
-        rpc.addListener(new FutureBridge<T>((SettableFuture<T>) delegate(), rpc), MoreExecutors.sameThreadExecutor());
-      } catch (RpcException ex) {
-        ((SettableFuture<T>) delegate()).setException(ex);
-      }
-
-    }
-
-    @Override
-    protected RpcException mapException(Exception e) {
-      Throwable t = e;
-      if (e instanceof ExecutionException) {
-        t = e.getCause();
-      }
-      if (t instanceof RpcException) return (RpcException) t;
-      return new RpcException(t);
-    }
-
-    public class RpcOutcomeListenerWrapper implements Runnable{
-      final RpcOutcomeListener<T> inner;
-      
-      public RpcOutcomeListenerWrapper(RpcOutcomeListener<T> inner) {
-        this.inner = inner;
-      }
-
-      @Override
-      public void run() {
-        try{
-          inner.success(BitCommand.this.checkedGet());
-        }catch(RpcException e){
-          inner.failed(e);
-        }
-      }
-    }
-
-    @Override
-    public String toString() {
-      return "BitCommand ["+this.getClass().getSimpleName()+"]";
-    }
-    
-    
-    
-  }
-
-  private class FutureBridge<T> implements Runnable {
-    final SettableFuture<T> out;
-    final CheckedFuture<T, RpcException> in;
-
-    public FutureBridge(SettableFuture<T> out, CheckedFuture<T, RpcException> in) {
-      super();
-      this.out = out;
-      this.in = in;
-    }
-
-    @Override
-    public void run() {
-      try {
-        out.set(in.checkedGet());
-      } catch (RpcException ex) {
-        out.setException(ex);
-      }
+    public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, BitConnection connection) {
+      connection.send(outcomeListener, RpcType.REQ_INIATILIZE_FRAGMENT, fragment, Ack.class);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java
new file mode 100644
index 0000000..8afbc33
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ConnectionManagerRegistry.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import io.netty.channel.Channel;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.CheckedFuture;
+
+public class ConnectionManagerRegistry implements Iterable<BitConnectionManager>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConnectionManagerRegistry.class);
+  
+  private final ConcurrentMap<DrillbitEndpoint, BitConnectionManager> registry = Maps.newConcurrentMap();
+  
+  private final BitComHandler handler;
+  private final BootStrapContext context;
+  private final ListenerPool listenerPool;
+  private volatile DrillbitEndpoint localEndpoint;
+  
+  public ConnectionManagerRegistry(BitComHandler handler, BootStrapContext context, ListenerPool listenerPool) {
+    super();
+    this.handler = handler;
+    this.context = context;
+    this.listenerPool = listenerPool;
+  }
+
+  public BitConnectionManager getConnectionManager(DrillbitEndpoint endpoint){
+    assert localEndpoint != null : "DrillbitEndpoint must be set before a connection manager can be retrieved";
+    BitConnectionManager m = registry.get(endpoint);
+    if(m == null){
+      m = new BitConnectionManager(endpoint, localEndpoint, handler, context, listenerPool);
+      BitConnectionManager m2 = registry.putIfAbsent(endpoint, m);
+      if(m2 != null) m = m2;
+    }
+    
+    return m;
+  }
+
+  @Override
+  public Iterator<BitConnectionManager> iterator() {
+    return registry.values().iterator();
+  }
+  
+  public void setEndpoint(DrillbitEndpoint endpoint){
+    this.localEndpoint = endpoint;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
new file mode 100644
index 0000000..fa3b518
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
@@ -0,0 +1,78 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcCheckedFuture;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.MessageLite;
+
+public abstract class FutureBitCommand<T extends MessageLite> implements BitCommand<T> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FutureBitCommand.class);
+
+  protected final SettableFuture<T> settableFuture;
+  private final RpcCheckedFuture<T> parentFuture;
+
+  public FutureBitCommand() {
+    this.settableFuture = SettableFuture.create();
+    this.parentFuture = new RpcCheckedFuture<T>(settableFuture);
+  }
+
+  public abstract void doRpcCall(RpcOutcomeListener<T> outcomeListener, BitConnection connection);
+
+  @Override
+  public void connectionAvailable(BitConnection connection) {
+    
+    doRpcCall(new DeferredRpcOutcome(), connection);
+  }
+
+  @Override
+  public void connectionSucceeded(BitConnection connection) {
+    connectionAvailable(connection);
+  }
+
+  private class DeferredRpcOutcome implements RpcOutcomeListener<T> {
+
+    @Override
+    public void failed(RpcException ex) {
+      settableFuture.setException(ex);
+    }
+
+    @Override
+    public void success(T value) {
+      settableFuture.set(value);
+    }
+
+  }
+
+  public DrillRpcFuture<T> getFuture() {
+    return parentFuture;
+  }
+
+  @Override
+  public void connectionFailed(FailureType type, Throwable t) {
+    settableFuture.setException(RpcException.mapException(
+        String.format("Command failed while establishing connection.  Failure type %s.", type), t));
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
index 8f299d2..84dba85 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
@@ -22,32 +22,35 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.work.foreman.FragmentStatusListener;
-import org.apache.drill.exec.work.fragment.RemoteFragmentHandler;
 
 public class ListenerPool {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ListenerPool.class);
   
-  private final ConcurrentMap<FragmentHandle, FragmentStatusListener> listeners;
+  private final ConcurrentMap<QueryId, FragmentStatusListener> listeners;
   
   public ListenerPool(int par){
-    listeners = new ConcurrentHashMap<FragmentHandle, FragmentStatusListener>(16, 0.75f, par);
+    listeners = new ConcurrentHashMap<QueryId, FragmentStatusListener>(16, 0.75f, par);
   }
   
   public void removeFragmentStatusListener(FragmentHandle handle) throws RpcException{
+    logger.debug("Removing framgent status listener for handle {}.", handle);
-    listeners.remove(handle);
+    listeners.remove(handle.getQueryId()); // listeners is keyed by QueryId; a FragmentHandle key never matches
   }
   
   public void addFragmentStatusListener(FragmentHandle handle, FragmentStatusListener listener) throws RpcException{
-    FragmentStatusListener old = listeners.putIfAbsent(handle, listener);
+    logger.debug("Adding framgent status listener for handle {}.", handle);
+    FragmentStatusListener old = listeners.putIfAbsent(handle.getQueryId(), listener);
     if(old != null) throw new RpcException("Failure.  The provided handle already exists in the listener pool.  You need to remove one listener before adding another.");
   }
   
   public void status(FragmentStatus status){
-    FragmentStatusListener l = listeners.get(status.getHandle());
+    FragmentStatusListener l = listeners.get(status.getHandle().getQueryId());
     if(l == null){
-      logger.info("A fragment message arrived but there was no registered listener for that message.");
+      
+      logger.error("A fragment message arrived but there was no registered listener for that message for handle {}.", status.getHandle());
       return;
     }else{
       l.statusUpdate(status);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
new file mode 100644
index 0000000..90db6a6
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcCheckedFuture;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.MessageLite;
+
+public abstract class ListeningBitCommand<T extends MessageLite> implements BitCommand<T> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ListeningBitCommand.class);
+
+  // Receives connection failures and the relayed RPC outcome.
+  private final RpcOutcomeListener<T> listener;
+
+  public ListeningBitCommand(RpcOutcomeListener<T> listener) {
+    this.listener = listener;
+  }
+
+  /** Implemented by subclasses to make the call; outcomeListener is the relay created in connectionAvailable. */
+  public abstract void doRpcCall(RpcOutcomeListener<T> outcomeListener, BitConnection connection);
+
+  /** Starts the RPC on the given connection with a fresh outcome relay. */
+  @Override
+  public void connectionAvailable(BitConnection connection) {
+    final RpcOutcomeListener<T> relay = new DeferredRpcOutcome();
+    doRpcCall(relay, connection);
+  }
+
+  @Override
+  public void connectionSucceeded(BitConnection connection) {
+    connectionAvailable(connection);
+  }
+
+  /** Reports a connection failure to the listener as an RpcException built by RpcException.mapException. */
+  @Override
+  public void connectionFailed(FailureType type, Throwable t) {
+    final String message = String.format("Command failed while establishing connection.  Failure type %s.", type);
+    listener.failed(RpcException.mapException(message, t));
+  }
+
+  /** Passes the outcome of the RPC straight through to the wrapped listener. */
+  private class DeferredRpcOutcome implements RpcOutcomeListener<T> {
+    @Override
+    public void failed(RpcException ex) {
+      listener.failed(ex);
+    }
+    @Override
+    public void success(T value) {
+      listener.success(value);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
index 3df88b7..779085c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
@@ -45,5 +45,12 @@ public class QueryResultBatch {
   public boolean hasData(){
     return data != null;
   }
+
+  /** Returns a string showing the header and data fields of this batch. */
+  @Override
+  public String toString() {
+    return new StringBuilder("QueryResultBatch [header=").append(header).append(", data=").append(data).append("]").toString();
+  }
+
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
new file mode 100644
index 0000000..0aa7c86
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -0,0 +1,153 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.user;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+
+/**
+ * Encapsulates the future management of query submissions. This entails a potential race condition. Normal ordering is:
+ * 1. Submit query to be executed. 2. Receive QueryHandle for buffer management 3. Start receiving results batches for
+ * query.
+ * 
+ * However, 3 could potentially occur before 2. As such, we need to handle this case and then do a switcheroo.
+ * 
+ */
+public class QueryResultHandler {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryResultHandler.class);
+
+  // Active listeners by query id; holds a BufferingListener while results outrun the QueryId response.
+  private ConcurrentMap<QueryId, UserResultsListener> resultsListener = Maps.newConcurrentMap();
+
+  /** Wraps the user's listener so it is registered for results once the submission returns a QueryId. */
+  public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener listener){
+    return new SubmissionListener(listener);
+  }
+
+  /** Parses the QueryResult in pBody and delivers the batch to the listener registered for its query id,
+   *  buffering it when no listener is registered yet. */
+  public void batchArrived(ByteBuf pBody, ByteBuf dBody) throws RpcException {
+    final QueryResult result = RpcBus.get(pBody, QueryResult.PARSER);
+    final QueryResultBatch batch = new QueryResultBatch(result, dBody);
+    UserResultsListener l = resultsListener.get(result.getQueryId());
+    if (l != null) {
+      l.resultArrived(batch);
+      if (result.getIsLastChunk()) {
+        resultsListener.remove(result.getQueryId(), l);
+      }
+    } else {
+      logger.debug("Results listener not available, creating a buffering listener.");
+      // manage race condition where we start getting results before we receive the queryid back.
+      BufferingListener bl = new BufferingListener();
+      l = resultsListener.putIfAbsent(result.getQueryId(), bl);
+      if (l != null) {
+        l.resultArrived(batch);
+      } else {
+        bl.resultArrived(batch);
+      }
+    }
+  }
+
+  /** Queues batches until a real listener is attached via transferTo, then forwards straight to it. */
+  private class BufferingListener implements UserResultsListener {
+    private ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
+    private volatile UserResultsListener output;
+
+    /** Replays the queued batches to l and returns whether the last replayed batch was the final chunk. */
+    public boolean transferTo(UserResultsListener l) {
+      synchronized (this) {
+        output = l;
+        boolean last = false;
+        for (QueryResultBatch r : results) {
+          l.resultArrived(r);
+          last = r.getHeader().getIsLastChunk();
+        }
+        return last;
+      }
+    }
+
+    @Override
+    public void resultArrived(QueryResultBatch result) {
+      synchronized (this) {
+        if (output == null) {
+          this.results.add(result);
+        } else {
+          output.resultArrived(result);
+        }
+      }
+    }
+
+    @Override
+    public void submissionFailed(RpcException ex) {
+      throw new UnsupportedOperationException("You cannot report failed submissions to a buffering listener.");
+    }
+  }
+
+  /** Registers the user's listener once the QueryId comes back, or reports the submission failure to it. */
+  private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> {
+    private UserResultsListener listener;
+
+    public SubmissionListener(UserResultsListener listener) {
+      super();
+      this.listener = listener;
+    }
+
+    @Override
+    public void failed(RpcException ex) {
+      listener.submissionFailed(ex);
+    }
+
+    @Override
+    public void success(QueryId queryId) {
+      logger.debug("Received QueryId {} succesfully.  Adding listener {}", queryId, listener);
+      UserResultsListener oldListener = resultsListener.putIfAbsent(queryId, listener);
+
+      // we need to deal with the situation where we already received results by the time we got the query id back. In
+      // that case, we'll need to transfer the buffering listener over, grabbing a lock against reception of additional
+      // results during the transition
+      if (oldListener != null) {
+        logger.debug("Unable to place user results listener, buffering listener was already in place.");
+        if (oldListener instanceof BufferingListener) {
+          // the buffering listener stays mapped during the transfer so late batches are forwarded under its lock.
+          boolean all = ((BufferingListener) oldListener).transferTo(this.listener);
+          if (all) {
+            // last chunk already delivered: drop the entry. The map is keyed by QueryId, so remove by key.
+            resultsListener.remove(queryId, oldListener);
+          } else {
+            boolean replaced = resultsListener.replace(queryId, oldListener, listener);
+            if (!replaced) throw new IllegalStateException("Buffering listener was no longer registered for the query.");
+          }
+        } else {
+          throw new IllegalStateException("Trying to replace a non-buffering User Results listener.");
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 5d2e799..ad44ff2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -21,11 +21,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.EventLoopGroup;
 
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -36,115 +31,27 @@ import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
 import org.apache.drill.exec.rpc.BasicClientWithConnection;
 import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
 import com.google.protobuf.MessageLite;
 
-public class UserClient extends BasicClientWithConnection<RpcType> {
+public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHandshake, BitToUserHandshake> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserClient.class);
 
-  private ConcurrentMap<QueryId, UserResultsListener> resultsListener = Maps.newConcurrentMap();
+  private final QueryResultHandler queryResultHandler = new QueryResultHandler();
 
   public UserClient(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
-    super(UserRpcConfig.MAPPING, alloc, eventLoopGroup);
-  }
-
-  public Future<Void> submitQuery(RunQuery query, UserResultsListener resultsListener) throws RpcException {
-    this.send(RpcType.RUN_QUERY, query, QueryId.class).addLightListener(new SubmissionListener(resultsListener));
-    return resultsListener.getFuture();
+    super(UserRpcConfig.MAPPING, alloc, eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER);
   }
 
-  public BitToUserHandshake connect(DrillbitEndpoint endpoint) throws RpcException, InterruptedException{
-    return this.connectAsClient(RpcType.HANDSHAKE, UserToBitHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).build(), endpoint.getAddress(), endpoint.getUserPort(), BitToUserHandshake.class);
-  }
-  
-  private class BufferingListener extends UserResultsListener {
-
-    private ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
-    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-    private volatile UserResultsListener output;
-
-    public boolean transferTo(UserResultsListener l) {
-      lock.writeLock().lock();
-      output = l;
-      boolean last = false;
-      for (QueryResultBatch r : results) {
-        l.resultArrived(r);
-        last = r.getHeader().getIsLastChunk();
-      }
-      if (future.isDone()) {
-        l.set();
-      }
-      return last;
-    }
-
-    @Override
-    public void resultArrived(QueryResultBatch result) {
-      logger.debug("Result arrvied.");
-      lock.readLock().lock();
-      try {
-        if (output == null) {
-          this.results.add(result);
-        } else {
-          output.resultArrived(result);
-        }
-
-      } finally {
-        lock.readLock().unlock();
-      }
-
-    }
-
-    @Override
-    public void submissionFailed(RpcException ex) {
-      throw new UnsupportedOperationException("You cannot report failed submissions to a buffering listener.");
-    }
-
+  public void submitQuery(UserResultsListener resultsListener, RunQuery query) throws RpcException {
+    send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
   }
 
-  private class SubmissionListener extends RpcOutcomeListener<QueryId> {
-    private UserResultsListener listener;
-
-    public SubmissionListener(UserResultsListener listener) {
-      super();
-      this.listener = listener;
-    }
-
-    @Override
-    public void failed(RpcException ex) {
-      listener.submissionFailed(ex);
-    }
-
-    @Override
-    public void success(QueryId queryId) {
-      logger.debug("Received QueryId {} succesfully.  Adding listener {}", queryId, listener);
-      UserResultsListener oldListener = resultsListener.putIfAbsent(queryId, listener);
-
-      // we need to deal with the situation where we already received results by the time we got the query id back. In
-      // that case, we'll need to transfer the buffering listener over, grabbing a lock against reception of additional
-      // results during the transition
-      if (oldListener != null) {
-        logger.debug("Unable to place user results listener, buffering listener was already in place.");
-        if (oldListener instanceof BufferingListener) {
-          resultsListener.remove(oldListener);
-          boolean all = ((BufferingListener) oldListener).transferTo(this.listener);
-          // simply remove the buffering listener if we already have the last response.
-          if (all) {
-            resultsListener.remove(oldListener);
-          } else {
-            boolean replaced = resultsListener.replace(queryId, oldListener, listener);
-            if (!replaced) throw new IllegalStateException();
-          }
-        } else {
-          throw new IllegalStateException("Trying to replace a non-buffering User Results listener.");
-        }
-      }
-
-    }
-
+  public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint) throws RpcException, InterruptedException {
+    UserToBitHandshake hs = UserToBitHandshake.newBuilder().setRpcVersion(UserRpcConfig.RPC_VERSION).setSupportListening(true).build();
+    this.connectAsClient(handler, hs, endpoint.getAddress(), endpoint.getUserPort());
   }
 
   @Override
@@ -165,29 +72,7 @@ public class UserClient extends BasicClientWithConnection<RpcType> {
   protected Response handle(int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
     switch (rpcType) {
     case RpcType.QUERY_RESULT_VALUE:
-      final QueryResult result = get(pBody, QueryResult.PARSER);
-      final QueryResultBatch batch = new QueryResultBatch(result, dBody);
-      UserResultsListener l = resultsListener.get(result.getQueryId());
-//      logger.debug("For QueryId [{}], retrieved result listener {}", result.getQueryId(), l);
-      if (l != null) {
-//        logger.debug("Results listener available, using existing.");
-        l.resultArrived(batch);
-        if (result.getIsLastChunk()) {
-          resultsListener.remove(result.getQueryId(), l);
-          l.set();
-        }
-      } else {
-        logger.debug("Results listener not available, creating a buffering listener.");
-        // manage race condition where we start getting results before we receive the queryid back.
-        BufferingListener bl = new BufferingListener();
-        l = resultsListener.putIfAbsent(result.getQueryId(), bl);
-        if (l != null) {
-          l.resultArrived(batch);
-        } else {
-          bl.resultArrived(batch);
-        }
-      }
-
+      queryResultHandler.batchArrived(pBody, dBody);
       return new Response(RpcType.ACK, Ack.getDefaultInstance());
     default:
       throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType));
@@ -196,18 +81,16 @@ public class UserClient extends BasicClientWithConnection<RpcType> {
   }
 
   @Override
-  protected ClientHandshakeHandler<BitToUserHandshake> getHandshakeHandler() {
-    return new ClientHandshakeHandler<BitToUserHandshake>(RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER) {
+  protected void validateHandshake(BitToUserHandshake inbound) throws RpcException {
+    logger.debug("Handling handshake from bit to user. {}", inbound);
+    if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION)
+      throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", UserRpcConfig.RPC_VERSION,
+          inbound.getRpcVersion())); // expected is this client's version, actual is what the bit reported
 
-      @Override
-      protected void validateHandshake(BitToUserHandshake inbound) throws Exception {
-        logger.debug("Handling handshake from bit to user. {}", inbound);
-        if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION)
-          throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.",
-              inbound.getRpcVersion(), UserRpcConfig.RPC_VERSION));
-      }
+  }
 
-    };
+  @Override
+  protected void finalizeConnection(BitToUserHandshake handshake, BasicClientWithConnection.ServerConnection connection) {
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
index 3ce14f0..b1dbfe8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -24,17 +24,8 @@ import org.apache.drill.exec.rpc.RpcException;
 
 import com.google.common.util.concurrent.SettableFuture;
 
-public abstract class UserResultsListener {
-  SettableFuture<Void> future = SettableFuture.create();
+public interface UserResultsListener {
   
-  final void set(){
-    future.set(null);
-  }
-  
-  Future<Void> getFuture(){
-    return future;
-  }
-
   public abstract void submissionFailed(RpcException ex); 
   public abstract void resultArrived(QueryResultBatch result);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 406afc4..908af61 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -26,16 +26,15 @@ import io.netty.channel.EventLoopGroup;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
-import org.apache.drill.exec.proto.UserProtos.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.RequestResults;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
 import org.apache.drill.exec.rpc.BasicServer;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
 import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.work.user.UserWorker;
 
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -100,8 +99,9 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
       super(channel);
     }
 
-    public DrillRpcFuture<Ack> sendResult(QueryWritableBatch result){
-      return send(this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, result.getBuffers());
+    public void sendResult(RpcOutcomeListener<Ack> listener, QueryWritableBatch result){
+      logger.debug("Sending result to client with {}", result);
+      send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, result.getBuffers());
     }
 
   }
@@ -112,7 +112,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
   }
   
   @Override
-  protected ServerHandshakeHandler<UserToBitHandshake> getHandshakeHandler() {
+  protected ServerHandshakeHandler<UserToBitHandshake> getHandshakeHandler(UserClientConnection connection) {
     return new ServerHandshakeHandler<UserToBitHandshake>(RpcType.HANDSHAKE, UserToBitHandshake.PARSER){
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
index 3c4d9af..ed13748 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
@@ -39,7 +39,7 @@ public class BootStrapContext implements Closeable{
   public BootStrapContext(DrillConfig config) {
     super();
     this.config = config;
-    this.loop = new NioEventLoopGroup(4, new NamedThreadFactory("BitServer-"));
+    this.loop = new NioEventLoopGroup(1, new NamedThreadFactory("BitServer-"));
     this.metrics = new MetricRegistry(config.getString(ExecConstants.METRICS_CONTEXT_NAME));
     this.allocator = BufferAllocator.getAllocator(config);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
index 0337a68..199768f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.cache.LocalCache;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.LocalClusterCoordinator;
+import org.apache.drill.exec.exception.DrillbitStartupException;
 
 public class RemoteServiceSet implements Closeable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class);
@@ -37,6 +38,7 @@ public class RemoteServiceSet implements Closeable{
     this.coordinator = coordinator;
   }
 
+
   public DistributedCache getCache() {
     return cache;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index d6d3b9c..b07f274 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -53,12 +53,12 @@ public class ServiceEngine implements Closeable{
   
   public DrillbitEndpoint start() throws DrillbitStartupException, InterruptedException, UnknownHostException{
     int userPort = userServer.bind(config.getInt(ExecConstants.INITIAL_USER_PORT));
-    int bitPort = bitCom.start();
-    return DrillbitEndpoint.newBuilder()
+    DrillbitEndpoint partialEndpoint = DrillbitEndpoint.newBuilder()
         .setAddress(InetAddress.getLocalHost().getHostAddress())
-        .setBitPort(bitPort)
         .setUserPort(userPort)
         .build();
+
+    return bitCom.start(partialEndpoint);
   }
 
   public BitCom getBitCom(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
index f6a9786..9a72845 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/EndpointListener.java
@@ -18,10 +18,9 @@
 package org.apache.drill.exec.work;
 
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 
-public abstract class EndpointListener<RET, V> extends RpcOutcomeListener<RET>{
+public abstract class EndpointListener<RET, V> extends BaseRpcOutcomeListener<RET>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EndpointListener.class);
 
   protected final DrillbitEndpoint endpoint;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
index 2900d99..554b398 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
@@ -65,6 +65,7 @@ public class FragmentRunner implements Runnable, CancelableQuery, StatusProvider
   
   @Override
   public void run() {
+    logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
     if(!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)){
       internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state.  FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get()))));
       return;
@@ -76,7 +77,12 @@ public class FragmentRunner implements Runnable, CancelableQuery, StatusProvider
     try{
       while(state.get() == FragmentState.RUNNING_VALUE){
         if(!root.next()){
-          updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
+          if(context.isFailed()){
+            updateState(FragmentState.RUNNING, FragmentState.FAILED, false);  
+          }else{
+            updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
+          }
+          
         }
       }
       
@@ -90,7 +96,7 @@ public class FragmentRunner implements Runnable, CancelableQuery, StatusProvider
     }finally{
       t.stop();
     }
-    
+    logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
   }
   
   private void internalFail(Throwable excep){


[3/3] git commit: Clean up threading of client/server. Utilize command pattern for BitCom stuff to abstract away connection failures. Works on one bit single exchange remote query now. Next up, two bit single exchange query.

Posted by ja...@apache.org.
Clean up threading of client/server.  Utilize command pattern for BitCom stuff to abstract away connection failures.  Works on one bit single exchange remote query now.  Next up, two bit single exchange query.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b8db98ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b8db98ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b8db98ad

Branch: refs/heads/execwork
Commit: b8db98ad7c159db3cf41a3866ff53013f87964b4
Parents: e57a8d6
Author: Jacques Nadeau <ja...@apache.org>
Authored: Tue May 21 18:38:56 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue May 21 18:38:56 2013 -0700

----------------------------------------------------------------------
 .../drill/common/graph/AdjacencyListBuilder.java   |    2 +-
 .../org/apache/drill/exec/cache/LocalCache.java    |    2 +
 .../org/apache/drill/exec/client/DrillClient.java  |   77 ++++--
 .../drill/exec/coord/LocalClusterCoordinator.java  |    7 +-
 .../org/apache/drill/exec/ops/FragmentContext.java |   19 ++-
 .../org/apache/drill/exec/ops/QueryContext.java    |    4 +
 .../exec/physical/config/MockRecordReader.java     |    1 -
 .../drill/exec/physical/config/RandomReceiver.java |    5 -
 .../apache/drill/exec/physical/config/Screen.java  |    2 +-
 .../drill/exec/physical/impl/ScreenCreator.java    |   79 +++++-
 .../exec/physical/impl/SingleSenderCreator.java    |   41 +++-
 .../drill/exec/physical/impl/WireRecordBatch.java  |    8 +-
 .../impl/materialize/QueryWritableBatch.java       |    8 +
 .../impl/materialize/VectorRecordMaterializer.java |   11 +-
 .../drill/exec/planner/fragment/Materializer.java  |    8 +-
 .../exec/planner/fragment/SimpleParallelizer.java  |    4 +-
 .../exec/planner/fragment/StatsCollector.java      |    2 +-
 .../apache/drill/exec/record/RawFragmentBatch.java |    5 +
 .../drill/exec/rpc/AbstractHandshakeHandler.java   |    5 +-
 .../drill/exec/rpc/BaseRpcOutcomeListener.java     |   32 +++
 .../org/apache/drill/exec/rpc/BasicClient.java     |  176 ++++++++------
 .../drill/exec/rpc/BasicClientWithConnection.java  |    9 +-
 .../org/apache/drill/exec/rpc/BasicServer.java     |    7 +-
 .../rpc/ChannelListenerWithCoordinationId.java     |   25 ++
 .../apache/drill/exec/rpc/CoordinationQueue.java   |   96 +++++++--
 .../org/apache/drill/exec/rpc/DrillRpcFuture.java  |    2 -
 .../apache/drill/exec/rpc/DrillRpcFutureImpl.java  |   70 +-----
 .../java/org/apache/drill/exec/rpc/RpcBus.java     |   88 ++++---
 .../apache/drill/exec/rpc/RpcCheckedFuture.java    |   33 +++
 .../drill/exec/rpc/RpcConnectionHandler.java       |   28 +++
 .../org/apache/drill/exec/rpc/RpcException.java    |   13 +
 .../java/org/apache/drill/exec/rpc/RpcOutcome.java |   26 ++
 .../apache/drill/exec/rpc/RpcOutcomeListener.java  |    7 +-
 .../exec/rpc/ZeroCopyProtobufLengthDecoder.java    |    2 +-
 .../org/apache/drill/exec/rpc/bit/BitClient.java   |   52 ++---
 .../java/org/apache/drill/exec/rpc/bit/BitCom.java |    8 +-
 .../org/apache/drill/exec/rpc/bit/BitComImpl.java  |  129 ++---------
 .../org/apache/drill/exec/rpc/bit/BitCommand.java  |   28 +++
 .../apache/drill/exec/rpc/bit/BitConnection.java   |   79 +-----
 .../drill/exec/rpc/bit/BitConnectionManager.java   |  175 +++++++++++---
 .../org/apache/drill/exec/rpc/bit/BitServer.java   |   60 ++++-
 .../org/apache/drill/exec/rpc/bit/BitTunnel.java   |  187 ++++-----------
 .../exec/rpc/bit/ConnectionManagerRegistry.java    |   73 ++++++
 .../drill/exec/rpc/bit/FutureBitCommand.java       |   78 ++++++
 .../apache/drill/exec/rpc/bit/ListenerPool.java    |   15 +-
 .../drill/exec/rpc/bit/ListeningBitCommand.java    |   73 ++++++
 .../drill/exec/rpc/user/QueryResultBatch.java      |    7 +
 .../drill/exec/rpc/user/QueryResultHandler.java    |  153 ++++++++++++
 .../org/apache/drill/exec/rpc/user/UserClient.java |  153 ++-----------
 .../drill/exec/rpc/user/UserResultsListener.java   |   11 +-
 .../org/apache/drill/exec/rpc/user/UserServer.java |   10 +-
 .../apache/drill/exec/server/BootStrapContext.java |    2 +-
 .../apache/drill/exec/server/RemoteServiceSet.java |    2 +
 .../apache/drill/exec/service/ServiceEngine.java   |    6 +-
 .../apache/drill/exec/work/EndpointListener.java   |    5 +-
 .../org/apache/drill/exec/work/FragmentRunner.java |   10 +-
 .../org/apache/drill/exec/work/WorkManager.java    |    7 +-
 .../exec/work/batch/AbstractFragmentCollector.java |    7 +-
 .../drill/exec/work/batch/BatchCollector.java      |    3 +-
 .../drill/exec/work/batch/BitComHandlerImpl.java   |    9 +-
 .../drill/exec/work/batch/IncomingBuffers.java     |   21 +-
 .../drill/exec/work/batch/MergingCollector.java    |    5 +-
 .../exec/work/batch/PartitionedCollector.java      |    1 +
 .../exec/work/batch/UnlmitedRawBatchBuffer.java    |    2 +-
 .../apache/drill/exec/work/foreman/Foreman.java    |   22 +-
 .../exec/work/foreman/RunningFragmentManager.java  |    8 +-
 .../work/fragment/IncomingFragmentHandler.java     |    2 +-
 .../exec/work/fragment/LocalFragmentHandler.java   |    4 +-
 .../exec/work/fragment/RemoteFragmentHandler.java  |    4 +-
 .../exec/physical/impl/DistributedFragmentRun.java |   17 +-
 .../org/apache/drill/exec/pop/CheckFragmenter.java |   21 +--
 .../org/apache/drill/exec/pop/FragmentChecker.java |   41 +++-
 .../org/apache/drill/exec/server/TestBitRpc.java   |   26 ++-
 .../test/resources/physical_single_exchange.json   |    1 -
 74 files changed, 1498 insertions(+), 923 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
index 1668477..4a385ce 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/graph/AdjacencyListBuilder.java
@@ -57,7 +57,7 @@ import java.util.Map;
   }
 
   public AdjacencyList<V> getAdjacencyList() {
-    logger.debug("Values; {}", ops.values().toArray());
+//    logger.debug("Values; {}", ops.values().toArray());
     AdjacencyList<V> a = new AdjacencyList<V>();
 
     for (AdjacencyList<V>.Node from : ops.values()) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
index ddb2a02..b656f2d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
@@ -43,11 +43,13 @@ public class LocalCache implements DistributedCache {
 
   @Override
   public PlanFragment getFragment(FragmentHandle handle) {
+    logger.debug("looking for fragment with handle: {}", handle);
     return handles.get(handle);
   }
 
   @Override
   public void storeFragment(PlanFragment fragment) {
+    logger.debug("Storing fragment: {}", fragment);
     handles.put(fragment.getHandle(), fragment);
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index bb7f77e..c35e834 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -30,22 +30,23 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Vector;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.ZKClusterCoordinator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserProtos.QueryType;
-import org.apache.drill.exec.proto.UserProtos.RpcType;
-import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
+import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
 import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.rpc.user.UserClient;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
-import org.apache.drill.exec.rpc.user.UserRpcConfig;
+
+import com.google.common.util.concurrent.AbstractCheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
 
 /**
  * Thin wrapper around a UserClient that handles connect/close and transforms String into ByteBuf
@@ -75,9 +76,6 @@ public class DrillClient implements Closeable{
   }
   
   
-  
-
-
   /**
    * Connects the client to a Drillbit server
    *
@@ -97,7 +95,9 @@ public class DrillClient implements Closeable{
     this.client = new UserClient(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
     try {
       logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
-      this.client.connect(endpoint);
+      FutureHandler f = new FutureHandler();
+      this.client.connect(f, endpoint);
+      f.checkedGet();
     } catch (InterruptedException e) {
       throw new IOException(e);
     }
@@ -120,34 +120,63 @@ public class DrillClient implements Closeable{
    * @throws RpcException
    */
   public List<QueryResultBatch> runQuery(QueryType type, String plan) throws RpcException {
-    try {
-      ListHoldingResultsListener listener = new ListHoldingResultsListener();
-      Future<Void> f = client.submitQuery(newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build(), listener);
-      f.get();
-      if(listener.ex != null){
-        throw listener.ex;
-      }else{
-        return listener.results;
-      }
-    } catch (InterruptedException | ExecutionException e) {
-      throw new RpcException(e);
-    }
+    ListHoldingResultsListener listener = new ListHoldingResultsListener();
+    client.submitQuery(listener, newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build());
+    return listener.getResults();
+
   }
   
-  private class ListHoldingResultsListener extends UserResultsListener{
-    private RpcException ex;
+  private class ListHoldingResultsListener implements UserResultsListener {
     private Vector<QueryResultBatch> results = new Vector<QueryResultBatch>();
+    private SettableFuture<List<QueryResultBatch>> future = SettableFuture.create();
     
     @Override
     public void submissionFailed(RpcException ex) {
       logger.debug("Submission failed.", ex);
-      this.ex = ex;
+      future.setException(ex);
     }
 
     @Override
     public void resultArrived(QueryResultBatch result) {
       logger.debug("Result arrived.  Is Last Chunk: {}.  Full Result: {}", result.getHeader().getIsLastChunk(), result);
       results.add(result);
+      if(result.getHeader().getIsLastChunk()){
+        future.set(results);
+      }
+    }
+  
+    public List<QueryResultBatch> getResults() throws RpcException{
+      try{
+        return future.get();
+      }catch(Throwable t){
+        throw RpcException.mapException(t);
+      }
+    }
+  }
+  
+  private class FutureHandler extends AbstractCheckedFuture<Void, RpcException> implements RpcConnectionHandler<ServerConnection>, DrillRpcFuture<Void>{
+
+    protected FutureHandler() {
+      super( SettableFuture.<Void>create());
+    }
+
+    @Override
+    public void connectionSucceeded(ServerConnection connection) {
+      getInner().set(null);
+    }
+
+    @Override
+    public void connectionFailed(FailureType type, Throwable t) {
+      getInner().setException(new RpcException(String.format("Failure connecting to server. Failure of type %s.", type.name()), t));
+    }
+
+    private SettableFuture<Void> getInner(){
+      return (SettableFuture<Void>) delegate();
+    }
+    
+    @Override
+    protected RpcException mapException(Exception e) {
+      return RpcException.mapException(e);
     }
     
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
index 43a5430..f7b3549 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/LocalClusterCoordinator.java
@@ -29,17 +29,16 @@ import com.google.common.collect.Maps;
 public class LocalClusterCoordinator extends ClusterCoordinator{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalClusterCoordinator.class);
 
-  private volatile Map<RegistrationHandle, DrillbitEndpoint> endpoints;
+  private volatile Map<RegistrationHandle, DrillbitEndpoint> endpoints = Maps.newConcurrentMap();
   
   @Override
   public void close() throws IOException {
-    endpoints = null;
+    endpoints.clear();
   }
 
   @Override
   public void start(long millis) throws Exception {
     logger.debug("Local Cluster Coordinator started.");
-    endpoints = Maps.newConcurrentMap();
   }
 
   @Override
@@ -52,6 +51,8 @@ public class LocalClusterCoordinator extends ClusterCoordinator{
 
   @Override
   public void unregister(RegistrationHandle handle) {
+    if(handle == null) return;
+    
     endpoints.remove(handle);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index e64453c..33707a0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -52,7 +52,9 @@ public class FragmentContext {
   private final FragmentHandle handle;
   private final UserClientConnection connection;
   private final IncomingBuffers buffers;
-
+  private volatile Throwable failureCause;
+  private volatile boolean failed = false;
+  
   public FragmentContext(DrillbitContext dbContext, FragmentHandle handle, UserClientConnection connection, IncomingBuffers buffers) {
     this.fragmentTime = dbContext.getMetrics().timer(METRIC_TIMER_FRAGMENT_TIME);
     this.batchesCompleted = new SingleThreadNestedCounter(dbContext, METRIC_BATCHES_COMPLETED);
@@ -65,9 +67,10 @@ public class FragmentContext {
   }
 
   public void fail(Throwable cause) {
-
+    logger.debug("Fragment Context received failure. {}", cause);
+    failed = true;
+    failureCause = cause;
   }
-
   
   public DrillbitContext getDrillbitContext(){
     return context;
@@ -107,4 +110,14 @@ public class FragmentContext {
   public IncomingBuffers getBuffers(){
     return buffers;
   }
+
+  public Throwable getFailureCause() {
+    return failureCause;
+  }
+  
+  public boolean isFailed(){
+    return failed;
+  }
+  
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index fd24deb..1c251b8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.bit.BitCom;
 import org.apache.drill.exec.server.DrillbitContext;
 
 public class QueryContext {
@@ -57,4 +58,7 @@ public class QueryContext {
     return drillbitContext.getPlanReader();
   }
   
+  public BitCom getBitCom(){
+    return drillbitContext.getBitCom();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
index eaaeaa3..6a1eba4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
@@ -76,7 +76,6 @@ public class MockRecordReader implements RecordReader {
       int batchRecordCount = 250000 / estimateRowSize;
 
       for (int i = 0; i < config.getTypes().length; i++) {
-        logger.debug("Adding field {} of type {}", i, config.getTypes()[i]);
         valueVectors[i] = getVector(i, config.getTypes()[i].getName(), config.getTypes()[i].getMajorType(), batchRecordCount);
         output.addField(i, valueVectors[i]);
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
index ed41586..6772fb0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
@@ -72,11 +72,6 @@ public class RandomReceiver extends AbstractReceiver{
     return new Size(1,1);
   }
 
-  @Override
-  public int getOppositeMajorFragmentId() {
-    return 0;
-  }
-
   
 
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
index 86a201d..688c6b5 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
@@ -64,7 +64,7 @@ public class Screen extends AbstractStore implements Root{
     // didn't get screwed up.
     if (endpoints.size() != 1) throw new PhysicalOperatorSetupException("A Screen operator can only be assigned to a single node.");
     DrillbitEndpoint endpoint = endpoints.iterator().next();
-    logger.debug("Endpoint this: {}, assignment: {}", this.endpoint, endpoint);
+//    logger.debug("Endpoint this: {}, assignment: {}", this.endpoint, endpoint);
     if (!endpoint.equals(this.endpoint)) {
       throw new PhysicalOperatorSetupException(String.format(
           "A Screen operator can only be assigned to its home node.  Expected endpoint %s, Actual endpoint: %s",

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index c0711db..c20538d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -17,21 +17,32 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl;
 
+import io.netty.buffer.ByteBuf;
+
 import java.util.List;
 
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
 import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.work.foreman.ErrorHelper;
 
 import com.google.common.base.Preconditions;
 
 public class ScreenCreator implements RootCreator<Screen>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
-
+  
+  
   @Override
   public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) {
     Preconditions.checkArgument(children.size() == 1);
@@ -40,7 +51,9 @@ public class ScreenCreator implements RootCreator<Screen>{
   
   
   private static class ScreenRoot implements RootExec{
-
+    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
+    volatile boolean ok = true;
+    
     final RecordBatch incoming;
     final FragmentContext context;
     final UserClientConnection connection;
@@ -56,25 +69,53 @@ public class ScreenCreator implements RootCreator<Screen>{
     
     @Override
     public boolean next() {
+      if(!ok){
+        stop();
+        return false;
+      }
+      
       IterOutcome outcome = incoming.next();
-      boolean isLast = false;
+      logger.debug("Screen Outcome {}", outcome);
       switch(outcome){
-      case NONE:
-      case STOP:
-        connection.sendResult(materializer.convertNext(true));
-        context.batchesCompleted.inc(1);
-        context.recordsCompleted.inc(incoming.getRecordCount());
+      case STOP: {
+          QueryResult header1 = QueryResult.newBuilder() //
+              .setQueryId(context.getHandle().getQueryId()) //
+              .setRowCount(0) //
+              .addError(ErrorHelper.logAndConvertError(context.getIdentity(), "Screen received stop request sent.", context.getFailureCause(), logger))
+              .setDef(RecordBatchDef.getDefaultInstance()) //
+              .setIsLastChunk(true) //
+              .build();
+          QueryWritableBatch batch1 = new QueryWritableBatch(header1);
+
+          connection.sendResult(listener, batch1);
+          return false;
+      }
+      case NONE: {
+        if(materializer == null){
+          // receive no results.
+          context.batchesCompleted.inc(1);
+          context.recordsCompleted.inc(incoming.getRecordCount());
+          QueryResult header2 = QueryResult.newBuilder() //
+              .setQueryId(context.getHandle().getQueryId()) //
+              .setRowCount(0) //
+              .setDef(RecordBatchDef.getDefaultInstance()) //
+              .setIsLastChunk(true) //
+              .build();
+          QueryWritableBatch batch2 = new QueryWritableBatch(header2);
+          connection.sendResult(listener, batch2);
+        }else{
+          connection.sendResult(listener, materializer.convertNext(true));
+        }
         return false;
-        
+      }
       case OK_NEW_SCHEMA:
         materializer = new VectorRecordMaterializer(context, incoming);
         // fall through.
-        // fall through
       case OK:
-        connection.sendResult(materializer.convertNext(false));
         context.batchesCompleted.inc(1);
         context.recordsCompleted.inc(incoming.getRecordCount());
-        return !isLast;
+        connection.sendResult(listener, materializer.convertNext(false));
+        return true;
       default:
         throw new UnsupportedOperationException();
       }
@@ -85,6 +126,20 @@ public class ScreenCreator implements RootCreator<Screen>{
       incoming.kill();
     }
 
+    private SendListener listener = new SendListener();
+    
+    private class SendListener extends BaseRpcOutcomeListener<Ack>{
+
+      @Override
+      public void failed(RpcException ex) {
+        logger.error("Failure while sending data to user.", ex);
+        ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger);
+        ok = false;
+      }
+      
+    }
     
   }
+  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 60c2d78..b7d4c7e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -23,9 +23,12 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.bit.BitTunnel;
 
 public class SingleSenderCreator implements RootCreator<SingleSender>{
@@ -45,9 +48,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     private FragmentHandle handle;
     private int recMajor;
     private FragmentContext context;
+    private volatile boolean ok = true;
     
     public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config){
-      logger.debug("Creating single sender root exec base on config: {}", config);
       this.incoming = batch;
       this.handle = context.getHandle();
       this.recMajor = config.getOppositeMajorFragmentId();
@@ -57,20 +60,24 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     
     @Override
     public boolean next() {
+      if(!ok){
+        incoming.kill();
+        
+        return false;
+      }
       IterOutcome out = incoming.next();
       logger.debug("Outcome of sender next {}", out);
       switch(out){
       case STOP:
       case NONE:
-        FragmentWritableBatch b2 = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
-        tunnel.sendRecordBatch(context, b2);
+        FragmentWritableBatch b2 = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
+        tunnel.sendRecordBatch(new RecordSendFailure(), context, b2);
         return false;
-        
 
       case OK:
       case OK_NEW_SCHEMA:
-        FragmentWritableBatch batch = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
-        tunnel.sendRecordBatch(context, batch);
+        FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
+        tunnel.sendRecordBatch(new RecordSendFailure(), context, batch);
         return true;
 
       case NOT_YET:
@@ -81,9 +88,31 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
 
     @Override
     public void stop() {
+      ok = false;
     }
     
     
+    private class RecordSendFailure extends BaseRpcOutcomeListener<Ack>{
+
+      @Override
+      public void failed(RpcException ex) {
+        context.fail(ex);
+        stop();
+      }
+
+      @Override
+      public void success(Ack value) {
+        if(value.getOk()) return;
+        
+        logger.error("Downstream fragment was not accepted.  Stopping future sends.");
+        // if we didn't get ack ok, we'll need to kill the query.
+        context.fail(new RpcException("A downstream fragment batch wasn't accepted.  This fragment thus fails."));
+        stop();
+      }
+      
+    }
     
   }
+  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index fc7f833..b41b0cd 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -38,6 +38,7 @@ public class WireRecordBatch implements RecordBatch{
   private RecordBatchLoader batchLoader;
   private RawFragmentBatchProvider fragProvider;
   private FragmentContext context;
+  private BatchSchema schema;
 
   
   public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider) {
@@ -53,7 +54,7 @@ public class WireRecordBatch implements RecordBatch{
 
   @Override
   public BatchSchema getSchema() {
-    return null;
+    return schema;
   }
 
   @Override
@@ -73,13 +74,16 @@ public class WireRecordBatch implements RecordBatch{
 
   @Override
   public IterOutcome next() {
-    RawFragmentBatch batch = this.fragProvider.getNext();
+    RawFragmentBatch batch = fragProvider.getNext();
     try{
       if(batch == null) return IterOutcome.NONE;
 
+      logger.debug("Next received batch {}", batch);
+
       RecordBatchDef rbd = batch.getHeader().getDef();
       boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
       if(schemaChanged){
+        this.schema = batchLoader.getSchema();
         return IterOutcome.OK_NEW_SCHEMA;
       }else{
         return IterOutcome.OK;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
index 187e6e9..e8ed48a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
@@ -17,6 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl.materialize;
 
+import java.util.Arrays;
+
 import io.netty.buffer.ByteBuf;
 
 import org.apache.drill.exec.proto.UserProtos.QueryResult;
@@ -42,5 +44,11 @@ public class QueryWritableBatch {
   public QueryResult getHeader() {
     return header;
   }
+
+  @Override
+  public String toString() {
+    return "QueryWritableBatch [header=" + header + ", buffers=" + Arrays.toString(buffers) + "]";
+  }
+  
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
index e2d2eb9..7929296 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.materialize;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.WritableBatch;
@@ -33,10 +34,12 @@ public class VectorRecordMaterializer implements RecordMaterializer{
   public VectorRecordMaterializer(FragmentContext context, RecordBatch batch) {
     this.queryId = context.getHandle().getQueryId();
     this.batch = batch;
-
-    for (MaterializedField f : batch.getSchema()) {
-      logger.debug("New Field: {}", f);
-    }
+    BatchSchema schema = batch.getSchema();
+    assert schema != null : "Schema must be defined.";
+    
+//    for (MaterializedField f : batch.getSchema()) {
+//      logger.debug("New Field: {}", f);
+//    }
   }
 
   public QueryWritableBatch convertNext(boolean isLast) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index 9fee586..da71271 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -41,13 +41,13 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
       // this is a sending exchange.
       PhysicalOperator child = exchange.getChild().accept(this, iNode);
       PhysicalOperator materializedSender = exchange.getSender(iNode.getMinorFragmentId(), child);
-      logger.debug("Visit sending exchange, materialized {} with child {}.", materializedSender, child);
+//      logger.debug("Visit sending exchange, materialized {} with child {}.", materializedSender, child);
       return materializedSender;
       
     }else{
       // receiving exchange.
       PhysicalOperator materializedReceiver = exchange.getReceiver(iNode.getMinorFragmentId());
-      logger.debug("Visit receiving exchange, materialized receiver: {}.", materializedReceiver);
+//      logger.debug("Visit receiving exchange, materialized receiver: {}.", materializedReceiver);
       return materializedReceiver;
     }
   }
@@ -63,7 +63,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
     
     try {
       PhysicalOperator o = store.getSpecificStore(child, iNode.getMinorFragmentId());
-      logger.debug("New materialized store node {} with child {}", o, child);
+//      logger.debug("New materialized store node {} with child {}", o, child);
       return o;
     } catch (PhysicalOperatorSetupException e) {
       throw new FragmentSetupException("Failure while generating a specific Store materialization.");
@@ -72,7 +72,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
 
   @Override
   public PhysicalOperator visitOp(PhysicalOperator op, IndexedFragmentNode iNode) throws ExecutionSetupException {
-    logger.debug("Visiting catch all: {}", op);
+//    logger.debug("Visiting catch all: {}", op);
     List<PhysicalOperator> children = Lists.newArrayList();
     for(PhysicalOperator child : op){
       children.add(child.accept(this, iNode));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index fc03a23..8adb447 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -145,7 +145,7 @@ public class SimpleParallelizer {
       // figure out width.
       int width = Math.min(stats.getMaxWidth(), globalMaxWidth);
       float diskCost = stats.getDiskCost();
-      logger.debug("Frag max width: {} and diskCost: {}", stats.getMaxWidth(), diskCost);
+//      logger.debug("Frag max width: {} and diskCost: {}", stats.getMaxWidth(), diskCost);
 
       // TODO: right now we'll just assume that each task is cost 1 so we'll set the breadth at the lesser of the number
       // of tasks or the maximum width of the fragment.
@@ -154,7 +154,7 @@ public class SimpleParallelizer {
       }
 
       if (width < 1) width = 1;
-      logger.debug("Setting width {} on fragment {}", width, wrapper);
+//      logger.debug("Setting width {} on fragment {}", width, wrapper);
       wrapper.setWidth(width);
       // figure out endpoint assignments. also informs the exchanges about their respective endpoints.
       wrapper.assignEndpoints(allNodes);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
index d53a78c..af8ec04 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -41,7 +41,7 @@ public class StatsCollector {
 
     Wrapper wrapper = planningSet.get(n);
     n.getRoot().accept(opStatCollector, wrapper);
-    logger.debug("Set stats to {}", wrapper.getStats());
+//    logger.debug("Set stats to {}", wrapper.getStats());
     // receivers...
     for (ExchangeFragmentPair child : n) {
       // get the fragment node that feeds this node.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
index c244cea..4f87224 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
@@ -41,4 +41,9 @@ public class RawFragmentBatch {
     return body;
   }
 
+  @Override
+  public String toString() {
+    return "RawFragmentBatch [header=" + header + ", body=" + body + "]";
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
index 859d385..ea591da 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
@@ -25,8 +25,7 @@ import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
 
-public abstract class AbstractHandshakeHandler<T extends MessageLite> extends
-    ChannelInboundMessageHandlerAdapter<InboundRpcMessage> {
+public abstract class AbstractHandshakeHandler<T extends MessageLite> extends ChannelInboundMessageHandlerAdapter<InboundRpcMessage> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractHandshakeHandler.class);
 
   protected final EnumLite handshakeType;
@@ -41,7 +40,7 @@ public abstract class AbstractHandshakeHandler<T extends MessageLite> extends
 
   @Override
   public final void messageReceived(ChannelHandlerContext ctx, InboundRpcMessage inbound) throws Exception {
-    coordinationId = inbound.coordinationId;
+    this.coordinationId = inbound.coordinationId;
     ctx.channel().pipeline().remove(this);
     if (inbound.rpcType != handshakeType.getNumber())
       throw new RpcException(String.format("Handshake failure.  Expected %s[%d] but received number [%d]",

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
new file mode 100644
index 0000000..1dab1c7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+public class BaseRpcOutcomeListener<T> implements RpcOutcomeListener<T> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRpcOutcomeListener.class);
+
+  @Override
+  public void failed(RpcException ex) {
+  }
+
+  @Override
+  public void success(T value) {
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 0ff2b9d..0afc5d0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -29,22 +29,30 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
 
+import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
+
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
 
-public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection> extends RpcBus<T, R> {
+public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite>
+    extends RpcBus<T, R> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
 
-  private Bootstrap b;
+  private final Bootstrap b;
   private volatile boolean connect = false;
   protected R connection;
-  private EventLoopGroup eventLoop;
+  private final T handshakeType;
+  private final Class<HANDSHAKE_RESPONSE> responseClass;
+  private final Parser<HANDSHAKE_RESPONSE> handshakeParser;
 
-  public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+  public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
+      Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
     super(rpcMapping);
-    this.eventLoop = eventLoopGroup;
+    this.responseClass = responseClass;
+    this.handshakeType = handshakeType;
+    this.handshakeParser = handshakeParser;
     
     b = new Bootstrap() //
         .group(eventLoopGroup) //
@@ -59,12 +67,12 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
             logger.debug("initializing client connection.");
             connection = initRemoteConnection(ch);
             ch.closeFuture().addListener(getCloseHandler(connection));
-
+            
             ch.pipeline().addLast( //
                 new ZeroCopyProtobufLengthDecoder(), //
                 new RpcDecoder(rpcConfig.getName()), //
                 new RpcEncoder(rpcConfig.getName()), //
-                getHandshakeHandler(), //
+                new ClientHandshakeHandler(), //
                 new InboundHandler(connection), //
                 new RpcExceptionHandler() //
                 );
@@ -75,26 +83,9 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
     ;
   }
 
-  protected abstract ClientHandshakeHandler<?> getHandshakeHandler();
-
-  protected abstract class ClientHandshakeHandler<T extends MessageLite> extends AbstractHandshakeHandler<T> {
-    private Class<T> responseType;
-
-    public ClientHandshakeHandler(EnumLite handshakeType, Class<T> responseType, Parser<T> parser) {
-      super(handshakeType, parser);
-      this.responseType = responseType;
-    }
-
-    @Override
-    protected final void consumeHandshake(Channel c, T msg) throws Exception {
-      validateHandshake(msg);
-      queue.getFuture(handshakeType.getNumber(), coordinationId, responseType).setValue(msg);
-    }
-
-    protected abstract void validateHandshake(T msg) throws Exception;
-
-  }
-
+  protected abstract void validateHandshake(HANDSHAKE_RESPONSE validateHandshake) throws RpcException;
+  protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake, R connection);
+  
   protected GenericFutureListener<ChannelFuture> getCloseHandler(Channel channel) {
     return new ChannelClosedHandler();
   }
@@ -105,6 +96,11 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
         "This shouldn't be used in client mode as a client only has a single connection.");
   }
 
+  protected <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener,
+      T rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) throws RpcException {
+    super.send(listener, connection, rpcType, protobufBody, clazz, dataBodies);
+  }
+
   protected <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(T rpcType,
       SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) throws RpcException {
     return super.send(connection, rpcType, protobufBody, clazz, dataBodies);
@@ -115,65 +111,91 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
     return true;
   }
 
-  /**
-   * TODO: This is a horrible hack to manage deadlock caused by creation of BitClient within BitCom.  Should be cleaned up.
-   */
-  private class HandshakeThread<SEND extends MessageLite, RECEIVE extends MessageLite> extends Thread {
-    final SettableFuture<RECEIVE> future;
-    T handshakeType;
-    SEND handshakeValue;
-    String host;
-    int port;
-    Class<RECEIVE> responseClass;
-
-    public HandshakeThread(T handshakeType, SEND handshakeValue, String host, int port, Class<RECEIVE> responseClass) {
-      super();
-      assert host != null && !host.isEmpty();
-      assert port > 0;
-      logger.debug("Creating new handshake thread to connec to {}:{}", host, port);
-      this.setName(String.format("handshake thread for %s", handshakeType.getClass().getCanonicalName()));
-      future = SettableFuture.create();
-      this.handshakeType = handshakeType;
+  protected void connectAsClient(RpcConnectionHandler<R> connectionListener, HANDSHAKE_SEND handshakeValue, String host, int port){
+    ConnectionMultiListener cml = new ConnectionMultiListener(connectionListener, handshakeValue);
+    b.connect(host, port).addListener(cml.connectionHandler);
+  }
+
+  private class ConnectionMultiListener {
+    private final RpcConnectionHandler<R> l;
+    private final HANDSHAKE_SEND handshakeValue;
+
+    public ConnectionMultiListener(RpcConnectionHandler<R> l, HANDSHAKE_SEND handshakeValue) {
+      assert l != null;
+      assert handshakeValue != null;
+          
+      this.l = l;
       this.handshakeValue = handshakeValue;
-      this.host = host;
-      this.port = port;
-      this.responseClass = responseClass;
     }
 
-    @Override
-    public void run() {
-      try {
-        logger.debug("Starting to get client connection on host {}, port {}.", host, port);
-        
-        ChannelFuture f = b.connect(host, port);
-        f.sync();
-        if (connection == null) throw new RpcException("Failure while attempting to connect to server.");
-        connect = !connect;
-        logger.debug("Client connected, sending handshake.");
-        DrillRpcFuture<RECEIVE> fut = send(handshakeType, handshakeValue, responseClass);
-        future.set(fut.checkedGet());
-        logger.debug("Got bit client connection.");
-      } catch (Exception e) {
-        logger.debug("Failed to get client connection.", e);
-        future.setException(e);
+    public final ConnectionHandler connectionHandler = new ConnectionHandler();
+    public final HandshakeSendHandler handshakeSendHandler = new HandshakeSendHandler();
+
+    /**
+     * Manages connection establishment outcomes.
+     */
+    private class ConnectionHandler implements GenericFutureListener<ChannelFuture> {
+
+      @Override
+      public void operationComplete(ChannelFuture future) throws Exception {
+//        logger.debug("Connection operation finished.  Success: {}", future.isSuccess());
+        try {
+          future.get();
+          if (future.isSuccess()) {
+            send(handshakeSendHandler, handshakeType, handshakeValue, responseClass);
+          } else {
+            l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection failure."));
+          }
+//          logger.debug("Handshake queued for send.");
+        } catch (Exception ex) {
+          l.connectionFailed(FailureType.CONNECTION, ex);
+        }
       }
     }
 
+    /**
+     * Manages handshake outcomes.
+     */
+    private class HandshakeSendHandler implements RpcOutcomeListener<HANDSHAKE_RESPONSE> {
+
+      @Override
+      public void failed(RpcException ex) {
+        logger.debug("Failure while initiating handshake", ex);
+        l.connectionFailed(FailureType.HANDSHAKE_COMMUNICATION, ex);
+      }
+
+      @Override
+      public void success(HANDSHAKE_RESPONSE value) {
+//        logger.debug("Handshake received. {}", value);
+        try {
+          BasicClient.this.validateHandshake(value);
+          BasicClient.this.finalizeConnection(value, connection);
+          BasicClient.this.connect = true;
+          l.connectionSucceeded(connection);
+//          logger.debug("Handshake completed successfully.");
+        } catch (RpcException ex) {
+          l.connectionFailed(FailureType.HANDSHAKE_VALIDATION, ex);
+        }
+      }
+
+    }
+
   }
 
-  protected <SEND extends MessageLite, RECEIVE extends MessageLite> RECEIVE connectAsClient(T handshakeType,
-      SEND handshakeValue, String host, int port, Class<RECEIVE> responseClass) throws InterruptedException,
-      RpcException {
-    
-    
-    HandshakeThread<SEND, RECEIVE> ht = new HandshakeThread<SEND, RECEIVE>(handshakeType, handshakeValue, host, port, responseClass);
-    ht.start();
-    try{
-      return ht.future.get();  
-    }catch(Exception e){
-      throw new RpcException(e);
+  private class ClientHandshakeHandler extends AbstractHandshakeHandler<HANDSHAKE_RESPONSE> {
+
+    public ClientHandshakeHandler() {
+      super(BasicClient.this.handshakeType, BasicClient.this.handshakeParser);
     }
-    
+
+    @Override
+    protected final void consumeHandshake(Channel c, HANDSHAKE_RESPONSE msg) throws Exception {
+      // remove the handshake information from the queue so it doesn't sit there forever.
+      RpcOutcome<HANDSHAKE_RESPONSE> response = queue.getFuture(handshakeType.getNumber(), coordinationId,
+          responseClass);
+      response.set(msg);
+    }
+
   }
 
   public void close() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
index 0e62f14..2028db6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
@@ -26,13 +26,16 @@ import io.netty.util.concurrent.GenericFutureListener;
 
 import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
 
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
 import com.google.protobuf.Internal.EnumLite;
 
-public abstract class BasicClientWithConnection<T extends EnumLite> extends BasicClient<T, ServerConnection>{
+public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite> extends BasicClient<T, ServerConnection, HANDSHAKE_SEND, HANDSHAKE_RESPONSE>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClientWithConnection.class);
 
-  public BasicClientWithConnection(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
-    super(rpcMapping, alloc, eventLoopGroup);
+  public BasicClientWithConnection(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
+      Class<HANDSHAKE_RESPONSE> responseClass, Parser<HANDSHAKE_RESPONSE> handshakeParser) {
+    super(rpcMapping, alloc, eventLoopGroup, handshakeType, responseClass, handshakeParser);
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index 52bb0a2..af5d9c9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -72,7 +72,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
                 new ZeroCopyProtobufLengthDecoder(), //
                 new RpcDecoder(rpcConfig.getName()), //
                 new RpcEncoder(rpcConfig.getName()), //
-                getHandshakeHandler(),
+                getHandshakeHandler(connection),
                 new InboundHandler(connection), //
                 new RpcExceptionHandler() //
                 );            
@@ -88,7 +88,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
   }
 
   
-  protected abstract ServerHandshakeHandler<?> getHandshakeHandler();
+  protected abstract ServerHandshakeHandler<?> getHandshakeHandler(C connection);
 
   protected static abstract class ServerHandshakeHandler<T extends MessageLite> extends AbstractHandshakeHandler<T> {
 
@@ -104,9 +104,6 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
     
     public abstract MessageLite getHandshakeResponse(T inbound) throws Exception;
     
-
-      
-    
   }
   
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java
new file mode 100644
index 0000000..27e9dee
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ChannelListenerWithCoordinationId.java
@@ -0,0 +1,25 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+
+public interface ChannelListenerWithCoordinationId extends GenericFutureListener<ChannelFuture>{
+  public int getCoordinationId();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
index 70142bb..9edbe11 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
@@ -17,8 +17,12 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc;
 
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
+
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure;
 
@@ -29,31 +33,93 @@ public class CoordinationQueue {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CoordinationQueue.class);
 
   private final PositiveAtomicInteger circularInt = new PositiveAtomicInteger();
-  private final Map<Integer, DrillRpcFutureImpl<?>> map;
+  private final Map<Integer, RpcOutcome<?>> map;
 
   public CoordinationQueue(int segmentSize, int segmentCount) {
-    map = new ConcurrentHashMap<Integer, DrillRpcFutureImpl<?>>(segmentSize, 0.75f, segmentCount);
+    map = new ConcurrentHashMap<Integer, RpcOutcome<?>>(segmentSize, 0.75f, segmentCount);
   }
 
-  void channelClosed(Exception ex) {
-    for (DrillRpcFutureImpl<?> f : map.values()) {
-      f.setException(ex);
+  void channelClosed(Throwable ex) {
+    if(ex != null){
+      RpcException e;
+      if(ex instanceof RpcException){
+        e = (RpcException) ex;
+      }else{
+        e = new RpcException(ex);  
+      }
+      for (RpcOutcome<?> f : map.values()) {
+        f.setException(e);
+      }
     }
   }
 
-  public <V> DrillRpcFutureImpl<V> getNewFuture(Class<V> clazz) {
+  public <V> ChannelListenerWithCoordinationId get(RpcOutcomeListener<V> handler, Class<V> clazz){
     int i = circularInt.getNext();
-    DrillRpcFutureImpl<V> future = DrillRpcFutureImpl.getNewFuture(i, clazz);
-    // logger.debug("Writing to map coord {}, future {}", i, future);
+    RpcListener<V> future = new RpcListener<V>(handler, clazz, i);
     Object old = map.put(i, future);
     if (old != null)
       throw new IllegalStateException(
           "You attempted to reuse a coordination id when the previous coordination id has not been removed.  This is likely rpc future callback memory leak.");
     return future;
   }
+  
+  /** Map entry for one in-flight RPC: watches the channel write and relays the outcome to the handler. */
+  private class RpcListener<T> implements ChannelListenerWithCoordinationId, RpcOutcome<T>{
+    final RpcOutcomeListener<T> handler;
+    final Class<T> clazz;
+    final int coordinationId;
+
+    public RpcListener(RpcOutcomeListener<T> handler, Class<T> clazz, int coordinationId) {
+      this.handler = handler;
+      this.clazz = clazz;
+      this.coordinationId = coordinationId;
+    }
+
+    /** On a failed write, drop the map entry and report the cause; otherwise the handler is never told. */
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      if(!future.isSuccess()){
+        removeFromMap(coordinationId);
+        setException(future.cause());
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void set(Object value) {
+      assert clazz.isAssignableFrom(value.getClass());
+      handler.success( (T) value);
+    }
+
+    @Override
+    public void setException(Throwable t) {
+      handler.failed(RpcException.mapException(t));
+    }
+
+    @Override
+    public Class<T> getOutcomeType() {
+      return clazz;
+    }
+
+    @Override
+    public int getCoordinationId() {
+      return coordinationId;
+    }
+  }
+//  
+//  public <V> DrillRpcFutureImpl<V> getNewFuture(Class<V> clazz) {
+//    int i = circularInt.getNext();
+//    DrillRpcFutureImpl<V> future = DrillRpcFutureImpl.getNewFuture(i, clazz);
+//    // logger.debug("Writing to map coord {}, future {}", i, future);
+//    Object old = map.put(i, future);
+//    if (old != null)
+//      throw new IllegalStateException(
+//          "You attempted to reuse a coordination id when the previous coordination id has not been removed.  This is likely rpc future callback memory leak.");
+//    return future;
+//  }
 
-  private DrillRpcFutureImpl<?> removeFromMap(int coordinationId) {
-    DrillRpcFutureImpl<?> rpc = map.remove(coordinationId);
+  private RpcOutcome<?> removeFromMap(int coordinationId) {
+    RpcOutcome<?> rpc = map.remove(coordinationId);
     if (rpc == null) {
       logger.error("Rpc is null.");
       throw new IllegalStateException(
@@ -62,11 +128,11 @@ public class CoordinationQueue {
     return rpc;
   }
 
-  public <V> DrillRpcFutureImpl<V> getFuture(int rpcType, int coordinationId, Class<V> clazz) {
+  public <V> RpcOutcome<V> getFuture(int rpcType, int coordinationId, Class<V> clazz) {
     // logger.debug("Getting future for coordinationId {} and class {}", coordinationId, clazz);
-    DrillRpcFutureImpl<?> rpc = removeFromMap(coordinationId);
+    RpcOutcome<?> rpc = removeFromMap(coordinationId);
     // logger.debug("Got rpc from map {}", rpc);
-    Class<?> outcomeClass = rpc.getOutcomeClass();
+    Class<?> outcomeClass = rpc.getOutcomeType();
 
     if (outcomeClass != clazz) {
 
@@ -80,7 +146,7 @@ public class CoordinationQueue {
     }
 
     @SuppressWarnings("unchecked")
-    DrillRpcFutureImpl<V> crpc = (DrillRpcFutureImpl<V>) rpc;
+    RpcOutcome<V> crpc = (RpcOutcome<V>) rpc;
 
     // logger.debug("Returning casted future");
     return crpc;
@@ -88,7 +154,7 @@ public class CoordinationQueue {
 
   public void updateFailedFuture(int coordinationId, RpcFailure failure) {
     // logger.debug("Updating failed future.");
-    DrillRpcFutureImpl<?> rpc = removeFromMap(coordinationId);
+    RpcOutcome<?> rpc = removeFromMap(coordinationId);
     rpc.setException(new RemoteRpcException(failure));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
index bae947a..9033ea1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
@@ -21,6 +21,4 @@ import com.google.common.util.concurrent.CheckedFuture;
 
 public interface DrillRpcFuture<T> extends CheckedFuture<T,RpcException> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRpcFuture.class);
-
-  public void addLightListener(RpcOutcomeListener<T> outcomeListener);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
index ee14eeb..d5d3a9c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
@@ -22,22 +22,12 @@ import java.util.concurrent.ExecutionException;
 import com.google.common.util.concurrent.AbstractCheckedFuture;
 import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 
-class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> implements DrillRpcFuture<V>{
+class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> implements DrillRpcFuture<V>, RpcOutcomeListener<V>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRpcFutureImpl.class);
 
-  final int coordinationId;
-  private final Class<V> clazz;
-
-  public DrillRpcFutureImpl(ListenableFuture<V> delegate, int coordinationId, Class<V> clazz) {
-    super(delegate);
-    this.coordinationId = coordinationId;
-    this.clazz = clazz;
-  }
-
-  public Class<V> getOutcomeClass(){
-    return clazz;
+  public DrillRpcFutureImpl() {
+    super(new InnerFuture<V>());
   }
   
   /**
@@ -53,24 +43,7 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple
 
   @Override
   protected RpcException mapException(Exception ex) {
-    Throwable e = ex;
-    while(e instanceof ExecutionException){
-      e = e.getCause();
-    }
-    if (e instanceof RpcException)  return (RpcException) e;
-
-    return new RpcException(ex);
-
-  }
-
-  @SuppressWarnings("unchecked")
-  void setValue(Object value) {
-    assert clazz.isAssignableFrom(value.getClass());
-    ((InnerFuture<V>) super.delegate()).setValue((V) value);
-  }
-
-  boolean setException(Throwable t) {
-    return ((InnerFuture<V>) super.delegate()).setException(t);
+    return RpcException.mapException(ex);
   }
 
   public static class InnerFuture<T> extends AbstractFuture<T> {
@@ -85,34 +58,17 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple
     }
   }
 
-  public class RpcOutcomeListenerWrapper implements Runnable{
-    final RpcOutcomeListener<V> inner;
-    
-    public RpcOutcomeListenerWrapper(RpcOutcomeListener<V> inner) {
-      super();
-      this.inner = inner;
-    }
-
-    @Override
-    public void run() {
-      try{
-        inner.success(DrillRpcFutureImpl.this.checkedGet());
-      }catch(RpcException e){
-        inner.failed(e);
-      }
-    }
-  }
-  
-  public void addLightListener(RpcOutcomeListener<V> outcomeListener){
-    this.addListener(new RpcOutcomeListenerWrapper(outcomeListener), MoreExecutors.sameThreadExecutor());
+  @Override
+  public void failed(RpcException ex) {
+    ( (InnerFuture<V>)delegate()).setException(ex);
   }
-  
-  
-  
-  public static <V> DrillRpcFutureImpl<V> getNewFuture(int coordinationId, Class<V> clazz) {
-    InnerFuture<V> f = new InnerFuture<V>();
-    return new DrillRpcFutureImpl<V>(f, coordinationId, clazz);
+
+  @Override
+  public void success(V value) {
+    ( (InnerFuture<V>)delegate()).setValue(value);
   }
 
 
+  
+  
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 11764db..a680a97 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -64,6 +64,16 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
 
   public <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(C connection, T rpcType,
       SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+    DrillRpcFutureImpl<RECEIVE> rpcFuture = new DrillRpcFutureImpl<RECEIVE>();
+    this.send(rpcFuture, connection, rpcType, protobufBody, clazz, dataBodies);
+    return rpcFuture;
+  }  
+  
+  public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> listener, C connection, T rpcType,
+      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+  
+    
+
 
     assert !Arrays.asList(dataBodies).contains(null);
     assert rpcConfig.checkSend(rpcType, protobufBody.getClass(), clazz);
@@ -72,14 +82,12 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
     boolean completed = false;
 
     try {
-      // logger.debug("Seding message");
       Preconditions.checkNotNull(protobufBody);
-      DrillRpcFutureImpl<RECEIVE> rpcFuture = queue.getNewFuture(clazz);
-      OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, rpcFuture.coordinationId, protobufBody, dataBodies);
+      ChannelListenerWithCoordinationId futureListener = queue.get(listener, clazz);
+      OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, futureListener.getCoordinationId(), protobufBody, dataBodies);
       ChannelFuture channelFuture = connection.getChannel().write(m);
-      channelFuture.addListener(new Listener(rpcFuture.coordinationId, clazz));
+      channelFuture.addListener(futureListener);
       completed = true;
-      return rpcFuture;
     } finally {
       if (!completed) {
         if (pBuffer != null) pBuffer.release();
@@ -140,10 +148,10 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
       case RESPONSE:
         MessageLite m = getResponseDefaultInstance(msg.rpcType);
         assert rpcConfig.checkReceive(msg.rpcType, m.getClass());
-        DrillRpcFutureImpl<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
+        RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
         Parser<?> parser = m.getParserForType();
         Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
-        rpcFuture.setValue(value);
+        rpcFuture.set(value);
         if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
 
         break;
@@ -162,39 +170,39 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
 
   }
 
-  private class Listener implements GenericFutureListener<ChannelFuture> {
-
-    private int coordinationId;
-    private Class<?> clazz;
-
-    public Listener(int coordinationId, Class<?> clazz) {
-      this.coordinationId = coordinationId;
-      this.clazz = clazz;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture channelFuture) throws Exception {
-      // logger.debug("Completed channel write.");
-
-      if (channelFuture.isCancelled()) {
-        DrillRpcFutureImpl<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
-        rpcFuture.setException(new CancellationException("Socket operation was canceled."));
-      } else if (!channelFuture.isSuccess()) {
-        try {
-          channelFuture.get();
-          throw new IllegalStateException("Future was described as completed and not succesful but did not throw an exception.");
-        } catch (Exception e) {
-          logger.error("Error occurred during Rpc", e);
-          DrillRpcFutureImpl<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
-          rpcFuture.setException(e);
-        }
-      } else {
-        // send was successful. No need to modify DrillRpcFuture.
-        return;
-      }
-    }
-
-  }
+//  private class Listener implements GenericFutureListener<ChannelFuture> {
+//
+//    private int coordinationId;
+//    private Class<?> clazz;
+//
+//    public Listener(int coordinationId, Class<?> clazz) {
+//      this.coordinationId = coordinationId;
+//      this.clazz = clazz;
+//    }
+//
+//    @Override
+//    public void operationComplete(ChannelFuture channelFuture) throws Exception {
+//      // logger.debug("Completed channel write.");
+//
+//      if (channelFuture.isCancelled()) {
+//        RpcOutcome<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
+//        rpcFuture.setException(new CancellationException("Socket operation was canceled."));
+//      } else if (!channelFuture.isSuccess()) {
+//        try {
+//          channelFuture.get();
+//          throw new IllegalStateException("Future was described as completed and not succesful but did not throw an exception.");
+//        } catch (Exception e) {
+//          logger.error("Error occurred during Rpc", e);
+//          RpcOutcome<?> rpcFuture = queue.getFuture(-1, coordinationId, clazz);
+//          rpcFuture.setException(e);
+//        }
+//      } else {
+//        // send was successful. No need to modify DrillRpcFuture.
+//        return;
+//      }
+//    }
+//
+//  }
 
   public static <T> T get(ByteBuf pBody, Parser<T> parser) throws RpcException{
     try {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
new file mode 100644
index 0000000..7c300d3
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
@@ -0,0 +1,33 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import com.google.common.util.concurrent.AbstractCheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/** {@link DrillRpcFuture} over an arbitrary {@link ListenableFuture}; failures surface as {@link RpcException}. */
+public class RpcCheckedFuture<T> extends AbstractCheckedFuture<T, RpcException> implements DrillRpcFuture<T>{
+  public RpcCheckedFuture(ListenableFuture<T> delegate) {
+    super(delegate);
+  }
+
+  @Override
+  protected RpcException mapException(Exception cause) {
+    return RpcException.mapException(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
new file mode 100644
index 0000000..0f55488
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConnectionHandler.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+public interface RpcConnectionHandler<T extends RemoteConnection> {
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcConnectionHandler.class);
+
+  /** Category of failure passed to {@link #connectionFailed(FailureType, Throwable)}. */
+  enum FailureType{CONNECTION, HANDSHAKE_COMMUNICATION, HANDSHAKE_VALIDATION}
+
+  void connectionSucceeded(T connection);
+  void connectionFailed(FailureType type, Throwable t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
index ca66481..500f959 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcException.java
@@ -17,6 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc;
 
+import java.util.concurrent.ExecutionException;
+
 import org.apache.drill.common.exceptions.DrillIOException;
 
 /**
@@ -41,5 +43,16 @@ public class RpcException extends DrillIOException{
     super(cause);
   }
   
+  public static RpcException mapException(Throwable t){
+    Throwable root = t; // peel off any ExecutionException wrappers to reach the real failure
+    while(root instanceof ExecutionException) root = root.getCause();
+    return (root instanceof RpcException) ? (RpcException) root : new RpcException(root);
+  }
+  
+  public static RpcException mapException(String message, Throwable t){
+    Throwable root = t; // attach the underlying failure rather than an ExecutionException wrapper
+    while(root instanceof ExecutionException) root = root.getCause();
+    return new RpcException(message, root);
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
new file mode 100644
index 0000000..a25e5e7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
@@ -0,0 +1,26 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+public interface RpcOutcome<T> {
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcOutcome.class);
+
+  void set(Object value);          // deliver the response value
+  void setException(Throwable t);  // deliver a failure instead of a value
+  Class<T> getOutcomeType();       // response class this outcome expects
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
index fac908c..771edcf 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
@@ -17,11 +17,10 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc;
 
-public abstract class RpcOutcomeListener<V> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcOutcomeListener.class);
+public interface RpcOutcomeListener<V> {
   
-  public void failed(RpcException ex){};
-  public void success(V value){};
+  public void failed(RpcException ex);
+  public void success(V value);
   
   
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b8db98ad/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
index 20a7d7d..318abb1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
@@ -36,7 +36,7 @@ public class ZeroCopyProtobufLengthDecoder extends ByteToMessageDecoder {
   protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
 
     if(!ctx.channel().isOpen()){
-      logger.info("Channel is closed, discarding remaining {} byte(s) in buffer.", in.readableBytes());
+      if(in.readableBytes() > 0) logger.info("Channel is closed, discarding remaining {} byte(s) in buffer.", in.readableBytes());
       in.skipBytes(in.readableBytes());
       return;
     }