You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/03/19 22:08:17 UTC

[7/9] drill git commit: DRILL-2245: Clean up query setup and execution kickoff in Foreman/WorkManager in order to ensure consistent handling, and avoid hangs and races, with the goal of improving Drillbit robustness.

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index 3228da9..3a7123d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -20,8 +20,6 @@ package org.apache.drill.exec.work.batch;
 import static org.apache.drill.exec.rpc.RpcBus.get;
 import io.netty.buffer.ByteBuf;
 
-import java.io.IOException;
-
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
@@ -33,7 +31,6 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcConstants;
@@ -42,123 +39,129 @@ import org.apache.drill.exec.rpc.UserRpcException;
 import org.apache.drill.exec.rpc.control.ControlConnection;
 import org.apache.drill.exec.rpc.control.ControlTunnel;
 import org.apache.drill.exec.rpc.data.DataRpcConfig;
+import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.foreman.Foreman;
-import org.apache.drill.exec.work.foreman.QueryStatus;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.FragmentManager;
 import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
 import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
 
 public class ControlHandlerImpl implements ControlMessageHandler {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlHandlerImpl.class);
-
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlHandlerImpl.class);
   private final WorkerBee bee;
 
-  public ControlHandlerImpl(WorkerBee bee) {
-    super();
+  public ControlHandlerImpl(final WorkerBee bee) {
     this.bee = bee;
   }
 
   @Override
-  public Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+  public Response handle(final ControlConnection connection, final int rpcType,
+      final ByteBuf pBody, final ByteBuf dBody) throws RpcException {
     if (RpcConstants.EXTRA_DEBUGGING) {
       logger.debug("Received bit com message of type {}", rpcType);
     }
 
     switch (rpcType) {
 
-    case RpcType.REQ_CANCEL_FRAGMENT_VALUE:
-      FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
+    case RpcType.REQ_CANCEL_FRAGMENT_VALUE: {
+      final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
       cancelFragment(handle);
       return DataRpcConfig.OK;
+    }
 
-    case RpcType.REQ_RECEIVER_FINISHED_VALUE:
-      FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
+    case RpcType.REQ_RECEIVER_FINISHED_VALUE: {
+      final FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
       receivingFragmentFinished(finishedReceiver);
       return DataRpcConfig.OK;
+    }
 
     case RpcType.REQ_FRAGMENT_STATUS_VALUE:
-      bee.getContext().getWorkBus().status( get(pBody, FragmentStatus.PARSER));
+      bee.getContext().getWorkBus().statusUpdate( get(pBody, FragmentStatus.PARSER));
       // TODO: Support a type of message that has no response.
       return DataRpcConfig.OK;
 
-    case RpcType.REQ_QUERY_CANCEL_VALUE:
-      QueryId id = get(pBody, QueryId.PARSER);
-      Foreman f = bee.getForemanForQueryId(id);
-      if(f != null){
-        f.cancel();
+    case RpcType.REQ_QUERY_CANCEL_VALUE: {
+      final QueryId queryId = get(pBody, QueryId.PARSER);
+      final Foreman foreman = bee.getForemanForQueryId(queryId);
+      if (foreman != null) {
+        foreman.cancel();
         return DataRpcConfig.OK;
-      }else{
+      } else {
         return DataRpcConfig.FAIL;
       }
+    }
 
-    case RpcType.REQ_INIATILIZE_FRAGMENTS_VALUE:
-      InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
-      for(int i =0; i < fragments.getFragmentCount(); i++){
+    case RpcType.REQ_INIATILIZE_FRAGMENTS_VALUE: {
+      final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
+      for(int i = 0; i < fragments.getFragmentCount(); i++) {
         startNewRemoteFragment(fragments.getFragment(i));
       }
       return DataRpcConfig.OK;
+    }
 
-    case RpcType.REQ_QUERY_STATUS_VALUE:
-      QueryId queryId = get(pBody, QueryId.PARSER);
-      Foreman foreman = bee.getForemanForQueryId(queryId);
-      QueryProfile profile;
+    case RpcType.REQ_QUERY_STATUS_VALUE: {
+      final QueryId queryId = get(pBody, QueryId.PARSER);
+      final Foreman foreman = bee.getForemanForQueryId(queryId);
       if (foreman == null) {
         throw new RpcException("Query not running on node.");
-      } else {
-        profile = bee.getForemanForQueryId(queryId).getQueryStatus().getAsProfile();
       }
+      final QueryProfile profile = foreman.getQueryManager().getQueryProfile();
       return new Response(RpcType.RESP_QUERY_STATUS, profile);
+    }
 
     default:
       throw new RpcException("Not yet supported.");
     }
-
   }
 
   @Override
-  public void startNewRemoteFragment(PlanFragment fragment) throws UserRpcException {
+  public void startNewRemoteFragment(final PlanFragment fragment) throws UserRpcException {
     logger.debug("Received remote fragment start instruction", fragment);
 
+    final DrillbitContext drillbitContext = bee.getContext();
     try {
       // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
-      if(fragment.getLeafFragment()){
-        FragmentContext context = new FragmentContext(bee.getContext(), fragment, null, bee.getContext().getFunctionImplementationRegistry());
-        ControlTunnel tunnel = bee.getContext().getController().getTunnel(fragment.getForeman());
-        NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
-        FragmentRoot rootOperator = bee.getContext().getPlanReader().readFragmentOperator(fragment.getFragmentJson());
-        FragmentExecutor fr = new FragmentExecutor(context, bee, rootOperator, listener);
+      if (fragment.getLeafFragment()) {
+        final FragmentContext context = new FragmentContext(drillbitContext, fragment, null,
+            drillbitContext.getFunctionImplementationRegistry());
+        final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
+        final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
+        final FragmentRoot rootOperator = drillbitContext.getPlanReader().readFragmentOperator(
+            fragment.getFragmentJson());
+        final FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener);
         bee.addFragmentRunner(fr);
-      }else{ // isIntermediate, store for incoming data.
-        NonRootFragmentManager manager = new NonRootFragmentManager(fragment, bee);
-        bee.getContext().getWorkBus().setFragmentManager(manager);
+      } else {
+        // isIntermediate, store for incoming data.
+        final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, drillbitContext);
+        drillbitContext.getWorkBus().addFragmentManager(manager);
       }
 
     } catch (Exception e) {
-        throw new UserRpcException(bee.getContext().getEndpoint(), "Failure while trying to start remote fragment", e);
+        throw new UserRpcException(drillbitContext.getEndpoint(),
+            "Failure while trying to start remote fragment", e);
     } catch (OutOfMemoryError t) {
       if (t.getMessage().startsWith("Direct buffer")) {
-        throw new UserRpcException(bee.getContext().getEndpoint(), "Out of direct memory while trying to start remote fragment", t);
+        throw new UserRpcException(drillbitContext.getEndpoint(),
+            "Out of direct memory while trying to start remote fragment", t);
       } else {
         throw t;
       }
     }
-
   }
 
   /* (non-Javadoc)
    * @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
    */
   @Override
-  public Ack cancelFragment(FragmentHandle handle) {
-    FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
+  public Ack cancelFragment(final FragmentHandle handle) {
+    final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
     if (manager != null) {
       // try remote fragment cancel.
       manager.cancel();
     } else {
       // then try local cancel.
-      FragmentExecutor runner = bee.getFragmentRunner(handle);
+      final FragmentExecutor runner = bee.getFragmentRunner(handle);
       if (runner != null) {
         runner.cancel();
       }
@@ -167,8 +170,9 @@ public class ControlHandlerImpl implements ControlMessageHandler {
     return Acks.OK;
   }
 
-  public Ack receivingFragmentFinished(FinishedReceiver finishedReceiver) {
-    FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
+  private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
+    final FragmentManager manager =
+        bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
 
     FragmentExecutor executor;
     if (manager != null) {
@@ -184,5 +188,4 @@ public class ControlHandlerImpl implements ControlMessageHandler {
 
     return Acks.OK;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index f0b4983..2a79e42 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -162,6 +162,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
         return batch;
       } catch (InterruptedException e) {
         return null;
+        // TODO InterruptedException
       }
     }
     if (w == null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 3d5b948..2430e64 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -162,6 +162,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
         b = buffer.take();
       } catch (InterruptedException e) {
         return null;
+        // TODO InterruptedException
       }
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java
index ca52f0c..80f2ca1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java
@@ -26,7 +26,7 @@ import java.util.Set;
  * Interface to define the listener to take actions when the set of active drillbits is changed.
  */
 public interface DrillbitStatusListener {
-
+  // TODO this doesn't belong here
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitStatusListener.class);
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 8e0780b..9650ee5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -19,27 +19,27 @@ package org.apache.drill.exec.work.foreman;
 
 import io.netty.buffer.ByteBuf;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Preconditions;
+
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.DistributedSemaphore;
 import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
 import org.apache.drill.exec.exception.OptimizerException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.ops.QueryDateTimeInfo;
 import org.apache.drill.exec.opt.BasicOptimizer;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
@@ -64,10 +64,11 @@ import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.testing.ExceptionInjector;
 import org.apache.drill.exec.util.Pointer;
 import org.apache.drill.exec.work.EndpointListener;
 import org.apache.drill.exec.work.ErrorHelper;
@@ -76,117 +77,120 @@ import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.RootFragmentManager;
+import org.codehaus.jackson.map.ObjectMapper;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 
 /**
- * Foreman manages all queries where this is the driving/root node.
+ * Foreman manages all the fragments (local and remote) for a single query where this
+ * is the driving/root node.
  *
  * The flow is as follows:
- *   - Foreman is submitted as a runnable.
- *   - Runnable does query planning.
- *   - PENDING > RUNNING
- *   - Runnable sends out starting fragments
- *   - Status listener are activated
- *   - Foreman listens for state move messages.
- *
+ * - Foreman is submitted as a runnable.
+ * - Runnable does query planning.
+ * - state changes from PENDING to RUNNING
+ * - Runnable sends out starting fragments
+ * - Status listener are activated
+ * - The Runnable's run() completes, but the Foreman stays around
+ * - Foreman listens for state change messages.
+ * - state change messages can drive the state to FAILED or CANCELED, in which case
+ *   messages are sent to running fragments to terminate
+ * - when all fragments complete, state change messages drive the state to COMPLETED
  */
-public class Foreman implements Runnable, Closeable, Comparable<Object> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
-
-  private QueryId queryId;
-  private RunQuery queryRequest;
-  private QueryContext context;
-  private QueryManager queryManager;
-  private WorkerBee bee;
-  private UserClientConnection initiatingClient;
+public class Foreman implements Runnable {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
+  private final static ExceptionInjector injector = ExceptionInjector.getInjector(Foreman.class);
+
+  private final QueryId queryId;
+  private final RunQuery queryRequest;
+  private final QueryContext queryContext;
+  private final QueryManager queryManager; // handles lower-level details of query execution
+  private final WorkerBee bee; // provides an interface to submit tasks
+  private final DrillbitContext drillbitContext;
+  private final UserClientConnection initiatingClient; // used to send responses
   private volatile QueryState state;
 
-  private final DistributedSemaphore smallSemaphore;
-  private final DistributedSemaphore largeSemaphore;
-  private final long queueThreshold;
-  private final long queueTimeout;
-  private volatile DistributedLease lease;
-  private final boolean queuingEnabled;
+  private volatile DistributedLease lease; // used to limit the number of concurrent queries
+
+  private FragmentExecutor rootRunner; // root Fragment
 
-  private FragmentExecutor rootRunner;
-  private final CountDownLatch acceptExternalEvents = new CountDownLatch(1);
-  private final StateListener stateListener = new StateListener();
+  private final CountDownLatch acceptExternalEvents = new CountDownLatch(1); // gates acceptance of external events
+  private final StateListener stateListener = new StateListener(); // source of external events
   private final ResponseSendListener responseListener = new ResponseSendListener();
+  private final ForemanResult foremanResult = new ForemanResult();
 
-  public Foreman(WorkerBee bee, DrillbitContext dContext, UserClientConnection connection, QueryId queryId,
-      RunQuery queryRequest) {
+  /**
+   * Constructor. Sets up the Foreman, but does not initiate any execution.
+   *
+   * @param bee used to submit additional work
+   * @param drillbitContext
+   * @param connection
+   * @param queryId the id for the query
+   * @param queryRequest the query to execute
+   */
+  public Foreman(final WorkerBee bee, final DrillbitContext drillbitContext,
+      final UserClientConnection connection, final QueryId queryId, final RunQuery queryRequest) {
+    this.bee = bee;
     this.queryId = queryId;
     this.queryRequest = queryRequest;
-    this.context = new QueryContext(connection.getSession(), queryId, dContext);
+    this.drillbitContext = drillbitContext;
 
-    // set up queuing
-    this.queuingEnabled = context.getOptions().getOption(ExecConstants.ENABLE_QUEUE_KEY).bool_val;
-    if (queuingEnabled) {
-      int smallQueue = context.getOptions().getOption(ExecConstants.SMALL_QUEUE_KEY).num_val.intValue();
-      int largeQueue = context.getOptions().getOption(ExecConstants.LARGE_QUEUE_KEY).num_val.intValue();
-      this.largeSemaphore = dContext.getClusterCoordinator().getSemaphore("query.large", largeQueue);
-      this.smallSemaphore = dContext.getClusterCoordinator().getSemaphore("query.small", smallQueue);
-      this.queueThreshold = context.getOptions().getOption(ExecConstants.QUEUE_THRESHOLD_KEY).num_val;
-      this.queueTimeout = context.getOptions().getOption(ExecConstants.QUEUE_TIMEOUT_KEY).num_val;
-    } else {
-      this.largeSemaphore = null;
-      this.smallSemaphore = null;
-      this.queueThreshold = 0;
-      this.queueTimeout = 0;
-    }
-    // end queuing setup.
-
-    this.initiatingClient = connection;
-    this.queryManager = new QueryManager(queryId, queryRequest, bee.getContext().getPersistentStoreProvider(),
-        stateListener, this);
-    this.bee = bee;
+    initiatingClient = connection;
+    queryContext = new QueryContext(connection.getSession(), drillbitContext);
+    queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getPersistentStoreProvider(),
+        stateListener, this); // TODO reference escapes before ctor is complete via stateListener, this
 
     recordNewState(QueryState.PENDING);
   }
 
-  public QueryContext getContext() {
-    return context;
+  /**
+   * Get the QueryContext created for the query.
+   *
+   * @return the QueryContext
+   */
+  public QueryContext getQueryContext() {
+    return queryContext;
   }
 
-  public void cancel() {
-    stateListener.moveToState(QueryState.CANCELED, null);
+  /**
+   * Get the QueryManager created for the query.
+   *
+   * @return the QueryManager
+   */
+  public QueryManager getQueryManager() {
+    return queryManager;
   }
 
-  private void cleanup(QueryResult result) {
-    logger.info("foreman cleaning up - status: {}", queryManager.getStatus());
-
-    bee.retireForeman(this);
-    context.getWorkBus().removeFragmentStatusListener(queryId);
-    context.getClusterCoordinator().removeDrillbitStatusListener(queryManager);
-
-    try {
-      try {
-        context.close();
-      } catch (Exception e) {
-        moveToState(QueryState.FAILED, e);
-        return;
-      }
-
-      if (result != null) {
-        initiatingClient.sendResult(responseListener, new QueryWritableBatch(result), true);
-      }
-    } finally {
-      releaseLease();
-    }
+  /**
+   * Cancel the query. Asynchronous -- it may take some time for all remote fragments to be
+   * terminated.
+   */
+  public void cancel() {
+    // Note this can be called from outside of run() on another thread, or after run() completes
+    stateListener.moveToState(QueryState.CANCELLATION_REQUESTED, null);
   }
 
   /**
-   * Called by execution pool to do foreman setup. Actual query execution is a separate phase (and can be scheduled).
+   * Called by execution pool to do query setup, and kick off remote execution.
+   *
+   * <p>Note that completion of this function is not the end of the Foreman's role
+   * in the query's lifecycle.
    */
+  @Override
   public void run() {
+    // rename the thread we're using for debugging purposes
+    final Thread currentThread = Thread.currentThread();
+    final String originalName = currentThread.getName();
+    currentThread.setName(QueryIdHelper.getQueryId(queryId) + ":foreman");
+
+    // track how long the query takes
+    queryManager.markStartTime();
 
-    final String originalThread = Thread.currentThread().getName();
-    Thread.currentThread().setName(QueryIdHelper.getQueryId(queryId) + ":foreman");
-    getStatus().markStart();
-    // convert a run query request into action
     try {
+      injector.injectChecked(drillbitContext, "run-try-beginning", ForemanException.class);
+
+      // convert a run query request into action
       switch (queryRequest.getType()) {
       case LOGICAL:
         parseAndRunLogicalPlan(queryRequest.getPlan());
@@ -200,39 +204,71 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
       default:
         throw new IllegalStateException();
       }
+      injector.injectChecked(drillbitContext, "run-try-end", ForemanException.class);
     } catch (ForemanException e) {
       moveToState(QueryState.FAILED, e);
-
     } catch (AssertionError | Exception ex) {
-      moveToState(QueryState.FAILED, new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex));
-
+      moveToState(QueryState.FAILED,
+          new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex));
     } catch (OutOfMemoryError e) {
+      /*
+       * FragmentExecutors use a DrillbitStatusListener to watch out for the death of their query's Foreman.
+       * So, if we die here, they should get notified about that, and cancel themselves; we don't have to
+       * attempt to notify them, which might not work under these conditions.
+       */
+      /*
+       * TODO this will kill everything in this JVM; why can't we just free all allocation
+       * associated with this Foreman and allow others to continue?
+       */
       System.out.println("Out of memory, exiting.");
       e.printStackTrace();
       System.out.flush();
       System.exit(-1);
-
     } finally {
-      Thread.currentThread().setName(originalThread);
+      /*
+       * Begin accepting external events.
+       *
+       * Doing this here in the finally clause will guarantee that it occurs. Otherwise, if there
+       * is an exception anywhere during setup, it wouldn't occur, and any events that are generated
+       * as a result of any partial setup that was done (such as the FragmentSubmitListener,
+       * the ResponseSendListener, or an external call to cancel()), will hang the thread that makes the
+       * event delivery call.
+       *
+       * If we do throw an exception during setup, and have already moved to QueryState.FAILED, we just need to
+       * make sure that we can't make things any worse as those events are delivered, but allow
+       * any necessary remaining cleanup to proceed.
+       */
+      acceptExternalEvents.countDown();
+
+      // restore the thread's original name
+      currentThread.setName(originalName);
     }
+
+    /*
+     * Note that despite the run() completing, the Foreman continues to exist, and receives
+     * events (indirectly, through the QueryManager's use of stateListener), about fragment
+     * completions. It won't go away until everything is completed, failed, or cancelled.
+     */
   }
 
   private void releaseLease() {
-    if (lease != null) {
+    while (lease != null) {
       try {
         lease.close();
+        lease = null;
+      } catch (InterruptedException e) {
+        // if we end up here, the while loop will try again
       } catch (Exception e) {
         logger.warn("Failure while releasing lease.", e);
+        break;
       }
-      ;
     }
-
   }
 
-  private void parseAndRunLogicalPlan(String json) throws ExecutionSetupException {
+  private void parseAndRunLogicalPlan(final String json) throws ExecutionSetupException {
     LogicalPlan logicalPlan;
     try {
-      logicalPlan = context.getPlanReader().readLogicalPlan(json);
+      logicalPlan = drillbitContext.getPlanReader().readLogicalPlan(json);
     } catch (IOException e) {
       throw new ForemanException("Failure parsing logical plan.", e);
     }
@@ -244,7 +280,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
 
     log(logicalPlan);
 
-    PhysicalPlan physicalPlan = convert(logicalPlan);
+    final PhysicalPlan physicalPlan = convert(logicalPlan);
 
     if (logicalPlan.getProperties().resultMode == ResultMode.PHYSICAL) {
       returnPhysical(physicalPlan);
@@ -252,20 +288,19 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
     }
 
     log(physicalPlan);
-
     runPhysicalPlan(physicalPlan);
   }
 
-  private void log(LogicalPlan plan) {
+  private void log(final LogicalPlan plan) {
     if (logger.isDebugEnabled()) {
-      logger.debug("Logical {}", plan.unparse(context.getConfig()));
+      logger.debug("Logical {}", plan.unparse(queryContext.getConfig()));
     }
   }
 
-  private void log(PhysicalPlan plan) {
+  private void log(final PhysicalPlan plan) {
     if (logger.isDebugEnabled()) {
       try {
-        String planText = context.getConfig().getMapper().writeValueAsString(plan);
+        String planText = queryContext.getConfig().getMapper().writeValueAsString(plan);
         logger.debug("Physical {}", planText);
       } catch (IOException e) {
         logger.warn("Error while attempting to log physical plan.", e);
@@ -273,60 +308,54 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
     }
   }
 
-  private void returnPhysical(PhysicalPlan plan) throws ExecutionSetupException {
-    String jsonPlan = plan.unparse(context.getConfig().getMapper().writer());
-    runPhysicalPlan(DirectPlan.createDirectPlan(context, new PhysicalFromLogicalExplain(jsonPlan)));
+  private void returnPhysical(final PhysicalPlan plan) throws ExecutionSetupException {
+    final String jsonPlan = plan.unparse(queryContext.getConfig().getMapper().writer());
+    runPhysicalPlan(DirectPlan.createDirectPlan(queryContext, new PhysicalFromLogicalExplain(jsonPlan)));
   }
 
   public static class PhysicalFromLogicalExplain {
-    public String json;
+    public final String json;
 
-    public PhysicalFromLogicalExplain(String json) {
-      super();
+    public PhysicalFromLogicalExplain(final String json) {
       this.json = json;
     }
-
   }
 
-  private void parseAndRunPhysicalPlan(String json) throws ExecutionSetupException {
+  private void parseAndRunPhysicalPlan(final String json) throws ExecutionSetupException {
     try {
-      PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json);
+      final PhysicalPlan plan = drillbitContext.getPlanReader().readPhysicalPlan(json);
       runPhysicalPlan(plan);
     } catch (IOException e) {
       throw new ForemanSetupException("Failure while parsing physical plan.", e);
     }
   }
 
-  private void runPhysicalPlan(PhysicalPlan plan) throws ExecutionSetupException {
-
+  private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException {
     validatePlan(plan);
     setupSortMemoryAllocations(plan);
     acquireQuerySemaphore(plan);
 
     final QueryWorkUnit work = getQueryWorkUnit(plan);
-
-    this.context.getWorkBus().setFragmentStatusListener(work.getRootFragment().getHandle().getQueryId(), queryManager);
-    this.context.getClusterCoordinator().addDrillbitStatusListener(queryManager);
-
-    logger.debug("Submitting fragments to run.");
-
+    final List<PlanFragment> planFragments = work.getFragments();
     final PlanFragment rootPlanFragment = work.getRootFragment();
     assert queryId == rootPlanFragment.getHandle().getQueryId();
 
-    queryManager.setup(rootPlanFragment.getHandle(), context.getCurrentEndpoint(), work.getFragments().size());
+    drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager);
+    drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager);
+
+    logger.debug("Submitting fragments to run.");
 
     // set up the root fragment first so we'll have incoming buffers available.
     setupRootFragment(rootPlanFragment, initiatingClient, work.getRootOperator());
 
-    setupNonRootFragments(work.getFragments());
-    bee.getContext().getAllocator().resetFragmentLimits();
+    setupNonRootFragments(planFragments);
+    drillbitContext.getAllocator().resetFragmentLimits(); // TODO a global effect for this query?!?
 
     moveToState(QueryState.RUNNING, null);
     logger.debug("Fragments running.");
-
   }
 
-  private void validatePlan(PhysicalPlan plan) throws ForemanSetupException{
+  private static void validatePlan(final PhysicalPlan plan) throws ForemanSetupException {
     if (plan.getProperties().resultMode != ResultMode.EXEC) {
       throw new ForemanSetupException(String.format(
           "Failure running plan.  You requested a result mode of %s and a physical plan can only be output as EXEC",
@@ -334,369 +363,632 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
     }
   }
 
-  private void setupSortMemoryAllocations(PhysicalPlan plan){
-    int sortCount = 0;
+  private void setupSortMemoryAllocations(final PhysicalPlan plan) {
+    // look for external sorts
+    final List<ExternalSort> sortList = new LinkedList<>();
     for (PhysicalOperator op : plan.getSortedOperators()) {
       if (op instanceof ExternalSort) {
-        sortCount++;
+        sortList.add((ExternalSort) op);
       }
     }
 
-    if (sortCount > 0) {
-      long maxWidthPerNode = context.getOptions().getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val;
+    // if there are any sorts, compute the maximum allocation, and set it on them
+    if (sortList.size() > 0) {
+      final OptionManager optionManager = queryContext.getOptions();
+      final long maxWidthPerNode = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val;
       long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(),
-          context.getConfig().getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC));
+          queryContext.getConfig().getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC));
       maxAllocPerNode = Math.min(maxAllocPerNode,
-          context.getOptions().getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
-      long maxSortAlloc = maxAllocPerNode / (sortCount * maxWidthPerNode);
+          optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
+      final long maxSortAlloc = maxAllocPerNode / (sortList.size() * maxWidthPerNode);
       logger.debug("Max sort alloc: {}", maxSortAlloc);
-      for (PhysicalOperator op : plan.getSortedOperators()) {
-        if (op instanceof ExternalSort) {
-          ((ExternalSort) op).setMaxAllocation(maxSortAlloc);
-        }
+
+      for(ExternalSort externalSort : sortList) {
+        externalSort.setMaxAllocation(maxSortAlloc);
       }
     }
   }
 
-  private void acquireQuerySemaphore(PhysicalPlan plan) throws ForemanSetupException {
-
-    double size = 0;
-    for (PhysicalOperator ops : plan.getSortedOperators()) {
-      size += ops.getCost();
-    }
-
+  /**
+   * This limits the number of "small" and "large" queries that a Drill cluster will run
+   * simultaneously, if queueing is enabled. If the query is unable to run, this will block
+   * until it can. Beware that this is called under run(), and so will consume a Thread
+   * while it waits for the required distributed semaphore.
+   *
+   * @param plan the query plan
+   * @throws ForemanSetupException
+   */
+  private void acquireQuerySemaphore(final PhysicalPlan plan) throws ForemanSetupException {
+    final OptionManager optionManager = queryContext.getOptions();
+    final boolean queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE_KEY).bool_val;
     if (queuingEnabled) {
+      final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_KEY).num_val;
+      double totalCost = 0;
+      for (PhysicalOperator ops : plan.getSortedOperators()) {
+        totalCost += ops.getCost();
+      }
+
       try {
-        if (size > this.queueThreshold) {
-          this.lease = largeSemaphore.acquire(this.queueTimeout, TimeUnit.MILLISECONDS);
+        @SuppressWarnings("resource")
+        final ClusterCoordinator clusterCoordinator = drillbitContext.getClusterCoordinator();
+        DistributedSemaphore distributedSemaphore;
+
+        // get the appropriate semaphore
+        if (totalCost > queueThreshold) {
+          final int largeQueue = optionManager.getOption(ExecConstants.LARGE_QUEUE_KEY).num_val.intValue();
+          distributedSemaphore = clusterCoordinator.getSemaphore("query.large", largeQueue);
         } else {
-          this.lease = smallSemaphore.acquire(this.queueTimeout, TimeUnit.MILLISECONDS);
+          final int smallQueue = optionManager.getOption(ExecConstants.SMALL_QUEUE_KEY).num_val.intValue();
+          distributedSemaphore = clusterCoordinator.getSemaphore("query.small", smallQueue);
         }
+
+        final long queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT_KEY).num_val;
+        lease = distributedSemaphore.acquire(queueTimeout, TimeUnit.MILLISECONDS);
       } catch (Exception e) {
         throw new ForemanSetupException("Unable to acquire slot for query.", e);
       }
     }
   }
 
-  private QueryWorkUnit getQueryWorkUnit(PhysicalPlan plan) throws ExecutionSetupException {
-    PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
-    Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
+  private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan) throws ExecutionSetupException {
+    final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
+    final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
+    final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext);
+    final QueryWorkUnit queryWorkUnit = parallelizer.getFragments(
+        queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
+        queryId, queryContext.getActiveEndpoints(), drillbitContext.getPlanReader(), rootFragment,
+        initiatingClient.getSession(), queryContext.getQueryDateTimeInfo());
+
+    if (logger.isInfoEnabled()) {
+      final StringBuilder sb = new StringBuilder();
+      sb.append("PlanFragments for query ");
+      sb.append(queryId);
+      sb.append('\n');
+
+      final List<PlanFragment> planFragments = queryWorkUnit.getFragments();
+      final int fragmentCount = planFragments.size();
+      int fragmentIndex = 0;
+      for(PlanFragment planFragment : planFragments) {
+        final FragmentHandle fragmentHandle = planFragment.getHandle();
+        sb.append("PlanFragment(");
+        sb.append(++fragmentIndex);
+        sb.append('/');
+        sb.append(fragmentCount);
+        sb.append(") major_fragment_id ");
+        sb.append(fragmentHandle.getMajorFragmentId());
+        sb.append(" minor_fragment_id ");
+        sb.append(fragmentHandle.getMinorFragmentId());
+        sb.append('\n');
+
+        final DrillbitEndpoint endpointAssignment = planFragment.getAssignment();
+        sb.append("  DrillbitEndpoint address ");
+        sb.append(endpointAssignment.getAddress());
+        sb.append('\n');
+
+        String jsonString = "<<malformed JSON>>";
+        sb.append("  fragment_json: ");
+        final ObjectMapper objectMapper = new ObjectMapper();
+        try
+        {
+          final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class);
+          jsonString = objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json);
+        } catch(Exception e) {
+          // we've already set jsonString to a fallback value
+        }
+        sb.append(jsonString);
 
-    SimpleParallelizer parallelizer = new SimpleParallelizer(context);
-    return parallelizer.getFragments(context.getOptions().getOptionList(), context.getCurrentEndpoint(),
-        queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, initiatingClient.getSession(),
-        context.getQueryDateTimeInfo());
+        logger.info(sb.toString());
+      }
+    }
+
+    return queryWorkUnit;
   }
 
   /**
-   * Tells the foreman to move to a new state.  Note that
-   * @param state
-   * @return
+   * Manages the end-state processing for Foreman.
+   *
+   * End-state processing is tricky, because even if a query appears to succeed, if
+   * we then encounter a problem during cleanup, we still want to mark the query as
+   * failed. So we have to construct the successful result we would send, and then
+   * clean up before we send that result, possibly changing that result if we encounter
+   * a problem during cleanup. We only send the result when there is nothing left to
+   * do, so it will account for any possible problems.
+   *
+   * The idea here is to make close()ing the ForemanResult do the final cleanup and
+   * sending. Closing the result must be the last thing that is done by Foreman.
    */
-  private synchronized boolean moveToState(QueryState newState, Exception exception){
-    logger.info("State change requested.  {} --> {}", state, newState, exception);
-    outside: switch(state) {
+  private class ForemanResult implements AutoCloseable {
+    private QueryState resultState = null;
+    private Exception resultException = null;
+    private boolean isClosed = false;
+
+    /**
+     * Set up the result for a COMPLETED or CANCELED state.
+     *
+     * <p>Note that before sending this result, we execute cleanup steps that could
+     * result in this result still being changed to a FAILED state.
+     *
+     * @param queryState one of COMPLETED or CANCELED
+     */
+    public void setCompleted(final QueryState queryState) {
+      Preconditions.checkArgument((queryState == QueryState.COMPLETED) || (queryState == QueryState.CANCELED));
+      Preconditions.checkState(!isClosed);
+      Preconditions.checkState(resultState == null);
+
+      resultState = queryState;
+    }
+
+    /**
+     * Set up the result for a FAILED state.
+     *
+     * <p>Failures that occur during cleanup processing will be added as suppressed
+     * exceptions.
+     *
+     * @param exception the exception that led to the FAILED state
+     */
+    public void setFailed(final Exception exception) {
+      Preconditions.checkArgument(exception != null);
+      Preconditions.checkState(!isClosed);
+      Preconditions.checkState(resultState == null);
+
+      resultState = QueryState.FAILED;
+      resultException = exception;
+    }
+
+    /**
+     * Add an exception to the result. All exceptions after the first become suppressed
+     * exceptions hanging off the first.
+     *
+     * @param exception the exception to add
+     */
+    private void addException(final Exception exception) {
+      Preconditions.checkNotNull(exception);
+
+      if (resultException == null) {
+        resultException = exception;
+      } else {
+        resultException.addSuppressed(exception);
+      }
+    }
 
-    case PENDING:
-      // since we're moving out of pending, we can now start accepting other changes in state.
-      // This guarantees that the first state change is driven by the original thread.
-      acceptExternalEvents.countDown();
+    /**
+     * Close the given resource, catching and adding any caught exceptions via
+     * {@link #addException(Exception)}. If an exception is caught, it will change
+     * the result state to FAILED, regardless of its current value.
+     *
+     * @param autoCloseable the resource to close
+     */
+    private void suppressingClose(final AutoCloseable autoCloseable) {
+      Preconditions.checkState(!isClosed);
+      Preconditions.checkState(resultState != null);
 
-      if(newState == QueryState.RUNNING){
-        recordNewState(QueryState.RUNNING);
-        return true;
+      if (autoCloseable == null) {
+        return;
       }
 
-      // fall through to running behavior.
-      //
-    case RUNNING: {
+      try {
+        autoCloseable.close();
+      } catch(Exception e) {
+        /*
+         * Even if the query completed successfully, we'll still report failure if we have
+         * problems cleaning up.
+         */
+        resultState = QueryState.FAILED;
+        addException(e);
+      }
+    }
+
+    @Override
+    public void close() {
+      Preconditions.checkState(!isClosed);
+      Preconditions.checkState(resultState != null);
+
+      logger.info("foreman cleaning up - status: {}", queryManager.getFragmentStatesAsString());
+
+      // These are straightforward removals from maps, so they won't throw.
+      drillbitContext.getWorkBus().removeFragmentStatusListener(queryId);
+      drillbitContext.getClusterCoordinator().removeDrillbitStatusListener(queryManager);
+
+      suppressingClose(queryContext);
+
+      /*
+       * We do our best to write the latest state, but even that could fail. If it does, we can't write
+       * the (possibly newly failing) state, so we continue on anyway.
+       *
+       * We only need to do this if the resultState differs from the last recorded state
+       */
+      if (resultState != state) {
+        suppressingClose(new AutoCloseable() {
+          @Override
+          public void close() throws Exception {
+            recordNewState(resultState);
+          }
+        });
+      }
+
+      /*
+       * Construct the response based on the latest resultState. The builder shouldn't fail.
+       */
+      final QueryResult.Builder resultBuilder = QueryResult.newBuilder()
+          .setIsLastChunk(resultState != QueryState.COMPLETED) // TODO(DRILL-2498) temporary
+          .setQueryId(queryId)
+          .setQueryState(resultState);
+      if (resultException != null) {
+        final DrillPBError error = ErrorHelper.logAndConvertError(queryContext.getCurrentEndpoint(),
+            ExceptionUtils.getRootCauseMessage(resultException), resultException, logger);
+        resultBuilder.addError(error);
+      }
+
+      /*
+       * If sending the result fails, we don't really have any way to modify the result we tried to send;
+       * it is possible it got sent but the result came from a later part of the code path. It is also
+       * possible the connection has gone away, so this is irrelevant because there's nowhere to
+       * send anything to.
+       */
+      try {
+        // send whatever result we ended up with
+        initiatingClient.sendResult(responseListener, new QueryWritableBatch(resultBuilder.build()), true);
+      } catch(Exception e) {
+        addException(e);
+        logger.warn("Exception sending result to client", resultException);
+      }
+
+      try {
+        releaseLease();
+      } finally {
+        isClosed = true;
+      }
+    }
+  }
+
+  /**
+   * Tells the foreman to move to a new state.
+   *
+   * @param newState the state to move to
+   * @param exception if not null, the exception that drove this state transition (usually a failure)
+   */
+  private synchronized void moveToState(final QueryState newState, final Exception exception) {
+    logger.info("State change requested.  {} --> {}", state, newState, exception);
+    switch(state) {
+    case PENDING:
+      if (newState == QueryState.RUNNING) {
+        recordNewState(QueryState.RUNNING);
+        return;
+      }
 
-      switch(newState){
+      //$FALL-THROUGH$
 
-      case CANCELED: {
+    case RUNNING: {
+      /*
+       * For cases that cancel executing fragments, we have to record the new state first, because
+       * the cancellation of the local root fragment will cause this to be called recursively.
+       */
+      switch(newState) {
+      case CANCELLATION_REQUESTED: {
         assert exception == null;
-        recordNewState(QueryState.CANCELED);
-        cancelExecutingFragments();
-        QueryResult result = QueryResult.newBuilder() //
-            .setQueryId(queryId) //
-            .setQueryState(QueryState.CANCELED) //
-            .setIsLastChunk(true) //
-            .build();
-
-        // immediately notify client that cancellation is taking place, final clean-up happens when foreman reaches to
-        // a terminal state(completed, failed)
-        initiatingClient.sendResult(responseListener, new QueryWritableBatch(result), true);
-        return true;
+        queryManager.markEndTime();
+        recordNewState(QueryState.CANCELLATION_REQUESTED);
+        queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+        foremanResult.setCompleted(QueryState.CANCELED);
+        /*
+         * We don't close the foremanResult until we've gotten acknowledgements, which
+         * happens below in the case for current state == CANCELLATION_REQUESTED.
+         */
+        return;
       }
 
       case COMPLETED: {
         assert exception == null;
+        queryManager.markEndTime();
         recordNewState(QueryState.COMPLETED);
-        QueryResult result = QueryResult //
-            .newBuilder() //
-            .setQueryState(QueryState.COMPLETED) //
-            .setQueryId(queryId) //
-            .build();
-        cleanup(result);
-        return true;
+        foremanResult.setCompleted(QueryState.COMPLETED);
+        foremanResult.close();
+        return;
       }
 
-
-      case FAILED:
+      case FAILED: {
         assert exception != null;
+        queryManager.markEndTime();
         recordNewState(QueryState.FAILED);
-        cancelExecutingFragments();
-        DrillPBError error = ErrorHelper.logAndConvertError(context.getCurrentEndpoint(),
-            ExceptionUtils.getRootCauseMessage(exception), exception, logger);
-        QueryResult result = QueryResult //
-          .newBuilder() //
-          .addError(error) //
-          .setIsLastChunk(true) //
-          .setQueryState(QueryState.FAILED) //
-          .setQueryId(queryId) //
-          .build();
-        cleanup(result);
-        return true;
-      default:
-        break outside;
+        queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+        foremanResult.setFailed(exception);
+        foremanResult.close();
+        return;
+      }
 
+      default:
+        throw new IllegalStateException("illegal transition from RUNNING to " + newState);
       }
     }
 
+    case CANCELLATION_REQUESTED:
+      if ((newState == QueryState.CANCELED) || (newState == QueryState.COMPLETED)
+          || (newState == QueryState.FAILED)) {
+        /*
+         * These amount to a completion of the cancellation request's cleanup; now we
+         * can clean up and send the result.
+         */
+        foremanResult.close();
+      }
+      return;
+
     case CANCELED:
     case COMPLETED:
-    case FAILED: {
-      // no op.
-      logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state, exception);
-      return false;
-    }
-
-    }
-
-    throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name()));
-  }
-
-  private void cancelExecutingFragments(){
-
-    // Stop all framgents with a currently active status.
-    List<FragmentData> fragments = getStatus().getFragmentData();
-    Collections.sort(fragments, new Comparator<FragmentData>() {
-      @Override
-      public int compare(FragmentData o1, FragmentData o2) {
-        return o2.getHandle().getMajorFragmentId() - o1.getHandle().getMajorFragmentId();
-      }
-    });
-    for(FragmentData data: fragments){
-      FragmentHandle handle = data.getStatus().getHandle();
-      switch(data.getStatus().getProfile().getState()){
-      case SENDING:
-      case AWAITING_ALLOCATION:
-      case RUNNING:
-        if(data.isLocal()){
-          if(rootRunner != null){
-            rootRunner.cancel();
-          }
-        }else{
-          bee.getContext().getController().getTunnel(data.getEndpoint()).cancelFragment(new CancelListener(data.getEndpoint(), handle), handle);
-        }
-        break;
-      default:
-        break;
-      }
+    case FAILED:
+      logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).",
+          newState, state);
+      return;
     }
 
+    throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s",
+        state.name(), newState.name()));
   }
 
-  private QueryStatus getStatus(){
-    return queryManager.getStatus();
-  }
-
-  private void recordNewState(QueryState newState){
-    this.state = newState;
-    getStatus().updateQueryStateInStore(newState);
+  private void recordNewState(final QueryState newState) {
+    state = newState;
+    queryManager.updateQueryStateInStore(newState);
   }
 
-  private void runSQL(String sql) throws ExecutionSetupException {
-    DrillSqlWorker sqlWorker = new DrillSqlWorker(context);
-    Pointer<String> textPlan = new Pointer<>();
-    PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan);
-    getStatus().setPlanText(textPlan.value);
+  private void runSQL(final String sql) throws ExecutionSetupException {
+    final DrillSqlWorker sqlWorker = new DrillSqlWorker(queryContext);
+    final Pointer<String> textPlan = new Pointer<>();
+    final PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan);
+    queryManager.setPlanText(textPlan.value);
     runPhysicalPlan(plan);
   }
 
-  private PhysicalPlan convert(LogicalPlan plan) throws OptimizerException {
+  private PhysicalPlan convert(final LogicalPlan plan) throws OptimizerException {
     if (logger.isDebugEnabled()) {
-      logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(context.getConfig()));
+      logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(queryContext.getConfig()));
     }
-    return new BasicOptimizer(DrillConfig.create(), context, initiatingClient).optimize(
-        new BasicOptimizer.BasicOptimizationContext(context), plan);
+    return new BasicOptimizer(queryContext).optimize(
+        new BasicOptimizer.BasicOptimizationContext(queryContext), plan);
   }
 
   public QueryId getQueryId() {
     return queryId;
   }
 
-  @Override
-  public void close() throws IOException {
-  }
-
-  public QueryStatus getQueryStatus() {
-    return this.queryManager.getStatus();
-  }
-
-  private void setupRootFragment(PlanFragment rootFragment, UserClientConnection rootClient, FragmentRoot rootOperator) throws ExecutionSetupException {
-    FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment, rootClient, bee.getContext()
-        .getFunctionImplementationRegistry());
-
-    IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
-
+  /**
+   * Set up the root fragment (which will run locally), and submit it for execution.
+   *
+   * @param rootFragment
+   * @param rootClient
+   * @param rootOperator
+   * @throws ExecutionSetupException
+   */
+  private void setupRootFragment(final PlanFragment rootFragment, final UserClientConnection rootClient,
+      final FragmentRoot rootOperator) throws ExecutionSetupException {
+    @SuppressWarnings("resource")
+    final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, rootClient,
+        drillbitContext.getFunctionImplementationRegistry());
+    @SuppressWarnings("resource")
+    final IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
     rootContext.setBuffers(buffers);
 
-    // add fragment to local node.
     queryManager.addFragmentStatusTracker(rootFragment, true);
 
-    this.rootRunner = new FragmentExecutor(rootContext, bee, rootOperator, queryManager.getRootStatusHandler(rootContext, rootFragment));
-    RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
+    rootRunner = new FragmentExecutor(rootContext, rootOperator,
+        queryManager.getRootStatusHandler(rootContext));
+    final RootFragmentManager fragmentManager =
+        new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
 
     if (buffers.isDone()) {
       // if we don't have to wait for any incoming data, start the fragment runner.
       bee.addFragmentRunner(fragmentManager.getRunnable());
     } else {
       // if we do, record the fragment manager in the workBus.
-      bee.getContext().getWorkBus().setFragmentManager(fragmentManager);
+      // TODO aren't we managing our own work? What does this do? It looks like this will never get run
+      drillbitContext.getWorkBus().addFragmentManager(fragmentManager);
     }
   }
 
-  private void setupNonRootFragments(Collection<PlanFragment> fragments) throws ForemanException{
-    Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create();
-    Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create();
+  /**
+   * Set up the non-root fragments for execution. Some may be local, and some may be remote.
+   * Messages are sent immediately, so they may start returning data even before we complete this.
+   *
+   * @param fragments the fragments
+   * @throws ForemanException
+   */
+  private void setupNonRootFragments(final Collection<PlanFragment> fragments) throws ForemanException {
+    /*
+     * We will send a single message to each endpoint, regardless of how many fragments will be
+     * executed there. We need to start up the intermediate fragments first so that they will be
+     * ready once the leaf fragments start producing data. To satisfy both of these, we will
+     * make a pass through the fragments and put them into these two maps according to their
+     * leaf/intermediate state, as well as their target drillbit.
+     */
+    final Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create();
+    final Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create();
 
     // record all fragments for status purposes.
-    for (PlanFragment f : fragments) {
+    for (PlanFragment planFragment : fragments) {
 //      logger.debug("Tracking intermediate remote node {} with data {}", f.getAssignment(), f.getFragmentJson());
-      queryManager.addFragmentStatusTracker(f, false);
-      if (f.getLeafFragment()) {
-        leafFragmentMap.put(f.getAssignment(), f);
+      queryManager.addFragmentStatusTracker(planFragment, false);
+      if (planFragment.getLeafFragment()) {
+        leafFragmentMap.put(planFragment.getAssignment(), planFragment);
       } else {
-        intFragmentMap.put(f.getAssignment(), f);
+        intFragmentMap.put(planFragment.getAssignment(), planFragment);
       }
     }
 
-    CountDownLatch latch = new CountDownLatch(intFragmentMap.keySet().size());
+    /*
+     * We need to wait for the intermediates to be sent so that they'll be set up by the time
+     * the leaves start producing data. We'll use this latch to wait for the responses.
+     *
+     * However, in order not to hang the process if any of the RPC requests fails, we always
+     * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
+     * know if any submissions did fail.
+     */
+    final CountDownLatch endpointLatch = new CountDownLatch(intFragmentMap.keySet().size());
+    final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures();
 
     // send remote intermediate fragments
     for (DrillbitEndpoint ep : intFragmentMap.keySet()) {
-      sendRemoteFragments(ep, intFragmentMap.get(ep), latch);
+      sendRemoteFragments(ep, intFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
     }
 
-    // wait for send complete
-    try {
-      latch.await();
-    } catch (InterruptedException e) {
-      throw new ForemanException("Interrupted while waiting to complete send of remote fragments.", e);
+    // wait for the status of all requests sent above to be known
+    boolean ready = false;
+    while(!ready) {
+      try {
+        endpointLatch.await();
+        ready = true;
+      } catch (InterruptedException e) {
+        // if we weren't ready, the while loop will continue to wait
+      }
     }
 
-    // send remote (leaf) fragments.
-    for (DrillbitEndpoint ep : leafFragmentMap.keySet()) {
-      sendRemoteFragments(ep, leafFragmentMap.get(ep), null);
+    // if any of the intermediate fragment submissions failed, fail the query
+    final List<FragmentSubmitFailures.SubmissionException> submissionExceptions =
+        fragmentSubmitFailures.submissionExceptions;
+    if (submissionExceptions.size() > 0) {
+      throw new ForemanSetupException("Error setting up remote intermediate fragment execution",
+          submissionExceptions.get(0).rpcException);
+      // TODO indicate the failing drillbit?
+      // TODO report on all the failures?
     }
-  }
 
-  public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch){
-    return new FragmentSubmitListener(endpoint, value, latch);
+    /*
+     * Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through
+     * the regular sendListener event delivery.
+     */
+    for (DrillbitEndpoint ep : leafFragmentMap.keySet()) {
+      sendRemoteFragments(ep, leafFragmentMap.get(ep), null, null);
+    }
   }
 
-  private void sendRemoteFragments(DrillbitEndpoint assignment, Collection<PlanFragment> fragments, CountDownLatch latch){
-    Controller controller = bee.getContext().getController();
-    InitializeFragments.Builder fb = InitializeFragments.newBuilder();
-    for(PlanFragment f : fragments){
-      fb.addFragment(f);
+  /**
+   * Send all the remote fragments belonging to a single target drillbit in one request.
+   *
+   * @param assignment the drillbit assigned to these fragments
+   * @param fragments the set of fragments
+   * @param latch the countdown latch used to track the requests to all endpoints
+   * @param fragmentSubmitFailures the submission failure counter used to track the requests to all endpoints
+   */
+  private void sendRemoteFragments(final DrillbitEndpoint assignment, final Collection<PlanFragment> fragments,
+      final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) {
+    @SuppressWarnings("resource")
+    final Controller controller = drillbitContext.getController();
+    final InitializeFragments.Builder fb = InitializeFragments.newBuilder();
+    for(PlanFragment planFragment : fragments) {
+      fb.addFragment(planFragment);
     }
-    InitializeFragments initFrags = fb.build();
+    final InitializeFragments initFrags = fb.build();
 
     logger.debug("Sending remote fragments to node {} with data {}", assignment, initFrags);
-    FragmentSubmitListener listener = new FragmentSubmitListener(assignment, initFrags, latch);
+    final FragmentSubmitListener listener =
+        new FragmentSubmitListener(assignment, initFrags, latch, fragmentSubmitFailures);
     controller.getTunnel(assignment).sendFragments(listener, initFrags);
   }
 
-  public QueryState getState(){
+  public QueryState getState() {
     return state;
   }
 
-  private class FragmentSubmitListener extends EndpointListener<Ack, InitializeFragments>{
+  /**
+   * Used by {@link FragmentSubmitListener} to track the number of submission failures.
+   */
+  private static class FragmentSubmitFailures {
+    static class SubmissionException {
+//      final DrillbitEndpoint drillbitEndpoint;
+      final RpcException rpcException;
+
+      SubmissionException(@SuppressWarnings("unused") final DrillbitEndpoint drillbitEndpoint,
+          final RpcException rpcException) {
+//        this.drillbitEndpoint = drillbitEndpoint;
+        this.rpcException = rpcException;
+      }
+    }
+
+    final List<SubmissionException> submissionExceptions = new LinkedList<>();
+
+    void addFailure(final DrillbitEndpoint drillbitEndpoint, final RpcException rpcException) {
+      submissionExceptions.add(new SubmissionException(drillbitEndpoint, rpcException));
+    }
+  }
 
-    private CountDownLatch latch;
+  private class FragmentSubmitListener extends EndpointListener<Ack, InitializeFragments> {
+    private final CountDownLatch latch;
+    private final FragmentSubmitFailures fragmentSubmitFailures;
 
-    public FragmentSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch) {
+    /**
+     * Constructor.
+     *
+     * @param endpoint the endpoint for the submission
+     * @param value the initialize fragments message
+     * @param latch the latch to count down when the status is known; may be null
+     * @param fragmentSubmitFailures the counter to use for failures; must be non-null iff latch is non-null
+     */
+    public FragmentSubmitListener(final DrillbitEndpoint endpoint, final InitializeFragments value,
+        final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) {
       super(endpoint, value);
+      Preconditions.checkState((latch == null) == (fragmentSubmitFailures == null));
       this.latch = latch;
+      this.fragmentSubmitFailures = fragmentSubmitFailures;
     }
 
     @Override
-    public void success(Ack ack, ByteBuf byteBuf) {
+    public void success(final Ack ack, final ByteBuf byteBuf) {
       if (latch != null) {
         latch.countDown();
       }
     }
 
     @Override
-    public void failed(RpcException ex) {
-      logger.debug("Failure while sending fragment.  Stopping query.", ex);
-      moveToState(QueryState.FAILED, ex);
+    public void failed(final RpcException ex) {
+      if (latch != null) {
+        fragmentSubmitFailures.addFailure(endpoint, ex);
+        latch.countDown();
+      } else {
+        // since this won't be waited on, we can wait to deliver this event once the Foreman is ready
+        logger.debug("Failure while sending fragment.  Stopping query.", ex);
+        stateListener.moveToState(QueryState.FAILED, ex);
+      }
     }
-
   }
 
-
+  /**
+   * Provides gated access to state transitions.
+   *
+   * <p>The StateListener waits on a latch before delivering state transitions to the Foreman. The
+   * latch will be tripped when the Foreman is sufficiently set up that it can receive and process
+   * external events from other threads.
+   */
   public class StateListener {
-    public boolean moveToState(QueryState newState, Exception ex){
-      try {
-        acceptExternalEvents.await();
-      } catch(InterruptedException e){
-        logger.warn("Interrupted while waiting to move state.", e);
-        return false;
+    /**
+     * Move the Foreman to the specified new state.
+     *
+     * @param newState the state to move to
+     * @param ex if moving to a failure state, the exception that led to the failure; used for reporting
+     *   to the user
+     */
+    public void moveToState(final QueryState newState, final Exception ex) {
+      boolean ready = false;
+      while(!ready) {
+        try {
+          acceptExternalEvents.await();
+          ready = true;
+        } catch(InterruptedException e) {
+          // if we're still not ready, the while loop will cause us to wait again
+          logger.warn("Interrupted while waiting to move state.", e);
+        }
       }
 
-      return Foreman.this.moveToState(newState, ex);
+      Foreman.this.moveToState(newState, ex);
     }
   }
 
-
-  @Override
-  public int compareTo(Object o) {
-    return hashCode() - o.hashCode();
-  }
-
+  /**
+   * Listens for the status of the RPC response sent to the user for the query.
+   */
   private class ResponseSendListener extends BaseRpcOutcomeListener<Ack> {
     @Override
-    public void failed(RpcException ex) {
-      logger
-          .info(
-              "Failure while trying communicate query result to initating client.  This would happen if a client is disconnected before response notice can be sent.",
-              ex);
-      moveToState(QueryState.FAILED, ex);
+    public void failed(final RpcException ex) {
+      logger.info(
+          "Failure while trying to communicate query result to initiating client. This would happen if a client is disconnected before response notice can be sent.",
+          ex);
+      stateListener.moveToState(QueryState.FAILED, ex);
     }
   }
-
-
-  private class CancelListener extends EndpointListener<Ack, FragmentHandle>{
-
-    public CancelListener(DrillbitEndpoint endpoint, FragmentHandle handle) {
-      super(endpoint, handle);
-    }
-
-    @Override
-    public void failed(RpcException ex) {
-      logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", value, endpoint, ex);
-    }
-
-    @Override
-    public void success(Ack value, ByteBuf buf) {
-      if(!value.getOk()){
-        logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value);
-      }
-      // do nothing.
-    }
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
index 52fd0a9..433ab26 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
@@ -29,19 +29,21 @@ public class FragmentData {
   private volatile long lastStatusUpdate = 0;
   private final DrillbitEndpoint endpoint;
 
-  public FragmentData(FragmentHandle handle, DrillbitEndpoint endpoint, boolean isLocal) {
-    super();
-    MinorFragmentProfile f = MinorFragmentProfile.newBuilder() //
-        .setState(FragmentState.SENDING) //
-        .setMinorFragmentId(handle.getMinorFragmentId()) //
-        .setEndpoint(endpoint) //
-        .build();
-    this.status = FragmentStatus.newBuilder().setHandle(handle).setProfile(f).build();
+  public FragmentData(final FragmentHandle handle, final DrillbitEndpoint endpoint, final boolean isLocal) {
     this.endpoint = endpoint;
     this.isLocal = isLocal;
+    final MinorFragmentProfile f = MinorFragmentProfile.newBuilder()
+        .setState(FragmentState.SENDING)
+        .setMinorFragmentId(handle.getMinorFragmentId())
+        .setEndpoint(endpoint)
+        .build();
+    status = FragmentStatus.newBuilder()
+        .setHandle(handle)
+        .setProfile(f)
+        .build();
   }
 
-  public void setStatus(FragmentStatus status){
+  public void setStatus(final FragmentStatus status) {
     this.status = status;
     lastStatusUpdate = System.currentTimeMillis();
   }
@@ -54,15 +56,11 @@ public class FragmentData {
     return isLocal;
   }
 
-  public long getLastStatusUpdate() {
-    return lastStatusUpdate;
-  }
-
   public DrillbitEndpoint getEndpoint() {
     return endpoint;
   }
 
-  public FragmentHandle getHandle(){
+  public FragmentHandle getHandle() {
     return status.getHandle();
   }
 
@@ -71,7 +69,4 @@ public class FragmentData {
     return "FragmentData [isLocal=" + isLocal + ", status=" + status + ", lastStatusUpdate=" + lastStatusUpdate
         + ", endpoint=" + endpoint + "]";
   }
-
-
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
index 6a719d2..b2a40ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
@@ -20,7 +20,5 @@ package org.apache.drill.exec.work.foreman;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 
 public interface FragmentStatusListener {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatusListener.class);
-
   public void statusUpdate(FragmentStatus status);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2da618cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 2de3592..8626d5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -17,144 +17,353 @@
  */
 package org.apache.drill.exec.work.foreman;
 
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.SchemaUserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.RemoteRpcException;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.control.Controller;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.sys.PStore;
+import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.drill.exec.store.sys.PStoreProvider;
+import org.apache.drill.exec.work.EndpointListener;
 import org.apache.drill.exec.work.foreman.Foreman.StateListener;
 import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
+import org.apache.drill.exec.work.fragment.FragmentExecutor;
 
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
-
 /**
  * Each Foreman holds its own QueryManager.  This manages the events associated with execution of a particular query across all fragments.
  */
-public class QueryManager implements FragmentStatusListener, DrillbitStatusListener{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
-  private final Set<DrillbitEndpoint> includedBits;
+public class QueryManager implements FragmentStatusListener, DrillbitStatusListener {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
+
+  public static final PStoreConfig<QueryProfile> QUERY_PROFILE = PStoreConfig.
+          newProtoBuilder(SchemaUserBitShared.QueryProfile.WRITE, SchemaUserBitShared.QueryProfile.MERGE)
+      .name("profiles")
+      .blob()
+      .max(100)
+      .build();
 
-  private final QueryStatus status;
+  public static final PStoreConfig<QueryInfo> RUNNING_QUERY_INFO = PStoreConfig.
+          newProtoBuilder(SchemaUserBitShared.QueryInfo.WRITE, SchemaUserBitShared.QueryInfo.MERGE)
+      .name("running")
+      .ephemeral()
+      .build();
+
+  private final Set<DrillbitEndpoint> includedBits;
   private final StateListener stateListener;
-  private final AtomicInteger remainingFragmentCount;
   private final QueryId queryId;
+  private final String stringQueryId;
+  private final RunQuery runQuery;
+  private final Foreman foreman;
+
+  /*
+   * Doesn't need to be thread safe as fragmentDataMap is generated in a single thread and then
+   * accessed by multiple threads for reads only.
+   */
+  private final IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>> fragmentDataMap =
+      new IntObjectOpenHashMap<>();
+  private final List<FragmentData> fragmentDataSet = Lists.newArrayList();
+
+  private final PStore<QueryProfile> profilePStore;
+  private final PStore<QueryInfo> profileEStore;
 
-  public QueryManager(QueryId id, RunQuery query, PStoreProvider pStoreProvider, StateListener stateListener, Foreman foreman) {
+  // the following mutable variables are used to capture ongoing query status
+  private String planText;
+  private long startTime;
+  private long endTime;
+  private final AtomicInteger finishedFragments = new AtomicInteger(0);
+
+  public QueryManager(final QueryId queryId, final RunQuery runQuery, final PStoreProvider pStoreProvider,
+      final StateListener stateListener, final Foreman foreman) {
+    this.queryId =  queryId;
+    this.runQuery = runQuery;
     this.stateListener = stateListener;
-    this.queryId =  id;
-    this.remainingFragmentCount = new AtomicInteger(0);
-    this.status = new QueryStatus(query, id, pStoreProvider, foreman);
-    this.includedBits = Sets.newHashSet();
-  }
+    this.foreman = foreman;
+
+    stringQueryId = QueryIdHelper.getQueryId(queryId);
+    try {
+      profilePStore = pStoreProvider.getStore(QUERY_PROFILE);
+      profileEStore = pStoreProvider.getStore(RUNNING_QUERY_INFO);
+    } catch (IOException e) {
+      throw new DrillRuntimeException(e);
+    }
 
-  public QueryStatus getStatus(){
-    return status;
+    includedBits = Sets.newHashSet();
   }
 
   @Override
-  public void drillbitRegistered(Set<DrillbitEndpoint> registeredDrillbits) {
+  public void drillbitRegistered(final Set<DrillbitEndpoint> registeredDrillbits) {
   }
 
   @Override
-  public void drillbitUnregistered(Set<DrillbitEndpoint> unregisteredDrillbits) {
-    for(DrillbitEndpoint ep : unregisteredDrillbits){
-      if(this.includedBits.contains(ep)){
-        logger.warn("Drillbit {} no longer registered in cluster.  Canceling query {}", ep.getAddress() + ep.getControlPort(), QueryIdHelper.getQueryId(queryId));
-        this.stateListener.moveToState(QueryState.FAILED, new ForemanException("One more more nodes lost connectivity during query.  Identified node was " + ep.getAddress()));
+  public void drillbitUnregistered(final Set<DrillbitEndpoint> unregisteredDrillbits) {
+    for(DrillbitEndpoint ep : unregisteredDrillbits) {
+      if (includedBits.contains(ep)) {
+        logger.warn("Drillbit {} no longer registered in cluster.  Canceling query {}",
+            ep.getAddress() + ep.getControlPort(), QueryIdHelper.getQueryId(queryId));
+        stateListener.moveToState(QueryState.FAILED,
+            new ForemanException("One or more nodes lost connectivity during query.  Identified node was "
+                + ep.getAddress()));
       }
     }
   }
 
-
   @Override
-  public void statusUpdate(FragmentStatus status) {
-
-    logger.debug("New fragment status was provided to Foreman of {}", status);
-    switch(status.getProfile().getState()){
+  public void statusUpdate(final FragmentStatus status) {
+    logger.debug("New fragment status was provided to QueryManager of {}", status);
+    switch(status.getProfile().getState()) {
     case AWAITING_ALLOCATION:
+    case RUNNING:
       updateFragmentStatus(status);
       break;
+
+    case FINISHED:
+      fragmentDone(status);
+      break;
+
     case CANCELLED:
-      //TODO: define a new query state to distinguish the state of early termination from cancellation
+      /*
+       * TODO
+       * This doesn't seem right; shouldn't this be similar to FAILED?
+       * and this means once all are cancelled we'll get to COMPLETED, even though some weren't?
+       *
+       * So, we add it to the finishedFragments if we ourselves receive a statusUpdate (from where),
+       * but not if our cancellation listener gets it?
+       */
+      // TODO(DRILL-2370) we might not get these, so we need to take extra care for cleanup
       fragmentDone(status);
       break;
+
     case FAILED:
       stateListener.moveToState(QueryState.FAILED, new RemoteRpcException(status.getProfile().getError()));
       break;
-    case FINISHED:
-      fragmentDone(status);
-      break;
-    case RUNNING:
-      updateFragmentStatus(status);
-      break;
+
     default:
       throw new UnsupportedOperationException(String.format("Received status of %s", status));
     }
   }
 
-  private void updateFragmentStatus(FragmentStatus status){
-    this.status.updateFragmentStatus(status);
+  private void updateFragmentStatus(final FragmentStatus fragmentStatus) {
+    final FragmentHandle fragmentHandle = fragmentStatus.getHandle();
+    final int majorFragmentId = fragmentHandle.getMajorFragmentId();
+    final int minorFragmentId = fragmentHandle.getMinorFragmentId();
+    fragmentDataMap.get(majorFragmentId).get(minorFragmentId).setStatus(fragmentStatus);
   }
 
-  private void fragmentDone(FragmentStatus status){
-    this.status.incrementFinishedFragments();
-    int remaining = remainingFragmentCount.decrementAndGet();
+  private void fragmentDone(final FragmentStatus status) {
     updateFragmentStatus(status);
+
+    final int finishedFragments = this.finishedFragments.incrementAndGet();
+    final int totalFragments = fragmentDataSet.size();
+    assert finishedFragments <= totalFragments : "The finished fragment count exceeds the total fragment count";
+    final int remaining = totalFragments - finishedFragments;
     logger.debug("waiting for {} fragments", remaining);
-    if(remaining == 0){
+    if (remaining == 0) {
+      // this target state may be adjusted in moveToState() based on current FAILURE/CANCELLATION_REQUESTED status
       stateListener.moveToState(QueryState.COMPLETED, null);
     }
   }
 
-  public void setup(FragmentHandle rootFragmentHandle, DrillbitEndpoint localIdentity, int countOfNonRootFragments){
-    remainingFragmentCount.set(countOfNonRootFragments + 1);
-    logger.debug("foreman is waiting for {} fragments to finish", countOfNonRootFragments + 1);
-    status.add(new FragmentData(rootFragmentHandle, localIdentity, true));
-    this.status.setTotalFragments(countOfNonRootFragments + 1);
+  private void addFragment(final FragmentData fragmentData) {
+    final FragmentHandle fragmentHandle = fragmentData.getHandle();
+    final int majorFragmentId = fragmentHandle.getMajorFragmentId();
+    final int minorFragmentId = fragmentHandle.getMinorFragmentId();
 
-    List<FragmentData> fragments = status.getFragmentData();
-    for (FragmentData fragment : fragments) {
-      this.includedBits.add(fragment.getEndpoint());
+    IntObjectOpenHashMap<FragmentData> minorMap = fragmentDataMap.get(majorFragmentId);
+    if (minorMap == null) {
+      minorMap = new IntObjectOpenHashMap<>();
+      fragmentDataMap.put(majorFragmentId, minorMap);
     }
+    minorMap.put(minorFragmentId, fragmentData);
+    fragmentDataSet.add(fragmentData);
+
+    // keep track of all the drill bits that are used by this query
+    includedBits.add(fragmentData.getEndpoint());
+  }
+
+  public String getFragmentStatesAsString() {
+    return fragmentDataMap.toString();
   }
 
-  public void addFragmentStatusTracker(PlanFragment fragment, boolean isRoot){
-    addFragmentStatusTracker(fragment.getHandle(), fragment.getAssignment(), isRoot);
+  void addFragmentStatusTracker(final PlanFragment fragment, final boolean isRoot) {
+    addFragment(new FragmentData(fragment.getHandle(), fragment.getAssignment(), isRoot));
   }
 
-  public void addFragmentStatusTracker(FragmentHandle handle, DrillbitEndpoint node, boolean isRoot){
-    status.add(new FragmentData(handle, node, isRoot));
+  /**
+   * Stop all fragments with a currently active status.
+   */
+  void cancelExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
+    final Controller controller = drillbitContext.getController();
+    for(FragmentData data : fragmentDataSet) {
+      final FragmentStatus fragmentStatus = data.getStatus();
+      switch(fragmentStatus.getProfile().getState()) {
+      case SENDING:
+      case AWAITING_ALLOCATION:
+      case RUNNING:
+        if (rootRunner != null) {
+            rootRunner.cancel();
+        } else {
+          final DrillbitEndpoint endpoint = data.getEndpoint();
+          final FragmentHandle handle = fragmentStatus.getHandle();
+          // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
+          controller.getTunnel(endpoint).cancelFragment(new CancelListener(endpoint, handle), handle);
+        }
+        break;
+
+      case FINISHED:
+      case CANCELLED:
+      case FAILED:
+        // nothing to do
+        break;
+      }
+    }
   }
 
-  public RootStatusReporter getRootStatusHandler(FragmentContext context, PlanFragment fragment){
-    return new RootStatusReporter(context, fragment);
+  /*
+   * This assumes that the FragmentStatusListener implementation takes action when it hears
+   * that the target fragment has been cancelled. As a result, this listener doesn't do anything
+   * but log messages.
+   */
+  private class CancelListener extends EndpointListener<Ack, FragmentHandle> {
+    public CancelListener(final DrillbitEndpoint endpoint, final FragmentHandle handle) {
+      super(endpoint, handle);
+    }
+
+    @Override
+    public void failed(final RpcException ex) {
+      logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", value, endpoint, ex);
+    }
+
+    @Override
+    public void success(final Ack value, final ByteBuf buf) {
+      if (!value.getOk()) {
+        logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value);
+      }
+    }
   }
 
-  class RootStatusReporter extends AbstractStatusReporter{
+  public RootStatusReporter getRootStatusHandler(final FragmentContext context) {
+    return new RootStatusReporter(context);
+  }
 
-    private RootStatusReporter(FragmentContext context, PlanFragment fragment){
+  class RootStatusReporter extends AbstractStatusReporter {
+    private RootStatusReporter(final FragmentContext context) {
       super(context);
     }
 
     @Override
-    protected void statusChange(FragmentHandle handle, FragmentStatus status) {
+    protected void statusChange(final FragmentHandle handle, final FragmentStatus status) {
       statusUpdate(status);
     }
+  }
+
+  QueryState updateQueryStateInStore(final QueryState queryState) {
+    switch (queryState) {
+      case PENDING:
+      case RUNNING:
+      case CANCELLATION_REQUESTED:
+        profileEStore.put(stringQueryId, getQueryInfo());  // store as ephemeral query profile.
+        break;
+
+      case COMPLETED:
+      case CANCELED:
+      case FAILED:
+        try {
+          profileEStore.delete(stringQueryId);
+        } catch(Exception e) {
+          logger.warn("Failure while trying to delete the estore profile for this query.", e);
+        }
+
+        // TODO(DRILL-2362) when do these ever get deleted?
+        profilePStore.put(stringQueryId, getQueryProfile());
+        break;
+
+      default:
+        throw new IllegalStateException("unrecognized queryState " + queryState);
+    }
+
+    return queryState;
+  }
+
+  private QueryInfo getQueryInfo() {
+    return QueryInfo.newBuilder()
+      .setQuery(runQuery.getPlan())
+      .setState(foreman.getState())
+      .setForeman(foreman.getQueryContext().getCurrentEndpoint())
+      .setStart(startTime)
+      .build();
+  }
+
+  public QueryProfile getQueryProfile() {
+    final QueryProfile.Builder profileBuilder = QueryProfile.newBuilder()
+        .setQuery(runQuery.getPlan())
+        .setType(runQuery.getType())
+        .setId(queryId)
+        .setState(foreman.getState())
+        .setForeman(foreman.getQueryContext().getCurrentEndpoint())
+        .setStart(startTime)
+        .setEnd(endTime)
+        .setTotalFragments(fragmentDataSet.size())
+        .setFinishedFragments(finishedFragments.get());
+
+    if (planText != null) {
+      profileBuilder.setPlan(planText);
+    }
+
+    for (int i = 0; i < fragmentDataMap.allocated.length; i++) {
+      if (fragmentDataMap.allocated[i]) {
+        final int majorFragmentId = fragmentDataMap.keys[i];
+        final IntObjectOpenHashMap<FragmentData> minorMap =
+            (IntObjectOpenHashMap<FragmentData>) ((Object[]) fragmentDataMap.values)[i];
+        final MajorFragmentProfile.Builder fb = MajorFragmentProfile.newBuilder()
+            .setMajorFragmentId(majorFragmentId);
+        for (int v = 0; v < minorMap.allocated.length; v++) {
+          if (minorMap.allocated[v]) {
+            final FragmentData data = (FragmentData) ((Object[]) minorMap.values)[v];
+            fb.addMinorFragmentProfile(data.getStatus().getProfile());
+          }
+        }
+        profileBuilder.addFragmentProfile(fb);
+      }
+    }
 
+    return profileBuilder.build();
+  }
+
+  void setPlanText(final String planText) {
+    this.planText = planText;
+  }
 
+  void markStartTime() {
+    startTime = System.currentTimeMillis();
   }
 
+  void markEndTime() {
+    endTime = System.currentTimeMillis();
+  }
 }