You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/01/02 14:50:15 UTC

[3/7] drill git commit: DRILL-5963: Query state process improvements

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
new file mode 100644
index 0000000..2443139
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman;
+
+import com.codahale.metrics.Counter;
+import org.apache.drill.common.EventProcessor;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.foreman.Foreman.ForemanResult;
+
+/**
+ * Is responsible for query transition from one state to another,
+ * incrementing / decrementing query statuses counters.
+ */
+public class QueryStateProcessor implements AutoCloseable {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryStateProcessor.class);
+
+  private static final Counter planningQueries = DrillMetrics.getRegistry().counter("drill.queries.planning");
+  private static final Counter enqueuedQueries = DrillMetrics.getRegistry().counter("drill.queries.enqueued");
+  private static final Counter runningQueries = DrillMetrics.getRegistry().counter("drill.queries.running");
+  private static final Counter completedQueries = DrillMetrics.getRegistry().counter("drill.queries.completed");
+  private static final Counter succeededQueries = DrillMetrics.getRegistry().counter("drill.queries.succeeded");
+  private static final Counter failedQueries = DrillMetrics.getRegistry().counter("drill.queries.failed");
+  private static final Counter canceledQueries = DrillMetrics.getRegistry().counter("drill.queries.canceled");
+
+  private final StateSwitch stateSwitch = new StateSwitch();
+
+  private final String queryIdString;
+  private final QueryManager queryManager;
+  private final DrillbitContext drillbitContext;
+  private final ForemanResult foremanResult;
+
+  private volatile QueryState state;
+
+  public QueryStateProcessor(String queryIdString, QueryManager queryManager, DrillbitContext drillbitContext, ForemanResult foremanResult) {
+    this.queryIdString = queryIdString;
+    this.queryManager = queryManager;
+    this.drillbitContext = drillbitContext;
+    this.foremanResult = foremanResult;
+    // initial query state is PREPARING
+    this.state = QueryState.PREPARING;
+  }
+
+  /**
+   * @return current query state
+   */
+  public QueryState getState() {
+    return state;
+  }
+
+  /**
+   * Moves the query to the requested state; requests made in a terminal state are logged and dropped.
+   * @param newState new query state
+   * @param exception exception if failure occurred
+   * @throws IllegalStateException when the requested state transition is not allowed
+   */
+  public synchronized void moveToState(QueryState newState, Exception exception) {
+    logger.debug("{}: State change requested {} --> {}", queryIdString, state, newState);
+
+    switch (state) {
+      case PREPARING:
+        preparing(newState, exception);
+        return;
+      case PLANNING:
+        planning(newState, exception);
+        return;
+      case ENQUEUED:
+        enqueued(newState, exception);
+        return;
+      case STARTING:
+        starting(newState, exception);
+        return;
+      case RUNNING:
+        running(newState, exception);
+        return;
+      case CANCELLATION_REQUESTED:
+        cancellationRequested(newState, exception);
+        return;
+      case CANCELED:
+      case COMPLETED:
+      case FAILED:
+        logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state);
+        return;
+    }
+
+    throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name()));
+  }
+
+  /**
+   * Directly moves query from one state to another and updates ephemeral query store.
+   *
+   * @param newState new query state
+   */
+  public void recordNewState(final QueryState newState) {
+    state = newState;
+    queryManager.updateEphemeralState(newState);
+  }
+
+  /**
+   * Cancel the query. Asynchronous -- it may take some time for all remote fragments to be terminated.
+   * For preparing, planning and enqueued states we cancel immediately since these states are done locally.
+   *
+   * Note this can be called from outside of run() on another thread, or after run() completes
+   */
+  public void cancel() {
+    switch (state) {
+      case PREPARING:
+      case PLANNING:
+      case ENQUEUED:
+        moveToState(QueryState.CANCELLATION_REQUESTED, null);
+        return;
+
+      case STARTING:
+      case RUNNING:
+        addToEventQueue(QueryState.CANCELLATION_REQUESTED, null);
+        return;
+
+      case CANCELLATION_REQUESTED:
+      case CANCELED:
+      case COMPLETED:
+      case FAILED:
+        // nothing to do
+        return;
+
+      default:
+        throw new IllegalStateException("Unable to cancel the query. Unexpected query state -> " + state);
+    }
+  }
+
+  /**
+   * Tells the foreman to move to a new state.<br>
+   * This will be added to the end of the event queue and will be processed once the foreman is ready
+   * to accept external events.
+   *
+   * @param newState the state to move to
+   * @param exception if not null, the exception that drove this state transition (usually a failure)
+   */
+  public void addToEventQueue(final QueryState newState, final Exception exception) {
+    stateSwitch.addEvent(newState, exception);
+  }
+
+  /**
+   * Starts processing the events that were enqueued while the fragments were being sent out; if that fails, the query is moved to FAILED.
+   */
+  public void startProcessingEvents() {
+    try {
+      stateSwitch.start();
+    } catch (Exception ex) {
+      moveToState(QueryState.FAILED, ex);
+    }
+  }
+
+  /**
+   * Marks the query end time and updates the metrics counters according to the final query state.
+   */
+  @Override
+  public void close() {
+    queryManager.markEndTime();
+
+    switch (state) { // any state other than the three below leaves the outcome counters untouched
+      case FAILED:
+        failedQueries.inc();
+        break;
+      case CANCELED:
+        canceledQueries.inc();
+        break;
+      case COMPLETED:
+        succeededQueries.inc();
+        break;
+    }
+    // NOTE(review): runningQueries is only incremented on PREPARING -> PLANNING; confirm close() is never reached before that
+    runningQueries.dec();
+    completedQueries.inc();
+  }
+
+
+  private void preparing(final QueryState newState, final Exception exception) {
+    switch (newState) {
+      case PLANNING:
+        queryManager.markStartTime();
+        runningQueries.inc();
+
+        recordNewState(newState);
+        planningQueries.inc();
+        return;
+      case CANCELLATION_REQUESTED:
+        wrapUpCancellation();
+        return;
+    }
+    checkCommonStates(newState, exception);
+  }
+
+  private void planning(final QueryState newState, final Exception exception) {
+    planningQueries.dec();
+    queryManager.markPlanningEndTime();
+    switch (newState) {
+      case ENQUEUED:
+        recordNewState(newState);
+        enqueuedQueries.inc();
+        return;
+      case CANCELLATION_REQUESTED:
+        wrapUpCancellation();
+        return;
+    }
+    checkCommonStates(newState, exception);
+  }
+
+  private void enqueued(final QueryState newState, final Exception exception) {
+    enqueuedQueries.dec();
+    queryManager.markQueueWaitEndTime();
+    switch (newState) {
+      case STARTING:
+        recordNewState(newState);
+        return;
+      case CANCELLATION_REQUESTED:
+        wrapUpCancellation();
+        return;
+    }
+    checkCommonStates(newState, exception);
+  }
+
+  // Handles a state change requested while the query is in STARTING state.
+  private void starting(final QueryState newState, final Exception exception) {
+    switch (newState) {
+      case RUNNING:
+        recordNewState(QueryState.RUNNING);
+        return;
+      case COMPLETED:
+        wrapUpCompletion();
+        return; // completed queries must not fall through and enqueue a cancellation request
+      case CANCELLATION_REQUESTED:
+        // fragments are still being sent to the remote nodes in this state, so cancel only after they are all sent out
+        addToEventQueue(QueryState.CANCELLATION_REQUESTED, null);
+        return;
+    }
+    checkCommonStates(newState, exception);
+  }
+
+  private void running(final QueryState newState, final Exception exception) {
+    /*
+     * For cases that cancel executing fragments, we have to record the new
+     * state first, because the cancellation of the local root fragment will
+     * cause this to be called recursively.
+     */
+    switch (newState) {
+      case CANCELLATION_REQUESTED: {
+        assert exception == null;
+        recordNewState(QueryState.CANCELLATION_REQUESTED);
+        queryManager.cancelExecutingFragments(drillbitContext);
+        foremanResult.setCompleted(QueryState.CANCELED);
+        /*
+         * We don't close the foremanResult until we've gotten
+         * acknowledgments; that happens in cancellationRequested(), which
+         * handles requests made while the state is CANCELLATION_REQUESTED.
+         */
+        return;
+      }
+
+      case COMPLETED: {
+        wrapUpCompletion();
+        return;
+      }
+    }
+    checkCommonStates(newState, exception);
+  }
+
+  private void cancellationRequested(final QueryState newState, final Exception exception) {
+    switch (newState) {
+      case FAILED:
+        if (drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS)) {
+          assert exception != null;
+          recordNewState(QueryState.FAILED);
+          foremanResult.setForceFailure(exception);
+        }
+
+        // intentional fall-through: the result is closed whether or not the failure was recorded
+
+      case CANCELED:
+      case COMPLETED:
+        /*
+         * These amount to a completion of the cancellation requests' cleanup;
+         * now we can clean up and send the result.
+         */
+        foremanResult.close();
+        return;
+    }
+
+    throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name()));
+  }
+
+  private void wrapUpCancellation() {
+    recordNewState(QueryState.CANCELLATION_REQUESTED);
+    foremanResult.setCompleted(QueryState.CANCELED);
+  }
+
+  private void wrapUpCompletion() {
+    recordNewState(QueryState.COMPLETED);
+    foremanResult.setCompleted(QueryState.COMPLETED);
+    foremanResult.close();
+  }
+
+  private void checkCommonStates(final QueryState newState, final Exception exception) {
+    switch (newState) {
+      case FAILED:
+        assert exception != null;
+        recordNewState(QueryState.FAILED);
+        queryManager.cancelExecutingFragments(drillbitContext);
+        foremanResult.setFailed(exception);
+        foremanResult.close();
+        return;
+    }
+
+    throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name()));
+  }
+
+  // Immutable (newState, exception) pair queued for StateSwitch; static because it needs no enclosing instance.
+  private static class StateEvent {
+    final QueryState newState;
+    final Exception exception;
+    StateEvent(final QueryState newState, final Exception exception) {
+      this.newState = newState;
+      this.exception = exception;
+    }
+  }
+
+  private class StateSwitch extends EventProcessor<StateEvent> {
+    public void addEvent(final QueryState newState, final Exception exception) {
+      sendEvent(new StateEvent(newState, exception));
+    }
+
+    @Override
+    protected void processEvent(final StateEvent event) {
+      moveToState(event.newState, event.exception);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index a0cf643..cb66ca3 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -306,7 +306,7 @@ drill.exec: {
       size: 2,
       // Maximum wait time in the queue before the query times out and
       // fails.
-      timeout: 5000 // 5 seconds
+      timeout_ms: 5000 // 5 seconds
     }
   }
   memory: {

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 956cfc4..ec101d8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.math3.util.Pair;
+import org.apache.drill.exec.work.foreman.FragmentsRunner;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.test.QueryTestUtil;
 import org.apache.drill.SingleRowListener;
@@ -757,7 +758,7 @@ public class TestDrillbitResilience extends DrillTest {
     final String exceptionDesc = "send-fragments";
     final Class<? extends Throwable> exceptionClass = ForemanException.class;
     final String controls = Controls.newBuilder()
-    .addException(Foreman.class, exceptionDesc, exceptionClass)
+    .addException(FragmentsRunner.class, exceptionDesc, exceptionClass)
       .build();
     assertFailsWithException(controls, exceptionClass, exceptionDesc);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 51cdab7..edc401c 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -10377,6 +10377,22 @@ public final class UserBitShared {
        * </pre>
        */
       ENQUEUED(6, 6),
+      /**
+       * <code>PREPARING = 7;</code>
+       *
+       * <pre>
+       * query is at preparation stage, foreman is initializing
+       * </pre>
+       */
+      PREPARING(7, 7),
+      /**
+       * <code>PLANNING = 8;</code>
+       *
+       * <pre>
+       * query is at planning stage (includes logical or / and physical planning)
+       * </pre>
+       */
+      PLANNING(8, 8),
       ;
 
       /**
@@ -10427,6 +10443,22 @@ public final class UserBitShared {
        * </pre>
        */
       public static final int ENQUEUED_VALUE = 6;
+      /**
+       * <code>PREPARING = 7;</code>
+       *
+       * <pre>
+       * query is at preparation stage, foreman is initializing
+       * </pre>
+       */
+      public static final int PREPARING_VALUE = 7;
+      /**
+       * <code>PLANNING = 8;</code>
+       *
+       * <pre>
+       * query is at planning stage (includes logical or / and physical planning)
+       * </pre>
+       */
+      public static final int PLANNING_VALUE = 8;
 
 
       public final int getNumber() { return value; }
@@ -10440,6 +10472,8 @@ public final class UserBitShared {
           case 4: return FAILED;
           case 5: return CANCELLATION_REQUESTED;
           case 6: return ENQUEUED;
+          case 7: return PREPARING;
+          case 8: return PLANNING;
           default: return null;
         }
       }
@@ -23942,92 +23976,93 @@ public final class UserBitShared {
       "ield\022\023\n\013value_count\030\004 \001(\005\022\027\n\017var_byte_le" +
       "ngth\030\005 \001(\005\022\025\n\rbuffer_length\030\007 \001(\005\"7\n\nNod" +
       "eStatus\022\017\n\007node_id\030\001 \001(\005\022\030\n\020memory_footp" +
-      "rint\030\002 \001(\003\"\225\002\n\013QueryResult\0228\n\013query_stat" +
+      "rint\030\002 \001(\003\"\263\002\n\013QueryResult\0228\n\013query_stat" +
       "e\030\001 \001(\0162#.exec.shared.QueryResult.QueryS",
       "tate\022&\n\010query_id\030\002 \001(\0132\024.exec.shared.Que" +
       "ryId\022(\n\005error\030\003 \003(\0132\031.exec.shared.DrillP" +
-      "BError\"z\n\nQueryState\022\014\n\010STARTING\020\000\022\013\n\007RU" +
-      "NNING\020\001\022\r\n\tCOMPLETED\020\002\022\014\n\010CANCELED\020\003\022\n\n\006" +
-      "FAILED\020\004\022\032\n\026CANCELLATION_REQUESTED\020\005\022\014\n\010" +
-      "ENQUEUED\020\006\"p\n\tQueryData\022&\n\010query_id\030\001 \001(" +
-      "\0132\024.exec.shared.QueryId\022\021\n\trow_count\030\002 \001" +
-      "(\005\022(\n\003def\030\003 \001(\0132\033.exec.shared.RecordBatc" +
-      "hDef\"\330\001\n\tQueryInfo\022\r\n\005query\030\001 \001(\t\022\r\n\005sta" +
-      "rt\030\002 \001(\003\0222\n\005state\030\003 \001(\0162#.exec.shared.Qu",
-      "eryResult.QueryState\022\017\n\004user\030\004 \001(\t:\001-\022\'\n" +
-      "\007foreman\030\005 \001(\0132\026.exec.DrillbitEndpoint\022\024" +
-      "\n\014options_json\030\006 \001(\t\022\022\n\ntotal_cost\030\007 \001(\001" +
-      "\022\025\n\nqueue_name\030\010 \001(\t:\001-\"\242\004\n\014QueryProfile" +
-      "\022 \n\002id\030\001 \001(\0132\024.exec.shared.QueryId\022$\n\004ty" +
-      "pe\030\002 \001(\0162\026.exec.shared.QueryType\022\r\n\005star" +
-      "t\030\003 \001(\003\022\013\n\003end\030\004 \001(\003\022\r\n\005query\030\005 \001(\t\022\014\n\004p" +
-      "lan\030\006 \001(\t\022\'\n\007foreman\030\007 \001(\0132\026.exec.Drillb" +
-      "itEndpoint\0222\n\005state\030\010 \001(\0162#.exec.shared." +
-      "QueryResult.QueryState\022\027\n\017total_fragment",
-      "s\030\t \001(\005\022\032\n\022finished_fragments\030\n \001(\005\022;\n\020f" +
-      "ragment_profile\030\013 \003(\0132!.exec.shared.Majo" +
-      "rFragmentProfile\022\017\n\004user\030\014 \001(\t:\001-\022\r\n\005err" +
-      "or\030\r \001(\t\022\024\n\014verboseError\030\016 \001(\t\022\020\n\010error_" +
-      "id\030\017 \001(\t\022\022\n\nerror_node\030\020 \001(\t\022\024\n\014options_" +
-      "json\030\021 \001(\t\022\017\n\007planEnd\030\022 \001(\003\022\024\n\014queueWait" +
-      "End\030\023 \001(\003\022\022\n\ntotal_cost\030\024 \001(\001\022\025\n\nqueue_n" +
-      "ame\030\025 \001(\t:\001-\"t\n\024MajorFragmentProfile\022\031\n\021" +
-      "major_fragment_id\030\001 \001(\005\022A\n\026minor_fragmen" +
-      "t_profile\030\002 \003(\0132!.exec.shared.MinorFragm",
-      "entProfile\"\350\002\n\024MinorFragmentProfile\022)\n\005s" +
-      "tate\030\001 \001(\0162\032.exec.shared.FragmentState\022(" +
-      "\n\005error\030\002 \001(\0132\031.exec.shared.DrillPBError" +
-      "\022\031\n\021minor_fragment_id\030\003 \001(\005\0226\n\020operator_" +
-      "profile\030\004 \003(\0132\034.exec.shared.OperatorProf" +
-      "ile\022\022\n\nstart_time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(" +
-      "\003\022\023\n\013memory_used\030\007 \001(\003\022\027\n\017max_memory_use" +
-      "d\030\010 \001(\003\022(\n\010endpoint\030\t \001(\0132\026.exec.Drillbi" +
-      "tEndpoint\022\023\n\013last_update\030\n \001(\003\022\025\n\rlast_p" +
-      "rogress\030\013 \001(\003\"\377\001\n\017OperatorProfile\0221\n\rinp",
-      "ut_profile\030\001 \003(\0132\032.exec.shared.StreamPro" +
-      "file\022\023\n\013operator_id\030\003 \001(\005\022\025\n\roperator_ty" +
-      "pe\030\004 \001(\005\022\023\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess" +
-      "_nanos\030\006 \001(\003\022#\n\033peak_local_memory_alloca" +
-      "ted\030\007 \001(\003\022(\n\006metric\030\010 \003(\0132\030.exec.shared." +
-      "MetricValue\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStrea" +
-      "mProfile\022\017\n\007records\030\001 \001(\003\022\017\n\007batches\030\002 \001" +
-      "(\003\022\017\n\007schemas\030\003 \001(\003\"J\n\013MetricValue\022\021\n\tme" +
-      "tric_id\030\001 \001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014dou" +
-      "ble_value\030\003 \001(\001\")\n\010Registry\022\035\n\003jar\030\001 \003(\013",
-      "2\020.exec.shared.Jar\"/\n\003Jar\022\014\n\004name\030\001 \001(\t\022" +
-      "\032\n\022function_signature\030\002 \003(\t\"W\n\013SaslMessa" +
-      "ge\022\021\n\tmechanism\030\001 \001(\t\022\014\n\004data\030\002 \001(\014\022\'\n\006s" +
-      "tatus\030\003 \001(\0162\027.exec.shared.SaslStatus*5\n\n" +
-      "RpcChannel\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BIT_DATA\020" +
-      "\001\022\010\n\004USER\020\002*V\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOG" +
-      "ICAL\020\002\022\014\n\010PHYSICAL\020\003\022\r\n\tEXECUTION\020\004\022\026\n\022P" +
-      "REPARED_STATEMENT\020\005*\207\001\n\rFragmentState\022\013\n" +
-      "\007SENDING\020\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007R" +
-      "UNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n",
-      "\006FAILED\020\005\022\032\n\026CANCELLATION_REQUESTED\020\006*\360\005" +
-      "\n\020CoreOperatorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n" +
-      "\020BROADCAST_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_" +
-      "AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN" +
-      "\020\005\022\031\n\025HASH_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007" +
-      "\022\024\n\020MERGING_RECEIVER\020\010\022\034\n\030ORDERED_PARTIT" +
-      "ION_SENDER\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_R" +
-      "ECEIVER\020\013\022\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022" +
-      "\034\n\030SELECTION_VECTOR_REMOVER\020\016\022\027\n\023STREAMI" +
-      "NG_AGGREGATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERN",
-      "AL_SORT\020\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_S" +
-      "ORT\020\024\022\032\n\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIV" +
-      "E_SUB_SCAN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rM" +
-      "OCK_SUB_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DI" +
-      "RECT_SUB_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT" +
-      "_SUB_SCAN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_S" +
-      "CHEMA_SUB_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n" +
-      "\021PRODUCER_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!" +
-      "\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAV" +
-      "RO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_SCAN\020%*g\n\nSasl",
-      "Status\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001" +
-      "\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003" +
-      "\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill.ex" +
-      "ec.protoB\rUserBitSharedH\001"
+      "BError\"\227\001\n\nQueryState\022\014\n\010STARTING\020\000\022\013\n\007R" +
+      "UNNING\020\001\022\r\n\tCOMPLETED\020\002\022\014\n\010CANCELED\020\003\022\n\n" +
+      "\006FAILED\020\004\022\032\n\026CANCELLATION_REQUESTED\020\005\022\014\n" +
+      "\010ENQUEUED\020\006\022\r\n\tPREPARING\020\007\022\014\n\010PLANNING\020\010" +
+      "\"p\n\tQueryData\022&\n\010query_id\030\001 \001(\0132\024.exec.s" +
+      "hared.QueryId\022\021\n\trow_count\030\002 \001(\005\022(\n\003def\030" +
+      "\003 \001(\0132\033.exec.shared.RecordBatchDef\"\330\001\n\tQ" +
+      "ueryInfo\022\r\n\005query\030\001 \001(\t\022\r\n\005start\030\002 \001(\003\0222",
+      "\n\005state\030\003 \001(\0162#.exec.shared.QueryResult." +
+      "QueryState\022\017\n\004user\030\004 \001(\t:\001-\022\'\n\007foreman\030\005" +
+      " \001(\0132\026.exec.DrillbitEndpoint\022\024\n\014options_" +
+      "json\030\006 \001(\t\022\022\n\ntotal_cost\030\007 \001(\001\022\025\n\nqueue_" +
+      "name\030\010 \001(\t:\001-\"\242\004\n\014QueryProfile\022 \n\002id\030\001 \001" +
+      "(\0132\024.exec.shared.QueryId\022$\n\004type\030\002 \001(\0162\026" +
+      ".exec.shared.QueryType\022\r\n\005start\030\003 \001(\003\022\013\n" +
+      "\003end\030\004 \001(\003\022\r\n\005query\030\005 \001(\t\022\014\n\004plan\030\006 \001(\t\022" +
+      "\'\n\007foreman\030\007 \001(\0132\026.exec.DrillbitEndpoint" +
+      "\0222\n\005state\030\010 \001(\0162#.exec.shared.QueryResul",
+      "t.QueryState\022\027\n\017total_fragments\030\t \001(\005\022\032\n" +
+      "\022finished_fragments\030\n \001(\005\022;\n\020fragment_pr" +
+      "ofile\030\013 \003(\0132!.exec.shared.MajorFragmentP" +
+      "rofile\022\017\n\004user\030\014 \001(\t:\001-\022\r\n\005error\030\r \001(\t\022\024" +
+      "\n\014verboseError\030\016 \001(\t\022\020\n\010error_id\030\017 \001(\t\022\022" +
+      "\n\nerror_node\030\020 \001(\t\022\024\n\014options_json\030\021 \001(\t" +
+      "\022\017\n\007planEnd\030\022 \001(\003\022\024\n\014queueWaitEnd\030\023 \001(\003\022" +
+      "\022\n\ntotal_cost\030\024 \001(\001\022\025\n\nqueue_name\030\025 \001(\t:" +
+      "\001-\"t\n\024MajorFragmentProfile\022\031\n\021major_frag" +
+      "ment_id\030\001 \001(\005\022A\n\026minor_fragment_profile\030",
+      "\002 \003(\0132!.exec.shared.MinorFragmentProfile" +
+      "\"\350\002\n\024MinorFragmentProfile\022)\n\005state\030\001 \001(\016" +
+      "2\032.exec.shared.FragmentState\022(\n\005error\030\002 " +
+      "\001(\0132\031.exec.shared.DrillPBError\022\031\n\021minor_" +
+      "fragment_id\030\003 \001(\005\0226\n\020operator_profile\030\004 " +
+      "\003(\0132\034.exec.shared.OperatorProfile\022\022\n\nsta" +
+      "rt_time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(\003\022\023\n\013memor" +
+      "y_used\030\007 \001(\003\022\027\n\017max_memory_used\030\010 \001(\003\022(\n" +
+      "\010endpoint\030\t \001(\0132\026.exec.DrillbitEndpoint\022" +
+      "\023\n\013last_update\030\n \001(\003\022\025\n\rlast_progress\030\013 ",
+      "\001(\003\"\377\001\n\017OperatorProfile\0221\n\rinput_profile" +
+      "\030\001 \003(\0132\032.exec.shared.StreamProfile\022\023\n\013op" +
+      "erator_id\030\003 \001(\005\022\025\n\roperator_type\030\004 \001(\005\022\023" +
+      "\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 \001" +
+      "(\003\022#\n\033peak_local_memory_allocated\030\007 \001(\003\022" +
+      "(\n\006metric\030\010 \003(\0132\030.exec.shared.MetricValu" +
+      "e\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStreamProfile\022\017" +
+      "\n\007records\030\001 \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007sche" +
+      "mas\030\003 \001(\003\"J\n\013MetricValue\022\021\n\tmetric_id\030\001 " +
+      "\001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014double_value\030",
+      "\003 \001(\001\")\n\010Registry\022\035\n\003jar\030\001 \003(\0132\020.exec.sh" +
+      "ared.Jar\"/\n\003Jar\022\014\n\004name\030\001 \001(\t\022\032\n\022functio" +
+      "n_signature\030\002 \003(\t\"W\n\013SaslMessage\022\021\n\tmech" +
+      "anism\030\001 \001(\t\022\014\n\004data\030\002 \001(\014\022\'\n\006status\030\003 \001(" +
+      "\0162\027.exec.shared.SaslStatus*5\n\nRpcChannel" +
+      "\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020" +
+      "\002*V\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010" +
+      "PHYSICAL\020\003\022\r\n\tEXECUTION\020\004\022\026\n\022PREPARED_ST" +
+      "ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000" +
+      "\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014",
+      "\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022" +
+      "\032\n\026CANCELLATION_REQUESTED\020\006*\360\005\n\020CoreOper" +
+      "atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST" +
+      "_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020" +
+      "\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH" +
+      "_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGIN" +
+      "G_RECEIVER\020\010\022\034\n\030ORDERED_PARTITION_SENDER" +
+      "\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013\022" +
+      "\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECTI" +
+      "ON_VECTOR_REMOVER\020\016\022\027\n\023STREAMING_AGGREGA",
+      "TE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021\022" +
+      "\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026P" +
+      "ARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCAN" +
+      "\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_SC" +
+      "AN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT_SUB_S" +
+      "CAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020" +
+      "\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_" +
+      "SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCER_" +
+      "CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WINDOW" +
+      "\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_SCA",
+      "N\020$\022\021\n\rPCAP_SUB_SCAN\020%*g\n\nSaslStatus\022\020\n\014" +
+      "SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_I" +
+      "N_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_F" +
+      "AILED\020\004B.\n\033org.apache.drill.exec.protoB\r" +
+      "UserBitSharedH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java
index 7b2a273..a53dc42 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java
@@ -43,7 +43,9 @@ public final class QueryResult implements Externalizable, Message<QueryResult>,
         CANCELED(3),
         FAILED(4),
         CANCELLATION_REQUESTED(5),
-        ENQUEUED(6);
+        ENQUEUED(6),
+        PREPARING(7),
+        PLANNING(8);
         
         public final int number;
         
@@ -68,6 +70,8 @@ public final class QueryResult implements Externalizable, Message<QueryResult>,
                 case 4: return FAILED;
                 case 5: return CANCELLATION_REQUESTED;
                 case 6: return ENQUEUED;
+                case 7: return PREPARING;
+                case 8: return PLANNING;
                 default: return null;
             }
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 086b98a..205611b 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -167,6 +167,8 @@ message QueryResult {
 	  FAILED = 4;
 	  CANCELLATION_REQUESTED = 5; // cancellation has been requested, and is being processed
 	  ENQUEUED = 6; // query has been enqueued. this is pre-starting.
+	  PREPARING = 7; // query is at preparation stage, foreman is initializing
+	  PLANNING = 8; // query is at planning stage (includes logical or / and physical planning)
 	}
 
 	optional QueryState query_state = 1;