You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:57:30 UTC

[09/53] [abbrv] Update typing system. Update RPC system. Add Fragmenting Implementation. Working single node. Distributed failing due to threading issues.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
new file mode 100644
index 0000000..2900d99
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunner.java
@@ -0,0 +1,124 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+
+import com.yammer.metrics.Timer;
+
+/**
+ * Runs a single fragment on a single Drillbit and responds to status requests and cancellation messages.
+ *
+ * <p>The lifecycle is held in an {@link AtomicInteger} containing a {@link FragmentState} number. {@link #run()}
+ * re-reads it before every call to {@code root.next()}, which is how a {@link #cancel()} issued while the
+ * fragment is executing gets noticed.
+ *
+ * <p>NOTE(review): earlier documentation mentioned two child implementations, root (driving) and child (driven);
+ * none are visible in this file.
+ */
+public class FragmentRunner implements Runnable, CancelableQuery, StatusProvider, Comparable<Object>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentRunner.class);
+
+  /** Current lifecycle state, stored as the numeric value of a {@link FragmentState}. */
+  private final AtomicInteger state = new AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE);
+  private final RootExec root;
+  private final FragmentContext context;
+  private final FragmentRunnerListener listener;
+
+  /**
+   * @param context  execution context of the fragment (counters, allocator, handle, client connection)
+   * @param root     root of the operator tree that is driven by {@link #run()}
+   * @param listener receives state changes and failures of this fragment
+   */
+  public FragmentRunner(FragmentContext context, RootExec root, FragmentRunnerListener listener){
+    this.context = context;
+    this.root = root;
+    this.listener = listener;
+  }
+
+  /**
+   * Builds a status snapshot from the context's batch and data counters and the allocator's memory figure.
+   * The lifecycle state itself is not written into the returned message.
+   */
+  @Override
+  public FragmentStatus getStatus() {
+    return FragmentStatus.newBuilder() //
+        .setBatchesCompleted(context.batchesCompleted.get()) //
+        .setDataProcessed(context.dataProcessed.get()) //
+        .setMemoryUse(context.getAllocator().getAllocatedMemory()) //
+        .build();
+  }
+
+  /**
+   * Unconditionally stores {@link FragmentState#CANCELLED} and notifies the listener. A running fragment leaves
+   * its loop the next time it checks the state.
+   */
+  @Override
+  public void cancel() {
+    updateState(FragmentState.CANCELLED);
+  }
+
+  /** Returns the client connection held by the fragment context. */
+  public UserClientConnection getClient(){
+    return context.getConnection();
+  }
+
+  /**
+   * Moves the fragment from AWAITING_ALLOCATION to RUNNING and calls {@code root.next()} until it returns false
+   * or the state stops being RUNNING. If the fragment did not end in FINISHED, {@code root.stop()} is called.
+   * Any exception thrown while running is reported through {@link #internalFail(Throwable)}.
+   */
+  @Override
+  public void run() {
+    if(!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)){
+      internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state.  FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get()))));
+      return;
+    }
+
+    Timer.Context t = context.fragmentTime.time();
+
+    // run the query until root.next returns false.
+    try{
+      while(state.get() == FragmentState.RUNNING_VALUE){
+        if(!root.next()){
+          // If a concurrent cancel() already changed the state this CAS fails, the loop condition ends the
+          // loop, and the stop() branch below runs.
+          updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
+        }
+      }
+
+      // If this isn't a finished stop, we'll inform other batches to finish up.
+      if(state.get() != FragmentState.FINISHED_VALUE){
+        root.stop();
+      }
+
+    }catch(Exception ex){
+      internalFail(ex);
+    }finally{
+      t.stop();
+    }
+
+  }
+
+  /** Stores the FAILED state and reports the failure, with its cause, to the listener. */
+  private void internalFail(Throwable excep){
+    state.set(FragmentState.FAILED_VALUE);
+    listener.fail(context.getHandle(), "Failure while running fragment.", excep);
+  }
+
+  /** Stores {@code update} regardless of the current state and notifies the listener. */
+  private void updateState(FragmentState update){
+    state.set(update.getNumber());
+    listener.stateChanged(context.getHandle(), update);
+  }
+
+  /**
+   * Atomically moves the state from {@code current} to {@code update}.
+   *
+   * <p>The listener is told about the transition only when it actually happened. Previously a failed
+   * compare-and-set with {@code exceptionOnFailure == false} still notified the listener and returned
+   * {@code true}, which made the guard at the top of {@link #run()} unreachable.
+   *
+   * @param current            state the fragment is expected to be in
+   * @param update             state to move to
+   * @param exceptionOnFailure when true, a failed transition is also reported via {@link #internalFail(Throwable)}
+   * @return true if the state was changed, false if the fragment was not in {@code current}
+   */
+  private boolean updateState(FragmentState current, FragmentState update, boolean exceptionOnFailure) {
+    if (state.compareAndSet(current.getNumber(), update.getNumber())) {
+      listener.stateChanged(context.getHandle(), update);
+      return true;
+    }
+    if (exceptionOnFailure) {
+      internalFail(new RuntimeException(String.format(
+          "State was different than expected.  Attempting to update state from %s to %s however current state was %s.",
+          current.name(), update.name(), FragmentState.valueOf(state.get()))));
+    }
+    return false;
+  }
+
+  /**
+   * Orders runners by descending hash code. The hash codes are compared rather than subtracted because the
+   * difference of two ints can overflow and flip the sign of the result.
+   */
+  @Override
+  public int compareTo(Object o) {
+    int theirs = o.hashCode();
+    int mine = this.hashCode();
+    if (theirs < mine) {
+      return -1;
+    }
+    return theirs == mine ? 0 : 1;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunnerListener.java
new file mode 100644
index 0000000..d978470
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/FragmentRunnerListener.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
+
+/**
+ * Callback through which a fragment runner reports failures and state transitions of a fragment.
+ */
+public interface FragmentRunnerListener {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentRunnerListener.class);
+  
+  /** Reports that the fragment identified by {@code handle} failed, with a description and the cause. */
+  void fail(FragmentHandle handle, String message, Throwable excep);
+  /** Reports that the fragment identified by {@code handle} is now in {@code newState}. */
+  void stateChanged(FragmentHandle handle, FragmentState newState);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
new file mode 100644
index 0000000..243d677
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
@@ -0,0 +1,64 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Immutable holder for the pieces of a planned query: the root operator, the root plan fragment and the
+ * list of remaining plan fragments. None of the three may be null.
+ */
+public class QueryWorkUnit {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWorkUnit.class);
+
+  private final PlanFragment rootFragment; // for local
+  private final FragmentRoot rootOperator; // for local
+  private final List<PlanFragment> fragments;
+
+  /**
+   * @param rootOperator root operator of the query; must not be null
+   * @param rootFragment root plan fragment; must not be null
+   * @param fragments    the other plan fragments; must not be null
+   * @throws NullPointerException if any argument is null
+   */
+  public QueryWorkUnit(FragmentRoot rootOperator, PlanFragment rootFragment, List<PlanFragment> fragments) {
+    // checkNotNull hands back its argument, so validation and assignment share one statement each.
+    this.rootFragment = Preconditions.checkNotNull(rootFragment);
+    this.fragments = Preconditions.checkNotNull(fragments);
+    this.rootOperator = Preconditions.checkNotNull(rootOperator);
+  }
+
+  /** Returns the root operator. */
+  public FragmentRoot getRootOperator() {
+    return rootOperator;
+  }
+
+  /** Returns the root plan fragment. */
+  public PlanFragment getRootFragment() {
+    return rootFragment;
+  }
+
+  /** Returns the list of plan fragments exactly as it was passed to the constructor (no copy is made). */
+  public List<PlanFragment> getFragments() {
+    return fragments;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RecordOutputStream.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RecordOutputStream.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RecordOutputStream.java
new file mode 100644
index 0000000..abcb312
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RecordOutputStream.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+/**
+ * Placeholder class: at present it declares only a logger and has no behavior.
+ */
+public class RecordOutputStream {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordOutputStream.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
new file mode 100644
index 0000000..74fcd2b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.Builder;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
+import org.apache.drill.exec.rpc.bit.BitTunnel;
+import org.apache.drill.exec.work.foreman.ErrorHelper;
+
+/**
+ * Informs a remote node as a fragment changes state by sending each reported {@link FragmentStatus}
+ * through a {@link BitTunnel}.
+ */
+public class RemotingFragmentRunnerListener extends AbstractFragmentRunnerListener{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemotingFragmentRunnerListener.class);
+  
+  // Tunnel over which status messages are sent.
+  private final BitTunnel tunnel;
+
+  /**
+   * @param context fragment context, passed on to the superclass
+   * @param tunnel  tunnel used to send fragment status messages
+   */
+  public RemotingFragmentRunnerListener(FragmentContext context, BitTunnel tunnel) {
+    super(context);
+    this.tunnel = tunnel;
+  }
+  
+  /** Sends {@code status} over the tunnel; {@code handle} is not used here. */
+  @Override
+  protected void statusChange(FragmentHandle handle, FragmentStatus status) {
+    tunnel.sendFragmentStatus(status);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/ResourceRequest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/ResourceRequest.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/ResourceRequest.java
new file mode 100644
index 0000000..2e1296e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/ResourceRequest.java
@@ -0,0 +1,30 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+/**
+ * Plain data holder describing a memory request as a minimum and a desired amount.
+ * NOTE(review): units are not stated anywhere in this file; presumably bytes, to be confirmed.
+ */
+public class ResourceRequest {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourceRequest.class);
+  
+  // Minimum amount of memory requested.
+  public long memoryMin;
+  // Desired amount of memory requested.
+  public long memoryDesired;
+  
+
+  /** Plain data holder for an allocation; presumably the answer to a {@link ResourceRequest}, to be confirmed. */
+  public static class ResourceAllocation {
+    // Amount of memory in the allocation.
+    public long memory;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RootNodeDriver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RootNodeDriver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RootNodeDriver.java
new file mode 100644
index 0000000..12da7ba
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RootNodeDriver.java
@@ -0,0 +1,25 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+/**
+ * Single-method interface for driving a root node one step at a time.
+ */
+public interface RootNodeDriver {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootNodeDriver.class);
+  
+  // NOTE(review): the meaning of the returned flag is not defined in this file; presumably
+  // "more work remains" -- confirm against implementations.
+  public boolean doNext();
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java
new file mode 100644
index 0000000..bbd9df7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java
@@ -0,0 +1,24 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+
+/**
+ * Implemented by objects that can report a {@link FragmentStatus}.
+ */
+public interface StatusProvider {
+  /** Returns the current fragment status. */
+  public FragmentStatus getStatus();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
new file mode 100644
index 0000000..d3664a0
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -0,0 +1,168 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.bit.BitCom;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.batch.BitComHandler;
+import org.apache.drill.exec.work.batch.BitComHandlerImpl;
+import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
+import org.apache.drill.exec.work.fragment.RemoteFragmentHandler;
+import org.apache.drill.exec.work.user.UserWorker;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+
+/**
+ * Accepts runnable work (fragment runners, foremen, remotely started fragments) into a priority queue and
+ * hands it to a fixed-size thread pool from a dedicated daemon event thread.
+ *
+ * <p>{@link #start} must be called before work is dispatched; {@link #close()} stops the event thread and
+ * shuts the pool down.
+ */
+public class WorkManager implements Closeable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkManager.class);
+
+  /** Number of threads in the pool that executes pending tasks. */
+  private static final int WORKER_THREADS = 4;
+  /** Seconds the event thread waits for a pending task before looping again. */
+  private static final long POLL_SECONDS = 10;
+
+  // Fragments that were registered but not yet started by their remote side.
+  private final Set<IncomingFragmentHandler> incomingFragments = Collections.newSetFromMap(Maps.<IncomingFragmentHandler, Boolean> newConcurrentMap());
+
+  // Work waiting to be handed to the executor by the event thread.
+  private final PriorityBlockingQueue<Runnable> pendingTasks = Queues.newPriorityBlockingQueue();
+
+  // NOTE(review): nothing in this class adds entries to this map yet.
+  private final Map<FragmentHandle, FragmentRunner> runningFragments = Maps.newConcurrentMap();
+
+  // NOTE(review): nothing in this class adds entries to this map yet; retireForeman only removes.
+  private final ConcurrentMap<QueryId, Foreman> queries = Maps.newConcurrentMap();
+
+  private final BootStrapContext bContext;
+  // Assigned in start(); null until then.
+  private DrillbitContext dContext;
+
+  private final BitComHandler bitComWorker;
+  private final UserWorker userWorker;
+  private final WorkerBee bee;
+  // Held as an ExecutorService (fully qualified, the file does not import it) so close() can shut it down.
+  private final java.util.concurrent.ExecutorService executor = Executors.newFixedThreadPool(WORKER_THREADS);
+  private final EventThread eventThread;
+
+  /**
+   * Creates the manager and its bit-com and user workers. No thread is started until {@link #start}.
+   *
+   * @param context bootstrap context later used to build the {@link DrillbitContext}
+   */
+  public WorkManager(BootStrapContext context){
+    this.bee = new WorkerBee();
+    this.bContext = context;
+    this.bitComWorker = new BitComHandlerImpl(bee);
+    this.userWorker = new UserWorker(bee);
+    this.eventThread = new EventThread();
+  }
+
+  /** Builds the {@link DrillbitContext} from the given services and starts the event thread. */
+  public void start(DrillbitEndpoint endpoint, DistributedCache cache, BitCom com, ClusterCoordinator coord){
+    this.dContext = new DrillbitContext(endpoint, bContext, coord, com, cache);
+    eventThread.start();
+  }
+
+  public BitComHandler getBitComWorker(){
+    return bitComWorker;
+  }
+
+  public UserWorker getUserWorker(){
+    return userWorker;
+  }
+
+  /**
+   * Stops dispatching and releases the worker threads. The event thread is interrupted and the pool is shut
+   * down in an orderly way: tasks already handed to the pool run to completion, tasks still waiting in the
+   * pending queue are not started.
+   */
+  @Override
+  public void close() throws IOException {
+    eventThread.interrupt();
+    executor.shutdown();
+  }
+
+  /** Returns the context created by {@link #start}, or null if start has not been called. */
+  public DrillbitContext getContext() {
+    return dContext;
+  }
+
+  // create this so items can see the data here whether or not they are in this package.
+  public class WorkerBee{
+
+    /** Queues a fragment runner for execution. */
+    public void addFragmentRunner(FragmentRunner runner){
+      pendingTasks.add(runner);
+    }
+
+    /** Queues a foreman for execution. */
+    public void addNewForeman(Foreman foreman){
+      pendingTasks.add(foreman);
+    }
+
+    /** Registers a fragment that is waiting to be started remotely. */
+    public void addFragmentPendingRemote(IncomingFragmentHandler handler){
+      incomingFragments.add(handler);
+    }
+
+    /** Removes the handler from the waiting set and queues its runnable for execution. */
+    public void startFragmentPendingRemote(IncomingFragmentHandler handler){
+      incomingFragments.remove(handler);
+      pendingTasks.add(handler.getRunnable());
+    }
+
+    /** Looks up a runner by fragment handle; returns null when none is registered. */
+    public FragmentRunner getFragmentRunner(FragmentHandle handle){
+      return runningFragments.get(handle);
+    }
+
+    /** Looks up a foreman by query id; returns null when none is registered. */
+    public Foreman getForemanForQueryId(QueryId queryId){
+      return queries.get(queryId);
+    }
+
+    /** Removes the foreman, but only if it is still the one mapped to its query id. */
+    public void retireForeman(Foreman foreman){
+      queries.remove(foreman.getQueryId(), foreman);
+    }
+
+    public DrillbitContext getContext() {
+      return dContext;
+    }
+
+  }
+
+  /** Daemon thread that moves tasks from the pending queue to the executor until it is interrupted. */
+  private class EventThread extends Thread{
+    public EventThread(){
+      this.setDaemon(true);
+      this.setName("WorkManager Event Thread");
+    }
+
+    @Override
+    public void run() {
+      try {
+        while(true){
+          logger.debug("Checking for pending work tasks.");
+          Runnable r = pendingTasks.poll(POLL_SECONDS, TimeUnit.SECONDS);
+          if(r != null){
+            executor.execute(r);
+          }
+        }
+      } catch (InterruptedException e) {
+        logger.info("Work Manager stopping as it was interrupted.");
+        // Keep the interrupt visible to anything that inspects this thread afterwards.
+        Thread.currentThread().interrupt();
+      } catch (java.util.concurrent.RejectedExecutionException e) {
+        // close() shut the pool down between poll() and execute().
+        logger.info("Work Manager stopping as the executor no longer accepts tasks.", e);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
new file mode 100644
index 0000000..5dacb71
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
@@ -0,0 +1,84 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Base class for collectors of incoming raw batches from the minor fragments of one opposite major fragment.
+ *
+ * <p>It records, per sending minor fragment, whether a first batch has arrived, and decrements the parent's
+ * counter once the required number of distinct senders has been heard from. Subclasses decide which buffer a
+ * minor fragment maps to and what happens when a stream finishes.
+ */
+public abstract class AbstractFragmentCollector implements BatchCollector{
+  private final List<DrillbitEndpoint> incoming;
+  private final int oppositeMajorFragmentId;
+  // One slot per providing endpoint; flipped from 0 to 1 when that sender's first batch arrives.
+  private final AtomicIntegerArray remainders;
+  // Number of distinct senders still needed before the parent counter is decremented.
+  private final AtomicInteger remainingRequired;
+  protected final RawBatchBuffer[] buffers;
+  private final AtomicInteger parentAccounter;
+  private final AtomicInteger finishedStreams = new AtomicInteger();
+
+  /**
+   * @param parentAccounter   counter decremented once this collector has heard from enough senders; not null
+   * @param receiver          supplies the providing endpoints and the opposite major fragment id; not null
+   * @param minInputsRequired number of buffers to create; must be positive
+   */
+  public AbstractFragmentCollector(AtomicInteger parentAccounter, Receiver receiver, int minInputsRequired) {
+    Preconditions.checkArgument(minInputsRequired > 0);
+    Preconditions.checkNotNull(receiver);
+    Preconditions.checkNotNull(parentAccounter);
+
+    this.parentAccounter = parentAccounter;
+    this.incoming = receiver.getProvidingEndpoints();
+    this.remainders = new AtomicIntegerArray(incoming.size());
+    this.oppositeMajorFragmentId = receiver.getOppositeMajorFragmentId();
+
+    final RawBatchBuffer[] created = new RawBatchBuffer[minInputsRequired];
+    for (int slot = 0; slot < minInputsRequired; slot++) {
+      created[slot] = new UnlmitedRawBatchBuffer();
+    }
+    this.buffers = created;
+
+    // An out-of-order exchange needs only one sender to have reported; otherwise all required inputs must.
+    final int required = receiver.supportsOutOfOrderExchange() ? 1 : minInputsRequired;
+    this.remainingRequired = new AtomicInteger(required);
+  }
+
+  public int getOppositeMajorFragmentId() {
+    return oppositeMajorFragmentId;
+  }
+
+  public RawBatchBuffer[] getBuffers(){
+    return buffers;
+  }
+
+  /** Called when the batch flagged as last has arrived from the given minor fragment. */
+  public abstract void streamFinished(int minorFragmentId);
+
+  /**
+   * Handles an arriving batch: updates the first-arrival accounting for its sender, signals
+   * {@link #streamFinished(int)} if the batch is flagged as the last one, then enqueues it on the sender's buffer.
+   */
+  public void batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch) {
+    final boolean firstFromSender = remainders.compareAndSet(minorFragmentId, 0, 1);
+    if (firstFromSender && remainingRequired.decrementAndGet() == 0) {
+      parentAccounter.decrementAndGet();
+    }
+    final boolean lastBatch = batch.getHeader().getIsLastBatch();
+    if (lastBatch) {
+      streamFinished(minorFragmentId);
+    }
+    getBuffer(minorFragmentId).enqueue(throttle, batch);
+  }
+
+  /** Returns the buffer that batches from the given minor fragment are enqueued on. */
+  protected abstract RawBatchBuffer getBuffer(int minorFragmentId);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
new file mode 100644
index 0000000..ff091d7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BatchCollector.java
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+
+
+/**
+ * Collects the raw batches sent by the minor fragments of one opposite major fragment and exposes the
+ * buffers those batches are stored in.
+ */
+interface BatchCollector {
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCollector.class);
+
+  /**
+   * Hands a newly received batch to this collector.
+   *
+   * @param throttle        throttle of the connection the batch arrived on
+   * @param minorFragmentId minor fragment id of the sender
+   * @param batch           the received batch
+   */
+  void batchArrived(ConnectionThrottle throttle, int minorFragmentId, RawFragmentBatch batch);
+
+  /** @return the major fragment id of the sending side this collector listens to */
+  int getOppositeMajorFragmentId();
+
+  /** @return the buffers holding the batches collected so far */
+  RawBatchBuffer[] getBuffers();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandler.java
new file mode 100644
index 0000000..97064e3
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandler.java
@@ -0,0 +1,41 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitConnection;
+import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
+
+/**
+ * Entry point for bit-to-bit RPC traffic: dispatches incoming messages and manages the fragments and
+ * incoming-fragment handlers they refer to.
+ */
+public interface BitComHandler {
+
+  /**
+   * Processes one incoming RPC message.
+   *
+   * @param connection connection the message arrived on
+   * @param rpcType    numeric RPC type of the message
+   * @param pBody      protobuf body of the message
+   * @param dBody      data body of the message
+   * @return the response to send back
+   * @throws RpcException if the message cannot be handled
+   */
+  Response handle(BitConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException;
+
+  /** Starts execution of a fragment received from a remote node. */
+  void startNewRemoteFragment(PlanFragment fragment);
+
+  /** Requests cancellation of the fragment identified by {@code handle}. */
+  Ack cancelFragment(FragmentHandle handle);
+
+  /** Registers a handler that will receive incoming batches for its fragment. */
+  void registerIncomingFragmentHandler(IncomingFragmentHandler handler);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
new file mode 100644
index 0000000..9b227da
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
@@ -0,0 +1,205 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+import static org.apache.drill.exec.rpc.RpcBus.get;
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.proto.ExecProtos.BitHandshake;
+import org.apache.drill.exec.proto.ExecProtos.BitStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentRecordBatch;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.RemoteConnection;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcConstants;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitConnection;
+import org.apache.drill.exec.rpc.bit.BitRpcConfig;
+import org.apache.drill.exec.rpc.bit.BitTunnel;
+import org.apache.drill.exec.work.FragmentRunner;
+import org.apache.drill.exec.work.RemotingFragmentRunnerListener;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
+import org.apache.drill.exec.work.fragment.RemoteFragmentHandler;
+
+import com.google.common.collect.Maps;
+import com.google.protobuf.MessageLite;
+
+/**
+ * Default {@link BitComHandler}. Dispatches bit-to-bit RPC messages, starts and cancels fragments through the
+ * {@link WorkerBee}, and routes incoming record batches to the {@link IncomingFragmentHandler} registered for
+ * their fragment handle.
+ */
+public class BitComHandlerImpl implements BitComHandler {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitComHandlerImpl.class);
+
+  /** Handlers for fragments that receive batches, keyed by fragment handle. */
+  private final ConcurrentMap<FragmentHandle, IncomingFragmentHandler> handlers = Maps.newConcurrentMap();
+  private final WorkerBee bee;
+
+  public BitComHandlerImpl(WorkerBee bee) {
+    super();
+    this.bee = bee;
+  }
+
+  /**
+   * Dispatches an incoming message by RPC type: fragment cancellation, fragment status, fragment
+   * initialization, or record batch.
+   *
+   * @throws RpcException if the type is not supported, or if a record batch could not be accepted
+   */
+  @Override
+  public Response handle(BitConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+    if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Received bit com message of type {}", rpcType);
+
+    switch (rpcType) {
+
+    case RpcType.REQ_CANCEL_FRAGMENT_VALUE:
+      FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
+      cancelFragment(handle);
+      return BitRpcConfig.OK;
+
+    case RpcType.REQ_FRAGMENT_STATUS_VALUE:
+      connection.getListenerPool().status( get(pBody, FragmentStatus.PARSER));
+      // TODO: Support a type of message that has no response.
+      return BitRpcConfig.OK;
+
+    case RpcType.REQ_INIATILIZE_FRAGMENT_VALUE:
+      PlanFragment fragment = get(pBody, PlanFragment.PARSER);
+      startNewRemoteFragment(fragment);
+      return BitRpcConfig.OK;
+
+    case RpcType.REQ_RECORD_BATCH_VALUE:
+      Ack ack;
+      try {
+        FragmentRecordBatch header = get(pBody, FragmentRecordBatch.PARSER);
+        ack = incomingRecordBatch(connection, header, dBody);
+      } catch (FragmentSetupException e) {
+        throw new RpcException("Failure receiving record batch.", e);
+      }
+      // Do not answer OK for a batch that was rejected; the sender would otherwise assume it was delivered.
+      if (Acks.FAIL.equals(ack)) {
+        throw new RpcException("Failure receiving record batch. The batch was not accepted.");
+      }
+      return BitRpcConfig.OK;
+
+    default:
+      throw new RpcException("Not yet supported.");
+    }
+
+  }
+
+  /**
+   * Builds a fragment runner for the given fragment and hands it to the worker bee. Failures while reading
+   * the plan or setting up execution are reported through the fragment's listener rather than thrown.
+   */
+  @Override
+  public void startNewRemoteFragment(PlanFragment fragment){
+    logger.debug("Received remote fragment start instruction: {}", fragment);
+    FragmentContext context = new FragmentContext(bee.getContext(), fragment.getHandle(), null, null);
+    BitTunnel tunnel = bee.getContext().getBitCom().getTunnel(fragment.getForeman());
+    RemotingFragmentRunnerListener listener = new RemotingFragmentRunnerListener(context, tunnel);
+    try{
+      FragmentRoot rootOperator = bee.getContext().getPlanReader().readFragmentOperator(fragment.getFragmentJson());
+      RootExec exec = ImplCreator.getExec(context, rootOperator);
+      FragmentRunner fr = new FragmentRunner(context, exec, listener);
+      bee.addFragmentRunner(fr);
+
+    }catch(IOException e){
+      listener.fail(fragment.getHandle(), "Failure while parsing fragment execution plan.", e);
+    }catch(ExecutionSetupException e){
+      listener.fail(fragment.getHandle(), "Failure while setting up execution plan.", e);
+    }
+
+  }
+
+  /**
+   * Cancels the fragment with the given handle. A registered incoming-fragment handler is cancelled if one
+   * exists; otherwise the worker bee's runner for the handle is cancelled, if any.
+   *
+   * @return always {@code Acks.OK}
+   */
+  @Override
+  public Ack cancelFragment(FragmentHandle handle){
+    IncomingFragmentHandler handler = handlers.get(handle);
+    if(handler != null){
+      // try remote fragment cancel.
+      handler.cancel();
+    }else{
+      // then try local cancel.
+      FragmentRunner runner = bee.getFragmentRunner(handle);
+      if(runner != null) runner.cancel();
+    }
+
+    return Acks.OK;
+  }
+
+
+  /**
+   * Routes a received batch to the handler of its fragment, creating and registering a handler first when
+   * none exists yet.
+   *
+   * @return {@code Acks.OK} if the batch was handed to a handler, {@code Acks.FAIL} if the fragment for the
+   *         batch's handle was not found in the cache
+   */
+  private Ack incomingRecordBatch(RemoteConnection connection, FragmentRecordBatch fragmentBatch, ByteBuf body) throws FragmentSetupException{
+    FragmentHandle handle = fragmentBatch.getHandle();
+    IncomingFragmentHandler handler = handlers.get(handle);
+
+    // Create a handler if there isn't already one.
+    if(handler == null){
+
+      PlanFragment fragment = bee.getContext().getCache().getFragment(handle);
+      if(fragment == null){
+        logger.error("Received batch where fragment was not in cache.");
+        return Acks.FAIL;
+      }
+
+      IncomingFragmentHandler newHandler = new RemoteFragmentHandler(fragment, bee.getContext(), bee.getContext().getBitCom().getTunnel(fragment.getForeman()));
+
+      // since there could be a race condition on the check, we'll use putIfAbsent so we don't have two competing handlers.
+      handler = handlers.putIfAbsent(fragment.getHandle(), newHandler);
+
+      if(handler == null){
+        // we added a handler, inform foreman that we did so.  This way, the foreman can track status.  We also tell foreman that we don't need inform ourself.
+        bee.addFragmentPendingRemote(newHandler);
+        handler = newHandler;
+      }
+    }
+
+    boolean canRun = handler.handle(connection.getConnectionThrottle(), new RawFragmentBatch(fragmentBatch, body));
+    if(canRun){
+      // if we've reached the canRun threshold, we'll proceed.  This expects handler.handle() to only return a single true.
+      bee.startFragmentPendingRemote(handler);
+    }
+    if(handler.isDone()){
+      handlers.remove(handler.getHandle());
+    }
+
+    return Acks.OK;
+  }
+
+  /**
+   * Registers a handler under its fragment handle. Registering a second handler for the same handle is a
+   * programming error and trips an assertion; the existing handler is kept.
+   */
+  @Override
+  public void registerIncomingFragmentHandler(IncomingFragmentHandler handler){
+    IncomingFragmentHandler old = handlers.putIfAbsent(handler.getHandle(), handler);
+    assert old == null : "You can only register a fragment handler if one hasn't been registered already.";
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
new file mode 100644
index 0000000..20775c5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -0,0 +1,108 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+/**
+ * Determines when a particular fragment has enough data for each of its receiving exchanges to commence execution.
+ */
+public class IncomingBuffers {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IncomingBuffers.class);
+
+  /** Number of sending streams that have not yet delivered their last batch. */
+  private final AtomicInteger streamsRemaining = new AtomicInteger(0);
+  /** Number of receivers still waiting for their required inputs; shared with every collector. */
+  private final AtomicInteger remainingRequired = new AtomicInteger(0);
+  /** Collectors keyed by the major fragment id of the sending side. */
+  private final Map<Integer, BatchCollector> fragCounts;
+
+  /**
+   * Walks the operator tree under {@code root} and registers one collector per receiver found.
+   *
+   * @param root root operator of the fragment whose incoming exchanges are tracked
+   */
+  public IncomingBuffers(PhysicalOperator root) {
+    Map<Integer, BatchCollector> counts = Maps.newHashMap();
+    // The visitor fills the map and initializes both counters as it encounters receivers.
+    root.accept(new CountRequiredFragments(), counts);
+    fragCounts = ImmutableMap.copyOf(counts);
+  }
+
+  /**
+   * Routes a batch to the collector registered for its sending major fragment.
+   *
+   * @return true once every receiver has obtained its required inputs
+   * @throws FragmentSetupException if no collector is registered for the batch's sending major fragment
+   */
+  public boolean batchArrived(ConnectionThrottle throttle, RawFragmentBatch batch) throws FragmentSetupException {
+    logger.debug("New Batch Arrived {}", batch);
+
+    // Validate the sender before touching any counter so an unexpected batch cannot corrupt the accounting.
+    BatchCollector fSet = fragCounts.get(batch.getHeader().getSendingMajorFragmentId());
+    if (fSet == null) throw new FragmentSetupException("We received a major fragment id that we were not expecting.");
+
+    if(batch.getHeader().getIsLastBatch()){
+      streamsRemaining.decrementAndGet();
+    }
+
+    fSet.batchArrived(throttle, batch.getHeader().getSendingMinorFragmentId(), batch);
+    return remainingRequired.get() == 0;
+  }
+
+  /** @return the number of receivers still waiting for required inputs, never negative */
+  public int getRemainingRequired() {
+    int rem = remainingRequired.get();
+    if (rem < 0) return 0;
+    return rem;
+  }
+
+  /** @return the buffers of the collector registered for the given sending major fragment */
+  public RawBatchBuffer[] getBuffers(int senderMajorFragmentId){
+    return fragCounts.get(senderMajorFragmentId).getBuffers();
+  }
+
+
+  /**
+   * Designed to setup initial values for arriving fragment accounting.
+   */
+  public class CountRequiredFragments extends AbstractPhysicalVisitor<Void, Map<Integer, BatchCollector>, RuntimeException> {
+
+    @Override
+    public Void visitReceiver(Receiver receiver, Map<Integer, BatchCollector> counts) throws RuntimeException {
+      BatchCollector set;
+      if (receiver.supportsOutOfOrderExchange()) {
+        set = new MergingCollector(remainingRequired, receiver);
+      } else {
+        set = new PartitionedCollector(remainingRequired, receiver);
+      }
+
+      counts.put(set.getOppositeMajorFragmentId(), set);
+      remainingRequired.incrementAndGet();
+      // Every providing endpoint is a separate stream that ends with its own last batch, so the number of
+      // outstanding streams is the number of senders, not the number of receivers.
+      streamsRemaining.addAndGet(receiver.getProvidingEndpoints().size());
+      return null;
+    }
+
+
+    @Override
+    public Void visitOp(PhysicalOperator op, Map<Integer, BatchCollector> value) throws RuntimeException {
+      for(PhysicalOperator o : op){
+        o.accept(this, value);
+      }
+      return null;
+    }
+
+
+  }
+
+  /** @return true once every sending stream has delivered its last batch */
+  public boolean isDone(){
+    return streamsRemaining.get() < 1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
new file mode 100644
index 0000000..e21d69a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.exec.physical.base.Receiver;
+
+/**
+ * Collector that funnels the batches of every sender into a single shared buffer. The buffer is marked
+ * finished only after all senders have delivered their last batch.
+ */
+public class MergingCollector extends AbstractFragmentCollector{
+
+  /** Number of senders that have not yet delivered their last batch. */
+  private final AtomicInteger streamsRunning;
+
+  public MergingCollector(AtomicInteger parentAccounter, Receiver receiver) {
+    super(parentAccounter, receiver, 1);
+    // Count the sending streams themselves. The parent accounter holds the number of receivers registered
+    // so far (zero for the first), which would keep the counter from ever reaching zero.
+    streamsRunning = new AtomicInteger(receiver.getProvidingEndpoints().size());
+  }
+
+  @Override
+  protected RawBatchBuffer getBuffer(int minorFragmentId) {
+    // All senders share the one buffer.
+    return buffers[0];
+  }
+
+  @Override
+  public void streamFinished(int minorFragmentId) {
+    if(streamsRunning.decrementAndGet() == 0) buffers[0].finished();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
new file mode 100644
index 0000000..116ca26
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
@@ -0,0 +1,42 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.exec.physical.base.Receiver;
+
+/**
+ * Collector that keeps one buffer per sender, indexed by the sender's minor fragment id.
+ */
+public class PartitionedCollector extends AbstractFragmentCollector{
+
+  public PartitionedCollector(AtomicInteger parentAccounter, Receiver receiver) {
+    // One required input, and therefore one buffer, per providing endpoint.
+    super(parentAccounter, receiver, receiver.getProvidingEndpoints().size());
+  }
+
+  @Override
+  protected RawBatchBuffer getBuffer(int minorFragmentId) {
+    final RawBatchBuffer senderBuffer = buffers[minorFragmentId];
+    return senderBuffer;
+  }
+
+  @Override
+  public void streamFinished(int minorFragmentId) {
+    // Only the finishing sender's own buffer is closed.
+    final RawBatchBuffer senderBuffer = buffers[minorFragmentId];
+    senderBuffer.finished();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
new file mode 100644
index 0000000..0f10e26
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
@@ -0,0 +1,33 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.record.RawFragmentBatchProvider;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+
+/**
+ * A {@link RawFragmentBatchProvider} that can also be fed: incoming batches are enqueued by the receiving
+ * side and later handed out through the provider interface.
+ */
+public interface RawBatchBuffer extends RawFragmentBatchProvider{
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RawBatchBuffer.class);
+
+  /**
+   * Adds a newly received batch to the buffer.
+   *
+   * @param throttle throttle of the connection the batch arrived on
+   * @param batch    the batch to store
+   */
+  void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch);
+
+  /**
+   * Tells the buffer that no further records are expected.
+   */
+  void finished();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
new file mode 100644
index 0000000..f97d878
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+
+import com.google.common.collect.Queues;
+
+/**
+ * {@link RawBatchBuffer} backed by an unbounded blocking deque. The connection throttle passed to
+ * {@link #enqueue} is ignored.
+ */
+public class UnlmitedRawBatchBuffer implements RawBatchBuffer{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnlmitedRawBatchBuffer.class);
+
+  private final LinkedBlockingDeque<RawFragmentBatch> buffer = Queues.newLinkedBlockingDeque();
+  /** Set once the producer has announced that no more batches will arrive. */
+  private volatile boolean finished = false;
+
+  @Override
+  public void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch) {
+    buffer.add(batch);
+  }
+
+  @Override
+  public void kill(FragmentContext context) {
+    // TODO: Pass back or kill handler?
+  }
+
+
+  @Override
+  public void finished() {
+    finished = true;
+  }
+
+  /**
+   * Returns the next buffered batch. When the buffer is empty and has not been marked finished, this call
+   * blocks until a batch is enqueued.
+   *
+   * @return the next batch, or {@code null} if the buffer is empty and finished, or if the waiting thread
+   *         was interrupted
+   */
+  @Override
+  public RawFragmentBatch getNext(){
+    RawFragmentBatch b = buffer.poll();
+    if(b != null || finished){
+      // Hand back the batch that was just dequeued (previously it was dropped), or null at end of stream.
+      return b;
+    }
+
+    try {
+      return buffer.take();
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers can observe the interruption.
+      Thread.currentThread().interrupt();
+      return null;
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
new file mode 100644
index 0000000..d4c4014
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
@@ -0,0 +1,47 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.foreman;
+
+import java.util.UUID;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.slf4j.Logger;
+
+
+/**
+ * Helper for turning failures into {@link DrillPBError} messages while recording them in the log.
+ */
+public class ErrorHelper {
+
+  /**
+   * Logs an error under a freshly generated id and builds the matching {@link DrillPBError}.
+   *
+   * @param endpoint endpoint on which the error occurred
+   * @param message  description of the error; when null, the message of {@code t} is used instead
+   * @param t        cause of the error, may be null when {@code message} is given
+   * @param logger   logger the error is written to
+   * @return an error carrying the endpoint, the generated id and the resolved message
+   */
+  public static DrillPBError logAndConvertError(DrillbitEndpoint endpoint, String message, Throwable t, Logger logger){
+    String id = UUID.randomUUID().toString();
+
+    // Resolve a non-null message: the builder rejects null, and many exceptions carry no message at all.
+    String resolved = message;
+    if(resolved == null && t != null){
+      resolved = t.getMessage() != null ? t.getMessage() : t.toString();
+    }
+    if(resolved == null){
+      resolved = "No error message provided.";
+    }
+
+    DrillPBError.Builder builder = DrillPBError.newBuilder();
+    builder.setEndpoint(endpoint);
+    builder.setErrorId(id);
+    builder.setMessage(resolved);
+    builder.setErrorType(0);
+
+    // record the error to the log for later reference, using the same text that is sent back.
+    logger.error("Error {}: {}", id, resolved, t);
+
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
new file mode 100644
index 0000000..dea8282
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -0,0 +1,272 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.foreman;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
+import org.apache.drill.exec.planner.fragment.PlanningSet;
+import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
+import org.apache.drill.exec.planner.fragment.StatsCollector;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserProtos.RequestResults;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.AtomicState;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Foreman manages all queries where this is the driving/root node.
+ *
+ * <p>{@link #run()} dispatches on the type of the incoming {@link RunQuery}.  Failures and cancellations are sent
+ * back to the initiating client as a final {@link QueryResult}.
+ */
+public class Foreman implements Runnable, Closeable, Comparable<Object>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
+
+  private final QueryId queryId;
+  private final RunQuery queryRequest;
+  private final QueryContext context;
+  private final RunningFragmentManager fragmentManager;
+  private final WorkerBee bee;
+  private final UserClientConnection initiatingClient;
+  private final AtomicState<QueryState> state;
+
+  /**
+   * Creates a foreman for a single query.
+   *
+   * @param bee used to retire this foreman and to schedule fragment runners
+   * @param dContext drillbit context used to build the query context and the tunnel manager
+   * @param connection the client connection that results are sent to
+   * @param queryId id of the query being driven
+   * @param queryRequest the request to execute
+   */
+  public Foreman(WorkerBee bee, DrillbitContext dContext, UserClientConnection connection, QueryId queryId,
+      RunQuery queryRequest) {
+    this.queryId = queryId;
+    this.queryRequest = queryRequest;
+    this.context = new QueryContext(queryId, dContext);
+    this.initiatingClient = connection;
+    this.fragmentManager = new RunningFragmentManager(new ForemanManagerListener(), new TunnelManager(dContext.getBitCom()));
+    this.bee = bee;
+
+    // queries start out PENDING.
+    this.state = new AtomicState<QueryState>(QueryState.PENDING) {
+      protected QueryState getStateFromNumber(int i) {
+        return QueryState.valueOf(i);
+      }
+    };
+  }
+
+  /**
+   * @return false while the query is PENDING or RUNNING, true for every other state.
+   */
+  private boolean isFinished(){
+    switch(state.getState()){
+    case PENDING:
+    case RUNNING:
+      return false;
+    default:
+      return true;
+    }
+  }
+
+  /**
+   * Logs the failure and sends a final FAILED result, carrying the converted error, to the initiating client.
+   *
+   * @param message description of the failure
+   * @param t the cause of the failure
+   */
+  private void fail(String message, Throwable t) {
+    if(isFinished()){
+      // a late failure is only logged here; the failure result below is still sent.
+      logger.error("Received a failure message query finished of: {}", message, t);
+    }
+    DrillPBError error = ErrorHelper.logAndConvertError(context.getCurrentEndpoint(), message, t, logger);
+    QueryResult result = QueryResult //
+        .newBuilder() //
+        .addError(error) //
+        .setIsLastChunk(true) //
+        .setQueryState(QueryState.FAILED) //
+        .setQueryId(queryId) // same as the CANCELED result built in cancel().
+        .build();
+    cleanupAndSendResult(result);
+  }
+
+  /**
+   * Cancels the query unless it has already finished: asks the fragment manager to cancel and sends a final
+   * CANCELED result to the initiating client.
+   */
+  public void cancel() {
+    if(isFinished()){
+      return;
+    }
+
+    // cancel remote fragments.
+    fragmentManager.cancel();
+
+    QueryResult result = QueryResult.newBuilder().setQueryState(QueryState.CANCELED).setIsLastChunk(true).setQueryId(queryId).build();
+    cleanupAndSendResult(result);
+  }
+
+  /**
+   * Retires this foreman from the bee and sends the given result to the initiating client.
+   *
+   * @param result the result to send
+   */
+  void cleanupAndSendResult(QueryResult result){
+    bee.retireForeman(this);
+    initiatingClient.sendResult(new QueryWritableBatch(result)).addLightListener(new ResponseSendListener());
+  }
+
+  /**
+   * Listener attached to the outgoing result; a failed send is only logged.
+   */
+  private class ResponseSendListener extends RpcOutcomeListener<Ack> {
+    @Override
+    public void failed(RpcException ex) {
+      logger
+          .info(
+              "Failure while trying communicate query result to initating client.  This would happen if a client is disconnected before response notice can be sent.",
+              ex);
+    }
+  }
+
+  /**
+   * Called by execution pool to do foreman setup. Actual query execution is a separate phase (and can be scheduled).
+   *
+   * @throws UnsupportedOperationException for request types without a handler, and for the SQL and logical paths,
+   *           which are not implemented yet
+   */
+  @Override
+  public void run() {
+    // convert a run query request into action
+
+    switch (queryRequest.getType()) {
+
+    case LOGICAL:
+      parseAndRunLogicalPlan(queryRequest.getPlan());
+      break;
+    case PHYSICAL:
+      parseAndRunPhysicalPlan(queryRequest.getPlan());
+      break;
+    case SQL:
+      runSQL(queryRequest.getPlan());
+      break;
+    default:
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * Reads a logical plan from JSON, converts it to a physical plan and runs it.  A read failure is reported
+   * through {@link #fail(String, Throwable)}.
+   */
+  private void parseAndRunLogicalPlan(String json) {
+    try {
+      LogicalPlan logicalPlan = context.getPlanReader().readLogicalPlan(json);
+      PhysicalPlan physicalPlan = convert(logicalPlan);
+      runPhysicalPlan(physicalPlan);
+    } catch (IOException e) {
+      fail("Failure while parsing logical plan.", e);
+    }
+  }
+
+  /**
+   * Reads a physical plan from JSON and runs it.  A read failure is reported through
+   * {@link #fail(String, Throwable)}.
+   */
+  private void parseAndRunPhysicalPlan(String json) {
+    try {
+      PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json);
+      runPhysicalPlan(plan);
+    } catch (IOException e) {
+      fail("Failure while parsing physical plan.", e);
+    }
+  }
+
+  /**
+   * Fragments and parallelizes the plan, stores the non-leaf fragments in the cache and hands the root and leaf
+   * fragments to the fragment manager.  Setup failures are reported through {@link #fail(String, Throwable)}.
+   */
+  private void runPhysicalPlan(PhysicalPlan plan) {
+
+    PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
+    MakeFragmentsVisitor makeFragmentsVisitor = new MakeFragmentsVisitor();
+    Fragment rootFragment;
+    try {
+      rootFragment = rootOperator.accept(makeFragmentsVisitor, null);
+    } catch (FragmentSetupException e) {
+      fail("Failure while fragmenting query.", e);
+      return;
+    }
+    PlanningSet planningSet = StatsCollector.collectStats(rootFragment);
+    SimpleParallelizer parallelizer = new SimpleParallelizer();
+
+    try {
+      // NOTE(review): the trailing 10 is a hard coded argument whose meaning is not visible from here.
+      QueryWorkUnit work = parallelizer.getFragments(context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet, 10);
+
+      List<PlanFragment> leafFragments = Lists.newArrayList();
+
+      // store fragments in distributed grid.
+      for (PlanFragment f : work.getFragments()) {
+        if (f.getLeafFragment()) {
+          leafFragments.add(f);
+        } else {
+          context.getCache().storeFragment(f);
+        }
+      }
+
+      fragmentManager.runFragments(bee, work.getRootFragment(), work.getRootOperator(), initiatingClient, leafFragments);
+
+    } catch (ExecutionSetupException e) {
+      fail("Failure while setting up query.", e);
+    }
+
+  }
+
+  /** Not implemented yet; always throws. */
+  private void runSQL(String json) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not implemented yet; always throws. */
+  private PhysicalPlan convert(LogicalPlan plan) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Not implemented yet.
+   *
+   * @return always {@code null}
+   */
+  public QueryResult getResult(UserClientConnection connection, RequestResults req) {
+
+    return null;
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  /** Currently does nothing. */
+  @Override
+  public void close() throws IOException {
+  }
+
+  QueryState getQueryState(){
+    return this.state.getState();
+  }
+
+  /** Not implemented yet; always throws. */
+  public boolean rootCoorespondsTo(FragmentHandle handle){
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Narrow view of this foreman handed to the {@link RunningFragmentManager} so it can report failures and final
+   * results.
+   */
+  class ForemanManagerListener{
+    void fail(String message, Throwable t) {
+      // must target the enclosing Foreman; qualifying with ForemanManagerListener.this would call this method again.
+      Foreman.this.fail(message, t);
+    }
+
+    void cleanupAndSendResult(QueryResult result){
+      Foreman.this.cleanupAndSendResult(result);
+    }
+
+  }
+
+  /**
+   * Orders this foreman against another object by hash code.
+   *
+   * @param o the object to compare with
+   * @return a negative number, zero, or a positive number as this object's hash code is less than, equal to, or
+   *         greater than the hash code of {@code o}
+   */
+  @Override
+  public int compareTo(Object o) {
+    int mine = hashCode();
+    int theirs = o.hashCode();
+    // explicit comparison rather than subtraction, which can overflow for hash codes of opposite sign.
+    return mine < theirs ? -1 : (mine == theirs ? 0 : 1);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
new file mode 100644
index 0000000..d906ba2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
@@ -0,0 +1,26 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.foreman;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+
+/**
+ * Callback that receives fragment status updates.
+ */
+public interface FragmentStatusListener {
+  // interface fields are implicitly public, static and final.
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatusListener.class);
+
+  /**
+   * Delivers a status update.
+   *
+   * @param status the reported status
+   */
+  void statusUpdate(FragmentStatus status);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
new file mode 100644
index 0000000..20797b8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
@@ -0,0 +1,266 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.foreman;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.work.AbstractFragmentRunnerListener;
+import org.apache.drill.exec.work.EndpointListener;
+import org.apache.drill.exec.work.FragmentRunner;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
+import org.apache.drill.exec.work.foreman.Foreman.ForemanManagerListener;
+import org.apache.drill.exec.work.fragment.LocalFragmentHandler;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Each Foreman holds its own fragment manager.  This manages the events associated with execution of a particular query across all fragments.
+ */
+class RunningFragmentManager implements FragmentStatusListener{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RunningFragmentManager.class);
+
+  // NOTE(review): the original trailing comment here was cut off ("doesn't need to be"); its intent is unclear.
+  public Map<FragmentHandle, FragmentData> map = Maps.newHashMap(); // doesn't need to be
+  private final TunnelManager tun;
+  private final ForemanManagerListener foreman;
+  private final AtomicInteger remainingFragmentCount;
+  private FragmentRunner rootRunner;
+
+  /**
+   * @param foreman callback used to hand final results to the owning foreman
+   * @param tun source of the tunnels used to send fragments to their assigned endpoints
+   */
+  public RunningFragmentManager(ForemanManagerListener foreman, TunnelManager tun) {
+    super();
+    this.foreman = foreman;
+    this.tun = tun;
+    this.remainingFragmentCount = new AtomicInteger(0);
+  }
+
+  /**
+   * Sets up the root fragment locally and then sends every leaf fragment to its assigned endpoint.
+   *
+   * <p>If the root's incoming buffers are already done, its runner is handed to the bee right away; otherwise the
+   * handler is registered as an incoming batch handler.
+   *
+   * @param bee source of the drillbit context and target for the root fragment runner
+   * @param rootFragment the root plan fragment
+   * @param rootOperator the root operator of the root fragment
+   * @param rootClient the client connection passed to the root fragment context
+   * @param leafFragments the fragments to send out
+   * @throws ExecutionSetupException if setting up the root fragment fails
+   */
+  public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator, UserClientConnection rootClient, List<PlanFragment> leafFragments) throws ExecutionSetupException{
+    // every leaf fragment plus the root.
+    remainingFragmentCount.set(leafFragments.size()+1);
+
+    // set up the root fragment first so we'll have incoming buffers available.
+    {
+      IncomingBuffers buffers = new IncomingBuffers(rootOperator);
+
+      FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment.getHandle(), rootClient, buffers);
+      RootExec rootExec = ImplCreator.getExec(rootContext, rootOperator);
+      // add fragment to local node.
+      map.put(rootFragment.getHandle(), new FragmentData(rootFragment.getHandle(), null, true));
+      rootRunner = new FragmentRunner(rootContext, rootExec, new RootFragmentManager(rootContext, rootFragment));
+      LocalFragmentHandler handler = new LocalFragmentHandler(rootFragment.getHandle(), buffers, rootRunner);
+      if(buffers.isDone()){
+        bee.addFragmentRunner(handler.getRunnable());
+      }else{
+        bee.getContext().getBitCom().registerIncomingBatchHandler(handler);
+      }
+
+    }
+
+    // send remote fragments.
+    for (PlanFragment f : leafFragments) {
+      sendRemoteFragment(f);
+    }
+
+  }
+
+  /**
+   * Records the fragment as non-local and sends it to its assigned endpoint.
+   */
+  private void sendRemoteFragment(PlanFragment fragment){
+    map.put(fragment.getHandle(), new FragmentData(fragment.getHandle(), fragment.getAssignment(), false));
+    FragmentSubmitListener listener = new FragmentSubmitListener(fragment.getAssignment(), fragment);
+    tun.get(fragment.getAssignment()).sendFragment(fragment).addLightListener(listener);
+  }
+
+
+  /**
+   * Routes a status update according to its state.
+   *
+   * @throws UnsupportedOperationException for any state without a case below
+   */
+  @Override
+  public void statusUpdate(FragmentStatus status) {
+
+    switch(status.getState()){
+    case AWAITING_ALLOCATION:
+      updateStatus(status);
+      break;
+    case CANCELLED:
+      // we don't care about cancellation messages since we're the only entity that should drive cancellations.
+      break;
+    case FAILED:
+      fail(status);
+      break;
+    case FINISHED:
+      finished(status);
+      break;
+    case RUNNING:
+      updateStatus(status);
+      break;
+    default:
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /** Stores the status on the entry tracked for the status' handle. */
+  private void updateStatus(FragmentStatus status){
+    map.get(status.getHandle()).setStatus(status);
+  }
+
+  /** Records the status and, once no fragments remain, hands a COMPLETED result to the foreman. */
+  private void finished(FragmentStatus status){
+    updateStatus(status);
+    int remaining = remainingFragmentCount.decrementAndGet();
+    if(remaining == 0){
+      QueryResult result = QueryResult.newBuilder().setQueryState(QueryState.COMPLETED).build();
+      foreman.cleanupAndSendResult(result);
+    }
+  }
+
+  /** Records the status, stops the query and hands a FAILED result to the foreman. */
+  private void fail(FragmentStatus status){
+    updateStatus(status);
+    stopQuery();
+    QueryResult result = QueryResult.newBuilder().setQueryState(QueryState.FAILED).build();
+    foreman.cleanupAndSendResult(result);
+  }
+
+
+  /**
+   * Currently a no-op: the cancellation logic below is commented out.
+   */
+  private void stopQuery(){
+    // Stop all queries with a currently active status.
+//    for(FragmentData data: map.values()){
+//      FragmentHandle handle = data.getStatus().getHandle();
+//      switch(data.getStatus().getState()){
+//      case SENDING:
+//      case AWAITING_ALLOCATION:
+//      case RUNNING:
+//        if(data.isLocal()){
+//          rootRunner.cancel();
+//        }else{
+//          tun.get(data.getEndpoint()).cancelFragment(handle).addLightListener(new CancelListener(data.endpoint, handle));
+//        }
+//        break;
+//      default:
+//        break;
+//      }
+//    }
+  }
+
+  /** Stops the query; see {@link #stopQuery()}. */
+  public void cancel(){
+    stopQuery();
+  }
+
+  /**
+   * Listener for the outcome of a cancellation request; both failure and a negative acknowledgement are only logged.
+   */
+  private class CancelListener extends EndpointListener<Ack, FragmentHandle>{
+
+    public CancelListener(DrillbitEndpoint endpoint, FragmentHandle handle) {
+      super(endpoint, handle);
+    }
+
+    @Override
+    public void failed(RpcException ex) {
+      logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", value, endpoint, ex);
+    }
+
+    @Override
+    public void success(Ack ack) {
+      if(!ack.getOk()){
+        // the parameter is named 'ack' so that 'value' refers to the inherited fragment handle, as in failed().
+        logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value);
+      }
+      // do nothing.
+    }
+
+  }
+
+  /**
+   * @return a new submit listener for the given endpoint and fragment
+   */
+  public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, PlanFragment value){
+    return new FragmentSubmitListener(endpoint, value);
+  }
+
+
+  /**
+   * Listener for the outcome of sending a fragment; a failed send is logged and stops the query.
+   */
+  private class FragmentSubmitListener extends EndpointListener<Ack, PlanFragment>{
+
+    public FragmentSubmitListener(DrillbitEndpoint endpoint, PlanFragment value) {
+      super(endpoint, value);
+    }
+
+    @Override
+    public void failed(RpcException ex) {
+      // log the cause so that it is not lost.
+      logger.error("Failure while sending fragment {} to endpoint {}.", value.getHandle(), endpoint, ex);
+      stopQuery();
+    }
+
+  }
+
+
+  /**
+   * Tracked state of a single fragment: its latest status, the time of the last status change, the endpoint it was
+   * assigned to and whether it runs locally.
+   */
+  private class FragmentData{
+    private final boolean isLocal;
+    private volatile FragmentStatus status;
+    private volatile long lastStatusUpdate = 0;
+    private final DrillbitEndpoint endpoint;
+
+    public FragmentData(FragmentHandle handle, DrillbitEndpoint endpoint, boolean isLocal) {
+      super();
+      // every fragment starts out in the SENDING state.
+      this.status = FragmentStatus.newBuilder().setHandle(handle).setState(FragmentState.SENDING).build();
+      this.endpoint = endpoint;
+      this.isLocal = isLocal;
+    }
+
+    public void setStatus(FragmentStatus status){
+      this.status = status;
+      lastStatusUpdate = System.currentTimeMillis();
+    }
+
+    public FragmentStatus getStatus() {
+      return status;
+    }
+
+    public boolean isLocal() {
+      return isLocal;
+    }
+
+    public long getLastStatusUpdate() {
+      return lastStatusUpdate;
+    }
+
+    public DrillbitEndpoint getEndpoint() {
+      return endpoint;
+    }
+
+  }
+
+  /**
+   * Listener handed to the root fragment runner; it records root status changes on this manager.
+   */
+  private class RootFragmentManager extends AbstractFragmentRunnerListener{
+
+    private RootFragmentManager(FragmentContext context, PlanFragment fragment){
+      super(context);
+    }
+
+    @Override
+    protected void statusChange(FragmentHandle handle, FragmentStatus status) {
+      // NOTE(review): this only records the status; it does not go through statusUpdate(), so a FINISHED root
+      // never decrements remainingFragmentCount here -- confirm whether that is intended.
+      RunningFragmentManager.this.updateStatus(status);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/TunnelManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/TunnelManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/TunnelManager.java
new file mode 100644
index 0000000..ad3534c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/TunnelManager.java
@@ -0,0 +1,53 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.foreman;
+
+import java.util.Map;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitCom;
+import org.apache.drill.exec.rpc.bit.BitTunnel;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Keeps a local list of tunnels associated with a particular Foreman.
+ *
+ * <p>Tunnels are obtained from the {@link BitCom} on first request for an endpoint and reused afterwards.
+ */
+public class TunnelManager {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TunnelManager.class);
+
+  private final BitCom com;
+  private final Map<DrillbitEndpoint, BitTunnel> tunnels = Maps.newHashMap();
+
+  /**
+   * @param com the source of new tunnels
+   */
+  public TunnelManager(BitCom com){
+    this.com = com;
+  }
+
+  /**
+   * Returns the tunnel for an endpoint, creating and remembering it if this is the first request for it.
+   *
+   * @param ep the endpoint to reach
+   * @return the tunnel held for {@code ep}
+   */
+  public BitTunnel get(DrillbitEndpoint ep){
+    BitTunnel known = tunnels.get(ep);
+    if(known != null){
+      return known;
+    }
+
+    BitTunnel created = com.getTunnel(ep);
+    tunnels.put(ep, created);
+    return created;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
new file mode 100644
index 0000000..b4e9308
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/IncomingFragmentHandler.java
@@ -0,0 +1,49 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.fragment;
+
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+import org.apache.drill.exec.work.FragmentRunner;
+
+/**
+ * Handles incoming fragment batches as they arrive, routing them as appropriate.
+ */
+public interface IncomingFragmentHandler {
+
+  /**
+   * Handle the next incoming fragment batch.
+   *
+   * @param throttle the connection throttle supplied with the batch
+   * @param batch the incoming batch
+   * @return True if the fragment has enough incoming data to be able to be run.
+   * @throws FragmentSetupException
+   */
+  boolean handle(ConnectionThrottle throttle, RawFragmentBatch batch) throws FragmentSetupException;
+
+  /**
+   * Get the fragment runner for this incoming fragment.  Note, this can only be requested once.
+   *
+   * @return the fragment runner
+   */
+  FragmentRunner getRunnable();
+
+  void cancel();
+
+  boolean isDone();
+
+  /**
+   * @return the fragment handle of this handler
+   */
+  FragmentHandle getHandle();
+}
\ No newline at end of file