Posted to commits@drill.apache.org by ar...@apache.org on 2018/01/02 14:50:16 UTC

[4/7] drill git commit: DRILL-5963: Query state process improvements

DRILL-5963: Query state process improvements

1. Added two new query states: PREPARING (when the Foreman is initialized) and PLANNING (includes logical and/or physical planning).
2. Added the ability to cancel a query during the PLANNING and ENQUEUED states (see the state-lifecycle sketch below).
3. Logic for submitting fragments was moved from Foreman into the new FragmentsRunner class.
4. Logic for moving the query to a new state and incrementing / decrementing query counters was moved into the new QueryStateProcessor class.
5. The MajorType in DrillFuncHolderExpr is now cached for better performance.

closes #1051
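
For reference, the state sequence these changes introduce, and where cancellation can now intercept it, can be sketched as follows. This is an illustrative summary derived from the commit description and the Foreman diff below, not code from the patch; the actual transition validation lives in the new QueryStateProcessor class, and the helper method name here is hypothetical.

import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;

import java.util.Arrays;
import java.util.List;

public class QueryStateFlowSketch {

  // Happy-path order of states for a successfully completed query.
  static final List<QueryState> HAPPY_PATH = Arrays.asList(
      QueryState.PREPARING,   // Foreman is being initialized
      QueryState.PLANNING,    // logical and/or physical planning
      QueryState.ENQUEUED,    // waiting for queue admission
      QueryState.STARTING,    // fragments are being sent out
      QueryState.RUNNING,
      QueryState.COMPLETED);

  // States from which a cancel request is honored after this change
  // (hypothetical helper for illustration; CANCELLATION_REQUESTED then
  // drives the query to CANCELED once cleanup completes).
  static boolean isCancellable(QueryState state) {
    switch (state) {
      case PLANNING:
      case ENQUEUED:
      case STARTING:
      case RUNNING:
        return true;
      default:
        return false;
    }
  }
}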


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/03435183
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/03435183
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/03435183

Branch: refs/heads/master
Commit: 034351837885d54196f3dd550ed2dc4e50fc4128
Parents: adee461
Author: Arina Ielchiieva <ar...@gmail.com>
Authored: Fri Nov 24 17:59:42 2017 +0200
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Tue Jan 2 15:43:29 2018 +0200

----------------------------------------------------------------------
 .../drill/exec/expr/DrillFuncHolderExpr.java    |   9 +-
 .../exec/server/rest/profile/ProfileUtil.java   |  41 +-
 .../server/rest/profile/ProfileWrapper.java     |   6 +-
 .../apache/drill/exec/work/QueryWorkUnit.java   |  45 +
 .../apache/drill/exec/work/foreman/Foreman.java | 887 ++++---------------
 .../exec/work/foreman/FragmentsRunner.java      | 410 +++++++++
 .../drill/exec/work/foreman/QueryManager.java   |  20 +-
 .../exec/work/foreman/QueryStateProcessor.java  | 355 ++++++++
 .../src/main/resources/drill-module.conf        |   2 +-
 .../exec/server/TestDrillbitResilience.java     |   3 +-
 .../apache/drill/exec/proto/UserBitShared.java  | 201 +++--
 .../drill/exec/proto/beans/QueryResult.java     |   6 +-
 protocol/src/main/protobuf/UserBitShared.proto  |   2 +
 13 files changed, 1166 insertions(+), 821 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java
index 90368c4..1b0a6eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/DrillFuncHolderExpr.java
@@ -27,18 +27,21 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.expr.fn.DrillFuncHolder;
 
 public class DrillFuncHolderExpr extends FunctionHolderExpression implements Iterable<LogicalExpression>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFuncHolderExpr.class);
-  private DrillFuncHolder holder;
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFuncHolderExpr.class);
+  private final DrillFuncHolder holder;
+  private final MajorType majorType;
   private DrillSimpleFunc interpreter;
 
   public DrillFuncHolderExpr(String nameUsed, DrillFuncHolder holder, List<LogicalExpression> args, ExpressionPosition pos) {
     super(nameUsed, pos, args);
     this.holder = holder;
+    // since function return type can not be changed, cache it for better performance
+    this.majorType = holder.getReturnType(args);
   }
 
   @Override
   public MajorType getMajorType() {
-    return holder.getReturnType(args);
+    return majorType;
   }
 
   @Override

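The change above is a simple cache-in-constructor pattern: because a function's return type cannot change after the expression is built, it is computed once and every later getMajorType() call becomes a field read. A minimal standalone sketch of the same pattern, using placeholder names (Type, ReturnTypeCalculator) that are not Drill APIs:

final class CachedReturnType {

  interface Type {}
  interface ReturnTypeCalculator { Type compute(); }

  private final Type cachedType;

  CachedReturnType(ReturnTypeCalculator calculator) {
    // The result cannot change for the lifetime of this object,
    // so compute it once instead of on every accessor call.
    this.cachedType = calculator.compute();
  }

  Type getMajorType() {
    return cachedType; // cheap field read on the hot path
  }
}
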
http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileUtil.java
index cfc7977..a0c0ea7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileUtil.java
@@ -19,30 +19,35 @@ package org.apache.drill.exec.server.rest.profile;
 
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class ProfileUtil {
-  // Display names for QueryState enum in UserBitShared.proto
-  private static final String[] queryStateDisplayNames = {
-    "Starting", // STARTING = 0
-    "Running", // RUNNING = 1
-    "Succeeded", // COMPLETED = 2
-    "Canceled", // CANCELED = 3
-    "Failed", // FAILED = 4
-    "CancellationRequested", // CANCELLATION_REQUESTED = 5
-    "Enqueued" // ENQUEUED = 6
-  };
 
+  private static final Map<QueryState, String> queryStateDisplayMap = new HashMap<>(QueryState.values().length);
+
+  static {
+    queryStateDisplayMap.put(QueryState.PREPARING, "Preparing");
+    queryStateDisplayMap.put(QueryState.PLANNING, "Planning");
+    queryStateDisplayMap.put(QueryState.ENQUEUED, "Enqueued");
+    queryStateDisplayMap.put(QueryState.STARTING, "Starting");
+    queryStateDisplayMap.put(QueryState.RUNNING, "Running");
+    queryStateDisplayMap.put(QueryState.COMPLETED, "Succeeded");
+    queryStateDisplayMap.put(QueryState.FAILED, "Failed");
+    queryStateDisplayMap.put(QueryState.CANCELLATION_REQUESTED, "Cancellation Requested");
+    queryStateDisplayMap.put(QueryState.CANCELED, "Canceled");
+  }
 
   /**
-   * Utility to return display name for query state
-   * @param queryState
+   * Utility method to return display name for query state
+   * @param queryState query state
    * @return display string for query state
    */
-  public final static String getQueryStateDisplayName(QueryState queryState) {
-    int queryStateOrdinal = queryState.getNumber();
-    if (queryStateOrdinal >= queryStateDisplayNames.length) {
-      return queryState.name();
-    } else {
-      return queryStateDisplayNames[queryStateOrdinal];
+  public static String getQueryStateDisplayName(QueryState queryState) {
+    String displayName = queryStateDisplayMap.get(queryState);
+    if (displayName == null) {
+      displayName = queryState.name();
     }
+    return displayName;
   }
 }

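A quick usage sketch of the reworked lookup above. The fallback to the raw enum name means a state that is missing from the map still renders, so future additions to the protobuf enum cannot break the profile page. The main method below is illustrative, not a test from the patch:

import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.server.rest.profile.ProfileUtil;

public class ProfileUtilUsageSketch {
  public static void main(String[] args) {
    // Known states map to friendly display names ...
    System.out.println(ProfileUtil.getQueryStateDisplayName(QueryState.COMPLETED));              // Succeeded
    System.out.println(ProfileUtil.getQueryStateDisplayName(QueryState.CANCELLATION_REQUESTED)); // Cancellation Requested
    // ... including the two states introduced by this commit.
    System.out.println(ProfileUtil.getQueryStateDisplayName(QueryState.PREPARING));              // Preparing
    System.out.println(ProfileUtil.getQueryStateDisplayName(QueryState.PLANNING));               // Planning
  }
}
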
http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
index 9c2b438..20cc0ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
@@ -202,8 +202,10 @@ public class ProfileWrapper {
   }
 
   public String getExecutionDuration() {
-    //Check if State is STARTING or RUNNING
-    if (profile.getState() == QueryState.STARTING ||
+    //Check if State is PREPARING, PLANNING, STARTING, ENQUEUED or RUNNING
+    if (profile.getState() == QueryState.PREPARING ||
+        profile.getState() == QueryState.PLANNING ||
+        profile.getState() == QueryState.STARTING ||
         profile.getState() == QueryState.ENQUEUED ||
         profile.getState() == QueryState.RUNNING) {
       return NOT_AVAILABLE_LABEL;

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
index a06d46c..2fa7576 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
@@ -23,11 +23,14 @@ import java.util.List;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Preconditions;
+import org.codehaus.jackson.map.ObjectMapper;
 
 public class QueryWorkUnit {
 
@@ -112,4 +115,46 @@ public class QueryWorkUnit {
       fragments.add(defn.applyPlan(reader));
     }
   }
+
+  /**
+   * Converts the list of stored fragments into their string representation;
+   * if a fragment's JSON cannot be pretty-printed, a placeholder indicating malformed JSON is used instead.
+   * Used for debugging purposes.
+   *
+   * @return fragments information
+   */
+  public String stringifyFragments() {
+    StringBuilder stringBuilder = new StringBuilder();
+    final int fragmentCount = fragments.size();
+    int fragmentIndex = 0;
+    for (final PlanFragment planFragment : fragments) {
+      final ExecProtos.FragmentHandle fragmentHandle = planFragment.getHandle();
+      stringBuilder.append("PlanFragment(");
+      stringBuilder.append(++fragmentIndex);
+      stringBuilder.append('/');
+      stringBuilder.append(fragmentCount);
+      stringBuilder.append(") major_fragment_id ");
+      stringBuilder.append(fragmentHandle.getMajorFragmentId());
+      stringBuilder.append(" minor_fragment_id ");
+      stringBuilder.append(fragmentHandle.getMinorFragmentId());
+      stringBuilder.append('\n');
+
+      final CoordinationProtos.DrillbitEndpoint endpointAssignment = planFragment.getAssignment();
+      stringBuilder.append("  DrillbitEndpoint address ");
+      stringBuilder.append(endpointAssignment.getAddress());
+      stringBuilder.append('\n');
+
+      String jsonString = "<<malformed JSON>>";
+      stringBuilder.append("  fragment_json: ");
+      final ObjectMapper objectMapper = new ObjectMapper();
+      try {
+        final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class);
+        jsonString = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
+      } catch (final Exception e) {
+        // we've already set jsonString to a fallback value
+      }
+      stringBuilder.append(jsonString);
+    }
+    return stringBuilder.toString();
+  }
 }

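The helper above is consumed by Foreman.logWorkUnit() (see the Foreman diff further down), which now reduces to a guarded one-liner. A small sketch of that usage, with invented values in the sample-output comment:

import org.apache.drill.exec.work.QueryWorkUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class WorkUnitLoggingSketch {
  private static final Logger logger = LoggerFactory.getLogger(WorkUnitLoggingSketch.class);

  // Build the (potentially large) string only when trace logging is enabled.
  // The output looks roughly like:
  //   PlanFragment(1/2) major_fragment_id 0 minor_fragment_id 0
  //     DrillbitEndpoint address node1.example.com
  //     fragment_json: { ...pretty-printed fragment JSON... }
  static void logWorkUnit(String queryIdString, QueryWorkUnit workUnit) {
    if (logger.isTraceEnabled()) {
      logger.trace(String.format("PlanFragments for query %s%n%s", queryIdString, workUnit.stringifyFragments()));
    }
  }
}
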
http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 8ce8639..391f100 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -17,18 +17,13 @@
  */
 package org.apache.drill.exec.work.foreman;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.drill.common.CatastrophicFailure;
-import org.apache.drill.common.EventProcessor;
-import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.LogicalPlan;
@@ -36,8 +31,6 @@ import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OptimizerException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.metrics.DrillMetrics;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.opt.BasicOptimizer;
 import org.apache.drill.exec.physical.PhysicalPlan;
@@ -48,9 +41,7 @@ import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
 import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
 import org.apache.drill.exec.planner.sql.DirectPlan;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
-import org.apache.drill.exec.proto.BitControl.InitializeFragments;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.ServerPreparedStatementState;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -63,36 +54,21 @@ import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.UserClientConnection;
-import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.util.Pointer;
-import org.apache.drill.exec.work.EndpointListener;
 import org.apache.drill.exec.work.QueryWorkUnit;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
 import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
 import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
-import org.apache.drill.exec.work.fragment.FragmentExecutor;
-import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
-import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
-import org.apache.drill.exec.work.fragment.RootFragmentManager;
 import org.codehaus.jackson.map.ObjectMapper;
 
-import com.codahale.metrics.Counter;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
 
 /**
  * Foreman manages all the fragments (local and remote) for a single query where this
@@ -100,16 +76,17 @@ import io.netty.util.concurrent.GenericFutureListener;
  *
  * The flow is as follows:
  * <ul>
+ * <li>While the Foreman is being initialized, the query is in the PREPARING state.</li>
  * <li>Foreman is submitted as a runnable.</li>
  * <li>Runnable does query planning.</li>
- * <li>state changes from PENDING to RUNNING</li>
- * <li>Runnable sends out starting fragments</li>
+ * <li>Runnable submits query to be enqueued.</li>
+ * <li>The Runnable's run() completes, but the Foreman stays around to listen to state changes.</li>
+ * <li>Once the query is enqueued, starting fragments are sent out.</li>
  * <li>Status listener are activated</li>
- * <li>The Runnable's run() completes, but the Foreman stays around</li>
  * <li>Foreman listens for state change messages.</li>
- * <li>state change messages can drive the state to FAILED or CANCELED, in which case
- *   messages are sent to running fragments to terminate</li>
- * <li>when all fragments complete, state change messages drive the state to COMPLETED</li>
+ * <li>State change messages can drive the state to FAILED or CANCELED, in which case
+ *   messages are sent to running fragments to terminate.</li>
+ * <li>When all fragments are completed, state change messages drive the state to COMPLETED.</li>
  * </ul>
  */
 
@@ -118,90 +95,81 @@ public class Foreman implements Runnable {
   private static final org.slf4j.Logger queryLogger = org.slf4j.LoggerFactory.getLogger("query.logger");
   private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(Foreman.class);
 
-  public enum ProfileOption { SYNC, ASYNC, NONE };
+  public enum ProfileOption { SYNC, ASYNC, NONE }
 
   private static final ObjectMapper MAPPER = new ObjectMapper();
-  private static final long RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000;
-
-  private static final Counter enqueuedQueries = DrillMetrics.getRegistry().counter("drill.queries.enqueued");
-  private static final Counter runningQueries = DrillMetrics.getRegistry().counter("drill.queries.running");
-  private static final Counter completedQueries = DrillMetrics.getRegistry().counter("drill.queries.completed");
-  private static final Counter succeededQueries = DrillMetrics.getRegistry().counter("drill.queries.succeeded");
-  private static final Counter failedQueries = DrillMetrics.getRegistry().counter("drill.queries.failed");
-  private static final Counter canceledQueries = DrillMetrics.getRegistry().counter("drill.queries.canceled");
 
   private final QueryId queryId;
   private final String queryIdString;
   private final RunQuery queryRequest;
   private final QueryContext queryContext;
   private final QueryManager queryManager; // handles lower-level details of query execution
-  private final WorkerBee bee; // provides an interface to submit tasks
   private final DrillbitContext drillbitContext;
   private final UserClientConnection initiatingClient; // used to send responses
-  private volatile QueryState state;
   private boolean resume = false;
   private final ProfileOption profileOption;
 
   private final QueryResourceManager queryRM;
 
   private final ResponseSendListener responseListener = new ResponseSendListener();
-  private final StateSwitch stateSwitch = new StateSwitch();
-  private final ForemanResult foremanResult = new ForemanResult();
   private final ConnectionClosedListener closeListener = new ConnectionClosedListener();
   private final ChannelFuture closeFuture;
+  private final FragmentsRunner fragmentsRunner;
+  private final QueryStateProcessor queryStateProcessor;
 
   private String queryText;
 
   /**
    * Constructor. Sets up the Foreman, but does not initiate any execution.
    *
-   * @param bee used to submit additional work
-   * @param drillbitContext
-   * @param connection
+   * @param bee work manager (runs fragments)
+   * @param drillbitContext drillbit context
+   * @param connection connection
    * @param queryId the id for the query
    * @param queryRequest the query to execute
    */
   public Foreman(final WorkerBee bee, final DrillbitContext drillbitContext,
       final UserClientConnection connection, final QueryId queryId, final RunQuery queryRequest) {
-    this.bee = bee;
     this.queryId = queryId;
-    queryIdString = QueryIdHelper.getQueryId(queryId);
+    this.queryIdString = QueryIdHelper.getQueryId(queryId);
     this.queryRequest = queryRequest;
     this.drillbitContext = drillbitContext;
-
-    initiatingClient = connection;
-    closeFuture = initiatingClient.getChannelClosureFuture();
+    this.initiatingClient = connection;
+    this.closeFuture = initiatingClient.getChannelClosureFuture();
     closeFuture.addListener(closeListener);
 
-    queryContext = new QueryContext(connection.getSession(), drillbitContext, queryId);
-    queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(),
+    this.queryContext = new QueryContext(connection.getSession(), drillbitContext, queryId);
+    this.queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(),
         drillbitContext.getClusterCoordinator(), this);
+    this.queryRM = drillbitContext.getResourceManager().newQueryRM(this);
+    this.fragmentsRunner = new FragmentsRunner(bee, initiatingClient, drillbitContext, this);
+    this.queryStateProcessor = new QueryStateProcessor(queryIdString, queryManager, drillbitContext, new ForemanResult());
+    this.profileOption = setProfileOption(queryContext.getOptions());
+  }
 
-    recordNewState(QueryState.ENQUEUED);
-    enqueuedQueries.inc();
-    queryRM = drillbitContext.getResourceManager().newQueryRM(this);
 
-    profileOption = setProfileOption(queryContext.getOptions());
+  /**
+   * @return query id
+   */
+  public QueryId getQueryId() {
+    return queryId;
   }
 
-  private ProfileOption setProfileOption(OptionManager options) {
-    if (! options.getOption(ExecConstants.ENABLE_QUERY_PROFILE_VALIDATOR)) {
-      return ProfileOption.NONE;
-    }
-    if (options.getOption(ExecConstants.QUERY_PROFILE_DEBUG_VALIDATOR)) {
-      return ProfileOption.SYNC;
-    } else {
-      return ProfileOption.ASYNC;
-    }
+  /**
+   * @return current query state
+   */
+  public QueryState getState() {
+    return queryStateProcessor.getState();
   }
 
-  private class ConnectionClosedListener implements GenericFutureListener<Future<Void>> {
-    @Override
-    public void operationComplete(Future<Void> future) throws Exception {
-      cancel();
-    }
+  /**
+   * @return sql query text of the query request
+   */
+  public String getQueryText() {
+    return queryText;
   }
 
+
   /**
    * Get the QueryContext created for the query.
    *
@@ -221,12 +189,21 @@ public class Foreman implements Runnable {
   }
 
   /**
-   * Cancel the query. Asynchronous -- it may take some time for all remote fragments to be
-   * terminated.
+   * Cancel the query (move the query into the CANCELLATION_REQUESTED state).
+   * Query execution will be canceled as soon as possible.
    */
   public void cancel() {
-    // Note this can be called from outside of run() on another thread, or after run() completes
-    addToEventQueue(QueryState.CANCELLATION_REQUESTED, null);
+    queryStateProcessor.cancel();
+  }
+
+  /**
+   * Adds the new query state to the event queue; it will be processed once the Foreman is ready.
+   *
+   * @param state new query state
+   * @param exception exception if failure has occurred
+   */
+  public void addToEventQueue(QueryState state, Exception exception) {
+    queryStateProcessor.addToEventQueue(state, exception);
   }
 
   /**
@@ -255,23 +232,21 @@ public class Foreman implements Runnable {
     currentThread.setName(queryIdString + ":foreman");
     try {
       /*
-       Check if the foreman is ONLINE. If not dont accept any new queries.
+       Check if the foreman is ONLINE. If not don't accept any new queries.
       */
       if (!drillbitContext.isForemanOnline()) {
         throw new ForemanException("Query submission failed since Foreman is shutting down.");
       }
     } catch (ForemanException e) {
       logger.debug("Failure while submitting query", e);
-      addToEventQueue(QueryState.FAILED, e);
+      queryStateProcessor.addToEventQueue(QueryState.FAILED, e);
     }
-    // track how long the query takes
-    queryManager.markStartTime();
-    enqueuedQueries.dec();
-    runningQueries.inc();
+
+    queryText = queryRequest.getPlan();
+    queryStateProcessor.moveToState(QueryState.PLANNING, null);
 
     try {
       injector.injectChecked(queryContext.getExecutionControls(), "run-try-beginning", ForemanException.class);
-      queryText = queryRequest.getPlan();
 
       // convert a run query request into action
       switch (queryRequest.getType()) {
@@ -299,18 +274,15 @@ public class Foreman implements Runnable {
       }
       injector.injectChecked(queryContext.getExecutionControls(), "run-try-end", ForemanException.class);
     } catch (final OutOfMemoryException e) {
-      moveToState(QueryState.FAILED, UserException.memoryError(e).build(logger));
+      queryStateProcessor.moveToState(QueryState.FAILED, UserException.memoryError(e).build(logger));
     } catch (final ForemanException e) {
-      moveToState(QueryState.FAILED, e);
+      queryStateProcessor.moveToState(QueryState.FAILED, e);
     } catch (AssertionError | Exception ex) {
-      moveToState(QueryState.FAILED,
+      queryStateProcessor.moveToState(QueryState.FAILED,
           new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex));
     } catch (final OutOfMemoryError e) {
       if ("Direct buffer memory".equals(e.getMessage())) {
-        moveToState(QueryState.FAILED,
-            UserException.resourceError(e)
-                .message("One or more nodes ran out of memory while executing the query.")
-                .build(logger));
+        queryStateProcessor.moveToState(QueryState.FAILED, UserException.resourceError(e).message("One or more nodes ran out of memory while executing the query.").build(logger));
       } else {
         /*
          * FragmentExecutors use a DrillbitStatusListener to watch out for the death of their query's Foreman. So, if we
@@ -321,35 +293,6 @@ public class Foreman implements Runnable {
       }
 
     } finally {
-      /*
-       * Begin accepting external events.
-       *
-       * Doing this here in the finally clause will guarantee that it occurs. Otherwise, if there
-       * is an exception anywhere during setup, it wouldn't occur, and any events that are generated
-       * as a result of any partial setup that was done (such as the FragmentSubmitListener,
-       * the ResponseSendListener, or an external call to cancel()), will hang the thread that makes the
-       * event delivery call.
-       *
-       * If we do throw an exception during setup, and have already moved to QueryState.FAILED, we just need to
-       * make sure that we can't make things any worse as those events are delivered, but allow
-       * any necessary remaining cleanup to proceed.
-       *
-       * Note that cancellations cannot be simulated before this point, i.e. pauses can be injected, because Foreman
-       * would wait on the cancelling thread to signal a resume and the cancelling thread would wait on the Foreman
-       * to accept events.
-       */
-      try {
-        stateSwitch.start();
-      } catch (Exception ex) {
-        moveToState(QueryState.FAILED, ex);
-      }
-
-      // If we received the resume signal before fragments are setup, the first call does not actually resume the
-      // fragments. Since setup is done, all fragments must have been delivered to remote nodes. Now we can resume.
-      if(resume) {
-        resume();
-      }
-
       // restore the thread's original name
       currentThread.setName(originalName);
     }
@@ -361,6 +304,31 @@ public class Foreman implements Runnable {
      */
   }
 
+  /**
+   * While some fragments were being sent out, others might have already completed. We don't want to process completed / failed
+   * events until all fragments are sent out. This method triggers event processing once all fragments have been sent out.
+   */
+  public void startProcessingEvents() {
+    queryStateProcessor.startProcessingEvents();
+
+    // If we received the resume signal before fragments are setup, the first call does not actually resume the
+    // fragments. Since setup is done, all fragments must have been delivered to remote nodes. Now we can resume.
+    if (resume) {
+      resume();
+    }
+  }
+
+  private ProfileOption setProfileOption(OptionManager options) {
+    if (! options.getOption(ExecConstants.ENABLE_QUERY_PROFILE_VALIDATOR)) {
+      return ProfileOption.NONE;
+    }
+    if (options.getOption(ExecConstants.QUERY_PROFILE_DEBUG_VALIDATOR)) {
+      return ProfileOption.SYNC;
+    } else {
+      return ProfileOption.ASYNC;
+    }
+  }
+
   private void parseAndRunLogicalPlan(final String json) throws ExecutionSetupException {
     LogicalPlan logicalPlan;
     try {
@@ -436,48 +404,16 @@ public class Foreman implements Runnable {
     queryManager.setTotalCost(plan.totalCost());
     work.applyPlan(drillbitContext.getPlanReader());
     logWorkUnit(work);
-    admit(work);
-    queryManager.setQueueName(queryRM.queueName());
-
-    final List<PlanFragment> planFragments = work.getFragments();
-    final PlanFragment rootPlanFragment = work.getRootFragment();
-    assert queryId == rootPlanFragment.getHandle().getQueryId();
 
-    drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener());
-    drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
+    fragmentsRunner.setFragmentsInfo(work.getFragments(), work.getRootFragment(), work.getRootOperator());
 
-    logger.debug("Submitting fragments to run.");
-
-    // set up the root fragment first so we'll have incoming buffers available.
-    setupRootFragment(rootPlanFragment, work.getRootOperator());
-
-    setupNonRootFragments(planFragments);
-
-    moveToState(QueryState.RUNNING, null);
-    logger.debug("Fragments running.");
-  }
-
-  private void admit(QueryWorkUnit work) throws ForemanSetupException {
-    queryManager.markPlanningEndTime();
-    try {
-      queryRM.admit();
-    } catch (QueueTimeoutException e) {
-      throw UserException
-          .resourceError()
-          .message(e.getMessage())
-          .build(logger);
-    } catch (QueryQueueException e) {
-      throw new ForemanSetupException(e.getMessage(), e);
-    } finally {
-      queryManager.markQueueWaitEndTime();
-    }
-    moveToState(QueryState.STARTING, null);
+    startQueryProcessing();
   }
 
   /**
    * This is a helper method to run query based on the list of PlanFragment that were planned
    * at some point of time
-   * @param fragmentsList
+   * @param fragmentsList fragment list
    * @throws ExecutionSetupException
    */
   private void runFragment(List<PlanFragment> fragmentsList) throws ExecutionSetupException {
@@ -502,6 +438,8 @@ public class Foreman implements Runnable {
       }
     }
 
+    assert rootFragment != null;
+
     final FragmentRoot rootOperator;
     try {
       rootOperator = drillbitContext.getPlanReader().readFragmentRoot(rootFragment.getFragmentJson());
@@ -509,26 +447,73 @@ public class Foreman implements Runnable {
       throw new ExecutionSetupException(String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson()));
     }
     queryRM.setCost(rootOperator.getCost());
-    admit(null);
-    drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener());
-    drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
 
-    logger.debug("Submitting fragments to run.");
+    fragmentsRunner.setFragmentsInfo(planFragments, rootFragment, rootOperator);
 
-    // set up the root fragment first so we'll have incoming buffers available.
-    setupRootFragment(rootFragment, rootOperator);
+    startQueryProcessing();
+  }
 
-    setupNonRootFragments(planFragments);
+  /**
+   * Enqueues the query and, once enqueued, starts sending out query fragments for further execution.
+   * Moves the query to the RUNNING state.
+   */
+  private void startQueryProcessing() {
+    enqueue();
+    runFragments();
+    queryStateProcessor.moveToState(QueryState.RUNNING, null);
+  }
 
-    moveToState(QueryState.RUNNING, null);
-    logger.debug("Fragments running.");
+  /**
+   * Moves the query to the ENQUEUED state and enqueues it if queueing is enabled.
+   * The Foreman run is blocked until the query is admitted.
+   * In case of failure (e.g. a queue timeout) the query is moved to the FAILED state.
+   */
+  private void enqueue() {
+    queryStateProcessor.moveToState(QueryState.ENQUEUED, null);
+
+    try {
+      queryRM.admit();
+      queryStateProcessor.moveToState(QueryState.STARTING, null);
+    } catch (QueueTimeoutException | QueryQueueException e) {
+      queryStateProcessor.moveToState(QueryState.FAILED, e);
+    } finally {
+      String queueName = queryRM.queueName();
+      queryManager.setQueueName(queueName == null ? "Unknown" : queueName);
+    }
+  }
+
+  private void runFragments() {
+    try {
+      fragmentsRunner.submit();
+    } catch (Exception e) {
+      queryStateProcessor.moveToState(QueryState.FAILED, e);
+    } finally {
+       /*
+       * Begin accepting external events.
+       *
+       * Doing this here in the finally clause will guarantee that it occurs. Otherwise, if there
+       * is an exception anywhere during setup, it wouldn't occur, and any events that are generated
+       * as a result of any partial setup that was done (such as the FragmentSubmitListener,
+       * the ResponseSendListener, or an external call to cancel()), will hang the thread that makes the
+       * event delivery call.
+       *
+       * If we do throw an exception during setup, and have already moved to QueryState.FAILED, we just need to
+       * make sure that we can't make things any worse as those events are delivered, but allow
+       * any necessary remaining cleanup to proceed.
+       *
+       * Note that cancellations cannot be simulated before this point, i.e. pauses can be injected, because Foreman
+       * would wait on the cancelling thread to signal a resume and the cancelling thread would wait on the Foreman
+       * to accept events.
+       */
+      startProcessingEvents();
+    }
   }
 
   /**
    * Helper method to execute the query in prepared statement. Current implementation takes the query from opaque
    * object of the <code>preparedStatement</code> and submits as a new query.
    *
-   * @param preparedStatementHandle
+   * @param preparedStatementHandle prepared statement handle
    * @throws ExecutionSetupException
    */
   private void runPreparedStatement(final PreparedStatementHandle preparedStatementHandle)
@@ -559,10 +544,6 @@ public class Foreman implements Runnable {
     }
   }
 
-  Exception getCurrentException() {
-    return foremanResult.getException();
-  }
-
   private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan) throws ExecutionSetupException {
     final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
     final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
@@ -577,47 +558,26 @@ public class Foreman implements Runnable {
     if (! logger.isTraceEnabled()) {
       return;
     }
-    final StringBuilder sb = new StringBuilder();
-    sb.append("PlanFragments for query ");
-    sb.append(queryId);
-    sb.append('\n');
-
-    final List<PlanFragment> planFragments = queryWorkUnit.getFragments();
-    final int fragmentCount = planFragments.size();
-    int fragmentIndex = 0;
-    for(final PlanFragment planFragment : planFragments) {
-      final FragmentHandle fragmentHandle = planFragment.getHandle();
-      sb.append("PlanFragment(");
-      sb.append(++fragmentIndex);
-      sb.append('/');
-      sb.append(fragmentCount);
-      sb.append(") major_fragment_id ");
-      sb.append(fragmentHandle.getMajorFragmentId());
-      sb.append(" minor_fragment_id ");
-      sb.append(fragmentHandle.getMinorFragmentId());
-      sb.append('\n');
-
-      final DrillbitEndpoint endpointAssignment = planFragment.getAssignment();
-      sb.append("  DrillbitEndpoint address ");
-      sb.append(endpointAssignment.getAddress());
-      sb.append('\n');
-
-      String jsonString = "<<malformed JSON>>";
-      sb.append("  fragment_json: ");
-      final ObjectMapper objectMapper = new ObjectMapper();
-      try
-      {
-        final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class);
-        jsonString = objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json);
-      } catch(final Exception e) {
-        // we've already set jsonString to a fallback value
-      }
-      sb.append(jsonString);
+    logger.trace(String.format("PlanFragments for query %s \n%s",
+        queryId, queryWorkUnit.stringifyFragments()));
+  }
+
+  private void runSQL(final String sql) throws ExecutionSetupException {
+    final Pointer<String> textPlan = new Pointer<>();
+    final PhysicalPlan plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan);
+    queryManager.setPlanText(textPlan.value);
+    runPhysicalPlan(plan);
+  }
 
-      logger.trace(sb.toString());
+  private PhysicalPlan convert(final LogicalPlan plan) throws OptimizerException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(queryContext.getLpPersistence()));
     }
+    return new BasicOptimizer(queryContext, initiatingClient).optimize(
+        new BasicOptimizer.BasicOptimizationContext(queryContext), plan);
   }
 
+
   /**
    * Manages the end-state processing for Foreman.
    *
@@ -631,7 +591,7 @@ public class Foreman implements Runnable {
    * The idea here is to make close()ing the ForemanResult do the final cleanup and
    * sending. Closing the result must be the last thing that is done by Foreman.
    */
-  private class ForemanResult implements AutoCloseable {
+  public class ForemanResult implements AutoCloseable {
     private QueryState resultState = null;
     private volatile Exception resultException = null;
     private boolean isClosed = false;
@@ -688,7 +648,7 @@ public class Foreman implements Runnable {
      * @param exception the exception to add
      */
     private void addException(final Exception exception) {
-      Preconditions.checkNotNull(exception);
+      assert exception != null;
 
       if (resultException == null) {
         resultException = exception;
@@ -741,7 +701,7 @@ public class Foreman implements Runnable {
             queryText,
             new Date(queryContext.getQueryContextInfo().getQueryStartTime()),
             new Date(System.currentTimeMillis()),
-            state,
+            queryStateProcessor.getState(),
             queryContext.getSession().getCredentials().getUserName(),
             initiatingClient.getRemoteAddress());
         queryLogger.info(MAPPER.writeValueAsString(q));
@@ -756,9 +716,6 @@ public class Foreman implements Runnable {
       Preconditions.checkState(!isClosed);
       Preconditions.checkState(resultState != null);
 
-      // to track how long the query takes
-      queryManager.markEndTime();
-
       logger.debug(queryIdString + ": cleaning up.");
       injector.injectPause(queryContext.getExecutionControls(), "foreman-cleanup", logger);
 
@@ -780,11 +737,11 @@ public class Foreman implements Runnable {
        *
        * We only need to do this if the resultState differs from the last recorded state
        */
-      if (resultState != state) {
+      if (resultState != queryStateProcessor.getState()) {
         suppressingClose(new AutoCloseable() {
           @Override
           public void close() throws Exception {
-            recordNewState(resultState);
+            queryStateProcessor.recordNewState(resultState);
           }
         });
       }
@@ -842,7 +799,7 @@ public class Foreman implements Runnable {
       }
 
       // Remove the Foreman from the running query list.
-      bee.retireForeman(Foreman.this);
+      fragmentsRunner.getBee().retireForeman(Foreman.this);
 
       try {
         queryManager.close();
@@ -850,21 +807,9 @@ public class Foreman implements Runnable {
         logger.warn("unable to close query manager", e);
       }
 
-      // Incrementing QueryState counters
-      switch (state) {
-        case FAILED:
-          failedQueries.inc();
-          break;
-        case CANCELED:
-          canceledQueries.inc();
-          break;
-        case COMPLETED:
-          succeededQueries.inc();
-          break;
-      }
 
-      runningQueries.dec();
-      completedQueries.inc();
+      queryStateProcessor.close();
+
       try {
         queryRM.exit();
       } finally {
@@ -873,472 +818,10 @@ public class Foreman implements Runnable {
     }
   }
 
-  private static class StateEvent {
-    final QueryState newState;
-    final Exception exception;
-
-    StateEvent(final QueryState newState, final Exception exception) {
-      this.newState = newState;
-      this.exception = exception;
-    }
-  }
-
-  private void moveToState(final QueryState newState, final Exception exception) {
-    logger.debug(queryIdString + ": State change requested {} --> {}", state, newState,
-      exception);
-    switch (state) {
-    case ENQUEUED:
-      switch (newState) {
-      case FAILED:
-        Preconditions.checkNotNull(exception, "exception cannot be null when new state is failed");
-        recordNewState(newState);
-        foremanResult.setFailed(exception);
-        foremanResult.close();
-        return;
-      case STARTING:
-        recordNewState(newState);
-        return;
-      }
-      break;
-    case STARTING:
-      if (newState == QueryState.RUNNING) {
-        recordNewState(QueryState.RUNNING);
-        return;
-      }
-
-      //$FALL-THROUGH$
-
-    case RUNNING: {
-      /*
-       * For cases that cancel executing fragments, we have to record the new
-       * state first, because the cancellation of the local root fragment will
-       * cause this to be called recursively.
-       */
-      switch (newState) {
-      case CANCELLATION_REQUESTED: {
-        assert exception == null;
-        recordNewState(QueryState.CANCELLATION_REQUESTED);
-        queryManager.cancelExecutingFragments(drillbitContext);
-        foremanResult.setCompleted(QueryState.CANCELED);
-        /*
-         * We don't close the foremanResult until we've gotten
-         * acknowledgments, which happens below in the case for current state
-         * == CANCELLATION_REQUESTED.
-         */
-        return;
-      }
-
-      case COMPLETED: {
-        assert exception == null;
-        recordNewState(QueryState.COMPLETED);
-        foremanResult.setCompleted(QueryState.COMPLETED);
-        foremanResult.close();
-        return;
-      }
-
-      case FAILED: {
-        assert exception != null;
-        recordNewState(QueryState.FAILED);
-        queryManager.cancelExecutingFragments(drillbitContext);
-        foremanResult.setFailed(exception);
-        foremanResult.close();
-        return;
-      }
-
-      }
-      break;
-    }
-
-    case CANCELLATION_REQUESTED:
-      if ((newState == QueryState.CANCELED)
-        || (newState == QueryState.COMPLETED)
-        || (newState == QueryState.FAILED)) {
-
-        if (drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS)) {
-          if (newState == QueryState.FAILED) {
-            assert exception != null;
-            recordNewState(QueryState.FAILED);
-            foremanResult.setForceFailure(exception);
-          }
-        }
-        /*
-         * These amount to a completion of the cancellation requests' cleanup;
-         * now we can clean up and send the result.
-         */
-        foremanResult.close();
-      }
-      return;
-
-    case CANCELED:
-    case COMPLETED:
-    case FAILED:
-      logger
-        .warn(
-          "Dropping request to move to {} state as query is already at {} state (which is terminal).",
-          newState, state);
-      return;
-    }
-
-    throw new IllegalStateException(String.format(
-      "Failure trying to change states: %s --> %s", state.name(),
-      newState.name()));
-  }
-
-  private class StateSwitch extends EventProcessor<StateEvent> {
-    public void addEvent(final QueryState newState, final Exception exception) {
-      sendEvent(new StateEvent(newState, exception));
-    }
-
-    @Override
-    protected void processEvent(final StateEvent event) {
-      moveToState(event.newState, event.exception);
-    }
-  }
-
-  /**
-   * Tells the foreman to move to a new state.<br>
-   * This will be added to the end of the event queue and will be processed once the foreman is ready
-   * to accept external events.
-   *
-   * @param newState the state to move to
-   * @param exception if not null, the exception that drove this state transition (usually a failure)
-   */
-  public void addToEventQueue(final QueryState newState, final Exception exception) {
-    stateSwitch.addEvent(newState, exception);
-  }
-
-  private void recordNewState(final QueryState newState) {
-    state = newState;
-    queryManager.updateEphemeralState(newState);
-  }
-
-  private void runSQL(final String sql) throws ExecutionSetupException {
-    final Pointer<String> textPlan = new Pointer<>();
-    final PhysicalPlan plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan);
-    queryManager.setPlanText(textPlan.value);
-    runPhysicalPlan(plan);
-  }
-
-  private PhysicalPlan convert(final LogicalPlan plan) throws OptimizerException {
-    if (logger.isDebugEnabled()) {
-      logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(queryContext.getLpPersistence()));
-    }
-    return new BasicOptimizer(queryContext, initiatingClient).optimize(
-        new BasicOptimizer.BasicOptimizationContext(queryContext), plan);
-  }
-
-  public QueryId getQueryId() {
-    return queryId;
-  }
-
-  /**
-   * Set up the root fragment (which will run locally), and submit it for execution.
-   *
-   * @param rootFragment
-   * @param rootOperator
-   * @throws ExecutionSetupException
-   */
-  private void setupRootFragment(final PlanFragment rootFragment, final FragmentRoot rootOperator)
-      throws ExecutionSetupException {
-    final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, queryContext,
-        initiatingClient, drillbitContext.getFunctionImplementationRegistry());
-    final FragmentStatusReporter statusReporter = new FragmentStatusReporter(rootContext);
-    final FragmentExecutor rootRunner = new FragmentExecutor(rootContext, rootFragment, statusReporter, rootOperator);
-    final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment, rootRunner, statusReporter);
-
-    queryManager.addFragmentStatusTracker(rootFragment, true);
-
-    // FragmentManager is setting buffer for FragmentContext
-    if (rootContext.isBuffersDone()) {
-      // if we don't have to wait for any incoming data, start the fragment runner.
-      bee.addFragmentRunner(rootRunner);
-    } else {
-      // if we do, record the fragment manager in the workBus.
-      drillbitContext.getWorkBus().addFragmentManager(fragmentManager);
-    }
-  }
-
-  /**
-   * Add planFragment into either of local fragment list or remote fragment map based on assigned Drillbit Endpoint node
-   * and the local Drillbit Endpoint.
-   * @param planFragment
-   * @param localEndPoint
-   * @param localFragmentList
-   * @param remoteFragmentMap
-   */
-  private void updateFragmentCollection(final PlanFragment planFragment, final DrillbitEndpoint localEndPoint,
-                                        final List<PlanFragment> localFragmentList,
-                                        final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) {
-    final DrillbitEndpoint assignedDrillbit = planFragment.getAssignment();
-
-    if (assignedDrillbit.equals(localEndPoint)) {
-      localFragmentList.add(planFragment);
-    } else {
-      remoteFragmentMap.put(assignedDrillbit, planFragment);
-    }
-  }
-
-  /**
-   * Send remote intermediate fragment to the assigned Drillbit node. Throw exception in case of failure to send the
-   * fragment.
-   * @param remoteFragmentMap - Map of Drillbit Endpoint to list of PlanFragment's
-   */
-  private void scheduleRemoteIntermediateFragments(final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) {
-
-    final int numIntFragments = remoteFragmentMap.keySet().size();
-    final ExtendedLatch endpointLatch = new ExtendedLatch(numIntFragments);
-    final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures();
-
-    // send remote intermediate fragments
-    for (final DrillbitEndpoint ep : remoteFragmentMap.keySet()) {
-      sendRemoteFragments(ep, remoteFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
-    }
-
-    final long timeout = RPC_WAIT_IN_MSECS_PER_FRAGMENT * numIntFragments;
-    if (numIntFragments > 0 && !endpointLatch.awaitUninterruptibly(timeout)) {
-      long numberRemaining = endpointLatch.getCount();
-      throw UserException.connectionError()
-          .message("Exceeded timeout (%d) while waiting send intermediate work fragments to remote nodes. " +
-              "Sent %d and only heard response back from %d nodes.",
-              timeout, numIntFragments, numIntFragments - numberRemaining).build(logger);
-    }
-
-    // if any of the intermediate fragment submissions failed, fail the query
-    final List<FragmentSubmitFailures.SubmissionException> submissionExceptions =
-        fragmentSubmitFailures.submissionExceptions;
-
-    if (submissionExceptions.size() > 0) {
-      Set<DrillbitEndpoint> endpoints = Sets.newHashSet();
-      StringBuilder sb = new StringBuilder();
-      boolean first = true;
-
-      for (FragmentSubmitFailures.SubmissionException e : fragmentSubmitFailures.submissionExceptions) {
-        DrillbitEndpoint endpoint = e.drillbitEndpoint;
-        if (endpoints.add(endpoint)) {
-          if (first) {
-            first = false;
-          } else {
-            sb.append(", ");
-          }
-          sb.append(endpoint.getAddress());
-        }
-      }
-      throw UserException.connectionError(submissionExceptions.get(0).rpcException)
-          .message("Error setting up remote intermediate fragment execution")
-          .addContext("Nodes with failures", sb.toString()).build(logger);
-    }
-  }
-
-
-  /**
-   * Start the locally assigned leaf or intermediate fragment
-   * @param fragment
-   * @throws ForemanException
-   */
-  private void startLocalFragment(final PlanFragment fragment) throws ForemanException {
-
-    logger.debug("Received local fragment start instruction", fragment);
-
-    try {
-      final FragmentContext fragmentContext = new FragmentContext(drillbitContext, fragment,
-          drillbitContext.getFunctionImplementationRegistry());
-      final FragmentStatusReporter statusReporter = new FragmentStatusReporter(fragmentContext);
-      final FragmentExecutor fragmentExecutor = new FragmentExecutor(fragmentContext, fragment, statusReporter);
-
-      // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
-      if (fragment.getLeafFragment()) {
-        bee.addFragmentRunner(fragmentExecutor);
-      } else {
-        // isIntermediate, store for incoming data.
-        final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, fragmentExecutor, statusReporter);
-        drillbitContext.getWorkBus().addFragmentManager(manager);
-      }
-
-    } catch (final ExecutionSetupException ex) {
-      throw new ForemanException("Failed to create fragment context", ex);
-    } catch (final Exception ex) {
-      throw new ForemanException("Failed while trying to start local fragment", ex);
-    }
-  }
-
-  /**
-   * Set up the non-root fragments for execution. Some may be local, and some may be remote.
-   * Messages are sent immediately, so they may start returning data even before we complete this.
-   *
-   * @param fragments the fragments
-   * @throws ForemanException
-   */
-  private void setupNonRootFragments(final Collection<PlanFragment> fragments) throws ForemanException {
-    if (fragments.isEmpty()) {
-      // nothing to do here
-      return;
-    }
-    /*
-     * We will send a single message to each endpoint, regardless of how many fragments will be
-     * executed there. We need to start up the intermediate fragments first so that they will be
-     * ready once the leaf fragments start producing data. To satisfy both of these, we will
-     * make a pass through the fragments and put them into the remote maps according to their
-     * leaf/intermediate state, as well as their target drillbit. Also filter the leaf/intermediate
-     * fragments which are assigned to run on local Drillbit node (or Foreman node) into separate lists.
-     *
-     * This will help to schedule local
-     */
-    final Multimap<DrillbitEndpoint, PlanFragment> remoteLeafFragmentMap = ArrayListMultimap.create();
-    final List<PlanFragment> localLeafFragmentList = new ArrayList<>();
-    final Multimap<DrillbitEndpoint, PlanFragment> remoteIntFragmentMap = ArrayListMultimap.create();
-    final List<PlanFragment> localIntFragmentList = new ArrayList<>();
-
-    final DrillbitEndpoint localDrillbitEndpoint = drillbitContext.getEndpoint();
-    // record all fragments for status purposes.
-    for (final PlanFragment planFragment : fragments) {
-
-      if (logger.isTraceEnabled()) {
-        logger.trace("Tracking intermediate remote node {} with data {}", planFragment.getAssignment(),
-            planFragment.getFragmentJson());
-      }
-
-      queryManager.addFragmentStatusTracker(planFragment, false);
-
-      if (planFragment.getLeafFragment()) {
-        updateFragmentCollection(planFragment, localDrillbitEndpoint, localLeafFragmentList, remoteLeafFragmentMap);
-      } else {
-        updateFragmentCollection(planFragment, localDrillbitEndpoint, localIntFragmentList, remoteIntFragmentMap);
-      }
-    }
-
-    /*
-     * We need to wait for the intermediates to be sent so that they'll be set up by the time
-     * the leaves start producing data. We'll use this latch to wait for the responses.
-     *
-     * However, in order not to hang the process if any of the RPC requests fails, we always
-     * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
-     * know if any submissions did fail.
-     */
-    scheduleRemoteIntermediateFragments(remoteIntFragmentMap);
-
-    // Setup local intermediate fragments
-    for (final PlanFragment fragment : localIntFragmentList) {
-      startLocalFragment(fragment);
-    }
-
-    injector.injectChecked(queryContext.getExecutionControls(), "send-fragments", ForemanException.class);
-    /*
-     * Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through
-     * the regular sendListener event delivery.
-     */
-    for (final DrillbitEndpoint ep : remoteLeafFragmentMap.keySet()) {
-      sendRemoteFragments(ep, remoteLeafFragmentMap.get(ep), null, null);
-    }
-
-    // Setup local leaf fragments
-    for (final PlanFragment fragment : localLeafFragmentList) {
-      startLocalFragment(fragment);
-    }
-  }
-
-  /**
-   * Send all the remote fragments belonging to a single target drillbit in one request.
-   *
-   * @param assignment the drillbit assigned to these fragments
-   * @param fragments the set of fragments
-   * @param latch the countdown latch used to track the requests to all endpoints
-   * @param fragmentSubmitFailures the submission failure counter used to track the requests to all endpoints
-   */
-  private void sendRemoteFragments(final DrillbitEndpoint assignment, final Collection<PlanFragment> fragments,
-      final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) {
-    @SuppressWarnings("resource")
-    final Controller controller = drillbitContext.getController();
-    final InitializeFragments.Builder fb = InitializeFragments.newBuilder();
-    for(final PlanFragment planFragment : fragments) {
-      fb.addFragment(planFragment);
-    }
-    final InitializeFragments initFrags = fb.build();
-
-    logger.debug("Sending remote fragments to \nNode:\n{} \n\nData:\n{}", assignment, initFrags);
-    final FragmentSubmitListener listener =
-        new FragmentSubmitListener(assignment, initFrags, latch, fragmentSubmitFailures);
-    controller.getTunnel(assignment).sendFragments(listener, initFrags);
-  }
-
-  public QueryState getState() {
-    return state;
-  }
-
-  /**
-   * @return sql query text of the query request
-   */
-  public String getQueryText() {
-    return queryText;
-  }
-
-  /**
-   * Used by {@link FragmentSubmitListener} to track the number of submission failures.
-   */
-  private static class FragmentSubmitFailures {
-    static class SubmissionException {
-      final DrillbitEndpoint drillbitEndpoint;
-      final RpcException rpcException;
-
-      SubmissionException(final DrillbitEndpoint drillbitEndpoint,
-          final RpcException rpcException) {
-        this.drillbitEndpoint = drillbitEndpoint;
-        this.rpcException = rpcException;
-      }
-    }
-
-    final List<SubmissionException> submissionExceptions = new LinkedList<>();
-
-    void addFailure(final DrillbitEndpoint drillbitEndpoint, final RpcException rpcException) {
-      submissionExceptions.add(new SubmissionException(drillbitEndpoint, rpcException));
-    }
-  }
-
-  private class FragmentSubmitListener extends EndpointListener<Ack, InitializeFragments> {
-    private final CountDownLatch latch;
-    private final FragmentSubmitFailures fragmentSubmitFailures;
-
-    /**
-     * Constructor.
-     *
-     * @param endpoint the endpoint for the submission
-     * @param value the initialize fragments message
-     * @param latch the latch to count down when the status is known; may be null
-     * @param fragmentSubmitFailures the counter to use for failures; must be non-null iff latch is non-null
-     */
-    public FragmentSubmitListener(final DrillbitEndpoint endpoint, final InitializeFragments value,
-        final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) {
-      super(endpoint, value);
-      Preconditions.checkState((latch == null) == (fragmentSubmitFailures == null));
-      this.latch = latch;
-      this.fragmentSubmitFailures = fragmentSubmitFailures;
-    }
-
-    @Override
-    public void success(final Ack ack, final ByteBuf byteBuf) {
-      if (latch != null) {
-        latch.countDown();
-      }
-    }
-
-    @Override
-    public void failed(final RpcException ex) {
-      if (latch != null) { // this block only applies to intermediate fragments
-        fragmentSubmitFailures.addFailure(endpoint, ex);
-        latch.countDown();
-      } else { // this block only applies to leaf fragments
-        // since this won't be waited on, we can wait to deliver this event once the Foreman is ready
-        logger.debug("Failure while sending fragment.  Stopping query.", ex);
-        addToEventQueue(QueryState.FAILED, ex);
-      }
-    }
-
+  private class ConnectionClosedListener implements GenericFutureListener<Future<Void>> {
     @Override
-    public void interrupted(final InterruptedException e) {
-      // Foreman shouldn't get interrupted while waiting for the RPC outcome of fragment submission.
-      // Consider the interrupt as failure.
-      final String errMsg = "Interrupted while waiting for the RPC outcome of fragment submission.";
-      logger.error(errMsg, e);
-      failed(new RpcException(errMsg, e));
+    public void operationComplete(Future<Void> future) throws Exception {
+      cancel();
     }
   }
 

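The remaining hunk above keeps Foreman's connection-close hook: ConnectionClosedListener now simply cancels the query when the future it listens on completes (presumably the client channel's close future). A minimal sketch of that pattern, written outside Drill's classes; the Channel and the cancel hook here are assumptions for illustration, not the Foreman's actual wiring:

import io.netty.channel.Channel;

public class CloseListenerSketch {

  /** Cancels the query (via the supplied hook) once the client channel closes. */
  public static void cancelOnClose(Channel channel, Runnable cancelQuery) {
    // closeFuture() completes when the connection is closed for any reason;
    // the listener fires on the channel's event loop.
    channel.closeFuture().addListener(future -> cancelQuery.run());
  }
}
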
http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java
new file mode 100644
index 0000000..ce04848
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.common.concurrent.ExtendedLatch;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.proto.BitControl.InitializeFragments;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.UserClientConnection;
+import org.apache.drill.exec.rpc.control.Controller;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.testing.ControlsInjector;
+import org.apache.drill.exec.testing.ControlsInjectorFactory;
+import org.apache.drill.exec.work.EndpointListener;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
+import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
+import org.apache.drill.exec.work.fragment.RootFragmentManager;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Responsible for submitting query fragments for execution, both locally and on remote Drillbits.
+ */
+public class FragmentsRunner {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentsRunner.class);
+  private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(FragmentsRunner.class);
+
+  private static final long RPC_WAIT_IN_MSECS_PER_FRAGMENT = 5000;
+
+  private final WorkerBee bee;
+  private final UserClientConnection initiatingClient;
+  private final DrillbitContext drillbitContext;
+  private final Foreman foreman;
+
+  private List<PlanFragment> planFragments;
+  private PlanFragment rootPlanFragment;
+  private FragmentRoot rootOperator;
+
+  public FragmentsRunner(WorkerBee bee, UserClientConnection initiatingClient, DrillbitContext drillbitContext, Foreman foreman) {
+    this.bee = bee;
+    this.initiatingClient = initiatingClient;
+    this.drillbitContext = drillbitContext;
+    this.foreman = foreman;
+  }
+
+  public WorkerBee getBee() {
+    return bee;
+  }
+
+  public void setFragmentsInfo(List<PlanFragment> planFragments,
+                                  PlanFragment rootPlanFragment,
+                                  FragmentRoot rootOperator) {
+    this.planFragments = planFragments;
+    this.rootPlanFragment = rootPlanFragment;
+    this.rootOperator = rootOperator;
+  }
+
+  /**
+   * Submits root and non-root fragments for execution.
+   * On success the query moves to the running state.
+   */
+  public void submit() throws ExecutionSetupException {
+    assert planFragments != null;
+    assert rootPlanFragment != null;
+    assert rootOperator != null;
+
+    QueryId queryId = foreman.getQueryId();
+    assert queryId.equals(rootPlanFragment.getHandle().getQueryId());
+
+    QueryManager queryManager = foreman.getQueryManager();
+    drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener());
+    drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
+
+    logger.debug("Submitting fragments to run.");
+    // set up the root fragment first so we'll have incoming buffers available.
+    setupRootFragment(rootPlanFragment, rootOperator);
+    setupNonRootFragments(planFragments);
+    logger.debug("Fragments running.");
+  }
+
+  /**
+   * Set up the root fragment (which will run locally), and submit it for execution.
+   *
+   * @param rootFragment root fragment
+   * @param rootOperator root operator
+   * @throws ExecutionSetupException
+   */
+  private void setupRootFragment(final PlanFragment rootFragment, final FragmentRoot rootOperator) throws ExecutionSetupException {
+    QueryManager queryManager = foreman.getQueryManager();
+    final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, foreman.getQueryContext(),
+        initiatingClient, drillbitContext.getFunctionImplementationRegistry());
+    final FragmentStatusReporter statusReporter = new FragmentStatusReporter(rootContext);
+    final FragmentExecutor rootRunner = new FragmentExecutor(rootContext, rootFragment, statusReporter, rootOperator);
+    final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment, rootRunner, statusReporter);
+
+    queryManager.addFragmentStatusTracker(rootFragment, true);
+
+    // The FragmentManager sets up the incoming buffers for the FragmentContext.
+    if (rootContext.isBuffersDone()) {
+      // if we don't have to wait for any incoming data, start the fragment runner.
+      bee.addFragmentRunner(rootRunner);
+    } else {
+      // if we do, record the fragment manager in the workBus.
+      drillbitContext.getWorkBus().addFragmentManager(fragmentManager);
+    }
+  }
+
+
+  /**
+   * Set up the non-root fragments for execution. Some may be local, and some may be remote.
+   * Messages are sent immediately, so they may start returning data even before we complete this.
+   *
+   * @param fragments the fragments
+   */
+  private void setupNonRootFragments(final Collection<PlanFragment> fragments) throws ExecutionSetupException {
+    if (fragments.isEmpty()) {
+      // nothing to do here
+      return;
+    }
+    /*
+     * We will send a single message to each endpoint, regardless of how many fragments will be
+     * executed there. We need to start up the intermediate fragments first so that they will be
+     * ready once the leaf fragments start producing data. To satisfy both of these, we will
+     * make a pass through the fragments and put them into the remote maps according to their
+     * leaf/intermediate state, as well as their target drillbit. Leaf/intermediate fragments that are
+     * assigned to run on the local Drillbit node (the Foreman node) are filtered into separate lists,
+     * so they can be started directly instead of being sent over RPC.
+     */
+    final Multimap<DrillbitEndpoint, PlanFragment> remoteLeafFragmentMap = ArrayListMultimap.create();
+    final List<PlanFragment> localLeafFragmentList = new ArrayList<>();
+    final Multimap<DrillbitEndpoint, PlanFragment> remoteIntFragmentMap = ArrayListMultimap.create();
+    final List<PlanFragment> localIntFragmentList = new ArrayList<>();
+
+    final DrillbitEndpoint localDrillbitEndpoint = drillbitContext.getEndpoint();
+    // record all fragments for status purposes.
+    for (final PlanFragment planFragment : fragments) {
+
+      if (logger.isTraceEnabled()) {
+        logger.trace("Tracking intermediate remote node {} with data {}", planFragment.getAssignment(),
+            planFragment.getFragmentJson());
+      }
+
+      foreman.getQueryManager().addFragmentStatusTracker(planFragment, false);
+
+      if (planFragment.getLeafFragment()) {
+        updateFragmentCollection(planFragment, localDrillbitEndpoint, localLeafFragmentList, remoteLeafFragmentMap);
+      } else {
+        updateFragmentCollection(planFragment, localDrillbitEndpoint, localIntFragmentList, remoteIntFragmentMap);
+      }
+    }
+
+    /*
+     * We need to wait for the intermediates to be sent so that they'll be set up by the time
+     * the leaves start producing data. We'll use this latch to wait for the responses.
+     *
+     * However, in order not to hang the process if any of the RPC requests fails, we always
+     * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
+     * know if any submissions did fail.
+     */
+    scheduleRemoteIntermediateFragments(remoteIntFragmentMap);
+
+    // Setup local intermediate fragments
+    for (final PlanFragment fragment : localIntFragmentList) {
+      startLocalFragment(fragment);
+    }
+
+    injector.injectChecked(foreman.getQueryContext().getExecutionControls(), "send-fragments", ForemanException.class);
+    /*
+     * Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through
+     * the regular sendListener event delivery.
+     */
+    for (final DrillbitEndpoint ep : remoteLeafFragmentMap.keySet()) {
+      sendRemoteFragments(ep, remoteLeafFragmentMap.get(ep), null, null);
+    }
+
+    // Setup local leaf fragments
+    for (final PlanFragment fragment : localLeafFragmentList) {
+      startLocalFragment(fragment);
+    }
+  }
+
+  /**
+   * Send all the remote fragments belonging to a single target drillbit in one request.
+   *
+   * @param assignment the drillbit assigned to these fragments
+   * @param fragments the set of fragments
+   * @param latch the countdown latch used to track the requests to all endpoints
+   * @param fragmentSubmitFailures the submission failure counter used to track the requests to all endpoints
+   */
+  private void sendRemoteFragments(final DrillbitEndpoint assignment, final Collection<PlanFragment> fragments,
+                                   final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) {
+    @SuppressWarnings("resource")
+    final Controller controller = drillbitContext.getController();
+    final InitializeFragments.Builder fb = InitializeFragments.newBuilder();
+    for(final PlanFragment planFragment : fragments) {
+      fb.addFragment(planFragment);
+    }
+    final InitializeFragments initFrags = fb.build();
+
+    logger.debug("Sending remote fragments to node: {}\nData: {}", assignment, initFrags);
+    final FragmentSubmitListener listener =
+        new FragmentSubmitListener(assignment, initFrags, latch, fragmentSubmitFailures);
+    controller.getTunnel(assignment).sendFragments(listener, initFrags);
+  }
+
+  /**
+   * Adds the plan fragment either to the local fragment list or to the remote fragment map,
+   * depending on whether its assigned Drillbit endpoint matches the local one.
+   *
+   * @param planFragment plan fragment
+   * @param localEndPoint local endpoint
+   * @param localFragmentList local fragment list
+   * @param remoteFragmentMap remote fragment map
+   */
+  private void updateFragmentCollection(final PlanFragment planFragment, final DrillbitEndpoint localEndPoint,
+                                        final List<PlanFragment> localFragmentList,
+                                        final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) {
+    final DrillbitEndpoint assignedDrillbit = planFragment.getAssignment();
+
+    if (assignedDrillbit.equals(localEndPoint)) {
+      localFragmentList.add(planFragment);
+    } else {
+      remoteFragmentMap.put(assignedDrillbit, planFragment);
+    }
+  }
+
+  /**
+   * Sends the remote intermediate fragments to their assigned Drillbit nodes.
+   * Throws an exception if any fragment fails to be sent.
+   *
+   * @param remoteFragmentMap map from Drillbit endpoint to the plan fragments assigned to it
+   */
+  private void scheduleRemoteIntermediateFragments(final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) {
+
+    final int numIntFragments = remoteFragmentMap.keySet().size();
+    final ExtendedLatch endpointLatch = new ExtendedLatch(numIntFragments);
+    final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures();
+
+    // send remote intermediate fragments
+    for (final DrillbitEndpoint ep : remoteFragmentMap.keySet()) {
+      sendRemoteFragments(ep, remoteFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
+    }
+
+    final long timeout = RPC_WAIT_IN_MSECS_PER_FRAGMENT * numIntFragments;
+    if (numIntFragments > 0 && !endpointLatch.awaitUninterruptibly(timeout)) {
+      long numberRemaining = endpointLatch.getCount();
+      throw UserException.connectionError()
+          .message("Exceeded timeout (%d ms) while waiting to send intermediate work fragments to remote nodes. " +
+                  "Sent requests to %d nodes and heard back from only %d.",
+              timeout, numIntFragments, numIntFragments - numberRemaining).build(logger);
+    }
+
+    // if any of the intermediate fragment submissions failed, fail the query
+    final List<FragmentSubmitFailures.SubmissionException> submissionExceptions =
+        fragmentSubmitFailures.submissionExceptions;
+
+    if (submissionExceptions.size() > 0) {
+      Set<DrillbitEndpoint> endpoints = Sets.newHashSet();
+      StringBuilder sb = new StringBuilder();
+      boolean first = true;
+
+      for (FragmentSubmitFailures.SubmissionException e : fragmentSubmitFailures.submissionExceptions) {
+        DrillbitEndpoint endpoint = e.drillbitEndpoint;
+        if (endpoints.add(endpoint)) {
+          if (first) {
+            first = false;
+          } else {
+            sb.append(", ");
+          }
+          sb.append(endpoint.getAddress());
+        }
+      }
+      throw UserException.connectionError(submissionExceptions.get(0).rpcException)
+          .message("Error setting up remote intermediate fragment execution")
+          .addContext("Nodes with failures", sb.toString()).build(logger);
+    }
+  }
+
+
+  /**
+   * Start the locally assigned leaf or intermediate fragment
+   *
+   * @param fragment fragment
+   */
+  private void startLocalFragment(final PlanFragment fragment) throws ExecutionSetupException {
+    logger.debug("Received local fragment start instruction", fragment);
+
+    final FragmentContext fragmentContext = new FragmentContext(drillbitContext, fragment, drillbitContext.getFunctionImplementationRegistry());
+    final FragmentStatusReporter statusReporter = new FragmentStatusReporter(fragmentContext);
+    final FragmentExecutor fragmentExecutor = new FragmentExecutor(fragmentContext, fragment, statusReporter);
+
+    // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
+    if (fragment.getLeafFragment()) {
+      bee.addFragmentRunner(fragmentExecutor);
+    } else {
+      // isIntermediate, store for incoming data.
+      final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, fragmentExecutor, statusReporter);
+      drillbitContext.getWorkBus().addFragmentManager(manager);
+    }
+  }
+
+  /**
+   * Used by {@link FragmentSubmitListener} to track the number of submission failures.
+   */
+  private static class FragmentSubmitFailures {
+    static class SubmissionException {
+      final DrillbitEndpoint drillbitEndpoint;
+      final RpcException rpcException;
+
+      SubmissionException(final DrillbitEndpoint drillbitEndpoint,
+                          final RpcException rpcException) {
+        this.drillbitEndpoint = drillbitEndpoint;
+        this.rpcException = rpcException;
+      }
+    }
+
+    final List<SubmissionException> submissionExceptions = new LinkedList<>();
+
+    void addFailure(final DrillbitEndpoint drillbitEndpoint, final RpcException rpcException) {
+      submissionExceptions.add(new SubmissionException(drillbitEndpoint, rpcException));
+    }
+  }
+
+  private class FragmentSubmitListener extends EndpointListener<GeneralRPCProtos.Ack, InitializeFragments> {
+    private final CountDownLatch latch;
+    private final FragmentSubmitFailures fragmentSubmitFailures;
+
+    /**
+     * Constructor.
+     *
+     * @param endpoint the endpoint for the submission
+     * @param value the initialize fragments message
+     * @param latch the latch to count down when the status is known; may be null
+     * @param fragmentSubmitFailures the counter to use for failures; must be non-null iff latch is non-null
+     */
+    public FragmentSubmitListener(final DrillbitEndpoint endpoint, final InitializeFragments value,
+                                  final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) {
+      super(endpoint, value);
+      Preconditions.checkState((latch == null) == (fragmentSubmitFailures == null));
+      this.latch = latch;
+      this.fragmentSubmitFailures = fragmentSubmitFailures;
+    }
+
+    @Override
+    public void success(final GeneralRPCProtos.Ack ack, final ByteBuf byteBuf) {
+      if (latch != null) {
+        latch.countDown();
+      }
+    }
+
+    @Override
+    public void failed(final RpcException ex) {
+      if (latch != null) { // this block only applies to intermediate fragments
+        fragmentSubmitFailures.addFailure(endpoint, ex);
+        latch.countDown();
+      } else { // this block only applies to leaf fragments
+        // since this won't be waited on, we can wait to deliver this event once the Foreman is ready
+        logger.debug("Failure while sending fragment.  Stopping query.", ex);
+        foreman.addToEventQueue(QueryState.FAILED, ex);
+      }
+    }
+
+    @Override
+    public void interrupted(final InterruptedException e) {
+      // Foreman shouldn't get interrupted while waiting for the RPC outcome of fragment submission.
+      // Consider the interrupt as failure.
+      final String errMsg = "Interrupted while waiting for the RPC outcome of fragment submission.";
+      logger.error(errMsg, e);
+      failed(new RpcException(errMsg, e));
+    }
+  }
+}
+
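To illustrate how the new class is meant to be driven, here is a minimal, hypothetical sketch of a caller handing the planned fragments to a FragmentsRunner once planning has finished. The surrounding objects (bee, connection, context, foreman, fragments) are assumed to be supplied by a running Drillbit; this is not the Foreman's actual code, only the calling sequence implied by the API above:

import java.util.List;

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.rpc.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.foreman.FragmentsRunner;

public class FragmentsRunnerUsageSketch {

  public static void runQueryFragments(WorkerBee bee,
                                       UserClientConnection connection,
                                       DrillbitContext context,
                                       Foreman foreman,
                                       List<PlanFragment> nonRootFragments,
                                       PlanFragment rootFragment,
                                       FragmentRoot rootOperator) throws ExecutionSetupException {
    FragmentsRunner runner = new FragmentsRunner(bee, connection, context, foreman);
    // All three pieces of fragment info must be set before submit() is called.
    runner.setFragmentsInfo(nonRootFragments, rootFragment, rootOperator);
    // Sets up the root fragment locally, then schedules intermediate fragments
    // (waiting for their acknowledgements) and finally the leaf fragments.
    runner.submit();
  }
}
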

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 216a80d..addd8fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -280,14 +280,15 @@ public class QueryManager implements AutoCloseable {
   }
 
   void updateEphemeralState(final QueryState queryState) {
-      // If query is already in zk transient store, ignore the transient state update option.
-      // Else, they will not be removed from transient store upon completion.
-      if (!inTransientStore &&
-          !foreman.getQueryContext().getOptions().getOption(ExecConstants.QUERY_TRANSIENT_STATE_UPDATE)) {
-        return;
-      }
+    // If the query is already in the zk transient store, ignore the transient state update option;
+    // otherwise its profile would not be removed from the transient store upon completion.
+    if (!inTransientStore && !foreman.getQueryContext().getOptions().getOption(ExecConstants.QUERY_TRANSIENT_STATE_UPDATE)) {
+      return;
+    }
 
-      switch (queryState) {
+    switch (queryState) {
+      case PREPARING:
+      case PLANNING:
       case ENQUEUED:
       case STARTING:
       case RUNNING:
@@ -295,15 +296,14 @@ public class QueryManager implements AutoCloseable {
         runningProfileStore.put(stringQueryId, getQueryInfo());  // store as ephemeral query profile.
         inTransientStore = true;
         break;
-
       case COMPLETED:
       case CANCELED:
       case FAILED:
         try {
           runningProfileStore.remove(stringQueryId);
           inTransientStore = false;
-        } catch(final Exception e) {
-          logger.warn("Failure while trying to delete the estore profile for this query.", e);
+        } catch (final Exception e) {
+          logger.warn("Failure while trying to delete the stored profile for the query [{}]", stringQueryId, e);
         }
         break;