You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/01/02 14:50:15 UTC
[3/7] drill git commit: DRILL-5963: Query state process improvements
http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
new file mode 100644
index 0000000..2443139
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman;
+
+import com.codahale.metrics.Counter;
+import org.apache.drill.common.EventProcessor;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.foreman.Foreman.ForemanResult;
+
+/**
+ * Responsible for transitioning a query from one state to another and for
+ * incrementing / decrementing the query status counters.
+ */
+public class QueryStateProcessor implements AutoCloseable {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryStateProcessor.class);
+
+ private static final Counter planningQueries = DrillMetrics.getRegistry().counter("drill.queries.planning");
+ private static final Counter enqueuedQueries = DrillMetrics.getRegistry().counter("drill.queries.enqueued");
+ private static final Counter runningQueries = DrillMetrics.getRegistry().counter("drill.queries.running");
+ private static final Counter completedQueries = DrillMetrics.getRegistry().counter("drill.queries.completed");
+ private static final Counter succeededQueries = DrillMetrics.getRegistry().counter("drill.queries.succeeded");
+ private static final Counter failedQueries = DrillMetrics.getRegistry().counter("drill.queries.failed");
+ private static final Counter canceledQueries = DrillMetrics.getRegistry().counter("drill.queries.canceled");
+
+ private final StateSwitch stateSwitch = new StateSwitch();
+
+ private final String queryIdString;
+ private final QueryManager queryManager;
+ private final DrillbitContext drillbitContext;
+ private final ForemanResult foremanResult;
+
+ private volatile QueryState state;
+
+  /**
+   * Creates a state processor for a single query. The query starts in the PREPARING state.
+   *
+   * @param queryIdString query id as a string, used to prefix state change log messages
+   * @param queryManager query manager that receives state updates and timing marks
+   * @param drillbitContext drillbit context, used when cancelling executing fragments and reading configuration
+   * @param foremanResult holder through which the final query outcome is reported and closed
+   */
+  public QueryStateProcessor(String queryIdString, QueryManager queryManager, DrillbitContext drillbitContext, ForemanResult foremanResult) {
+    // every query begins its life cycle in the PREPARING state
+    this.state = QueryState.PREPARING;
+
+    this.foremanResult = foremanResult;
+    this.drillbitContext = drillbitContext;
+    this.queryManager = queryManager;
+    this.queryIdString = queryIdString;
+  }
+
+  /**
+   * Returns the state the query is in at the moment of the call.
+   *
+   * @return current query state
+   */
+  public QueryState getState() {
+    return this.state;
+  }
+
+  /**
+   * Moves the query from its current state to the requested one by dispatching
+   * to the handler of the current state. Requests that arrive when the query is
+   * already in a terminal state (CANCELED, COMPLETED or FAILED) are logged and dropped.
+   *
+   * @param newState new query state
+   * @param exception exception if failure occurred
+   * @throws IllegalStateException if the requested query state transition is not allowed
+   */
+  public synchronized void moveToState(QueryState newState, Exception exception) {
+    // fully parameterized: nothing is concatenated when debug logging is disabled,
+    // and the query id can never be misread as part of the format string
+    logger.debug("{}: State change requested {} --> {}", queryIdString, state, newState);
+
+    switch (state) {
+      case PREPARING:
+        preparing(newState, exception);
+        return;
+      case PLANNING:
+        planning(newState, exception);
+        return;
+      case ENQUEUED:
+        enqueued(newState, exception);
+        return;
+      case STARTING:
+        starting(newState, exception);
+        return;
+      case RUNNING:
+        running(newState, exception);
+        return;
+      case CANCELLATION_REQUESTED:
+        cancellationRequested(newState, exception);
+        return;
+      case CANCELED:
+      case COMPLETED:
+      case FAILED:
+        logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state);
+        return;
+    }
+
+    // only reachable when the current state is not handled by the switch above
+    throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name()));
+  }
+
+  /**
+   * Sets the given state as the current query state, without checking whether the
+   * transition is allowed, and passes it on to the query manager so that the
+   * ephemeral query store is updated.
+   *
+   * @param newState new query state
+   */
+  public void recordNewState(final QueryState newState) {
+    this.state = newState;
+    this.queryManager.updateEphemeralState(newState);
+  }
+
+  /**
+   * Requests cancellation of the query. Asynchronous -- it may take some time for all remote
+   * fragments to be terminated. In the preparing, planning and enqueued states the request is
+   * applied immediately since these states are done locally; in the starting and running states
+   * it is added to the event queue instead.
+   *
+   * Note this can be called from outside of run() on another thread, or after run() completes
+   *
+   * @throws IllegalStateException if the current query state is not one of the known states
+   */
+  public void cancel() {
+    switch (state) {
+      case CANCELLATION_REQUESTED:
+      case CANCELED:
+      case COMPLETED:
+      case FAILED:
+        // cancellation was already requested or the query is over: nothing to do
+        break;
+
+      case STARTING:
+      case RUNNING:
+        addToEventQueue(QueryState.CANCELLATION_REQUESTED, null);
+        break;
+
+      case PREPARING:
+      case PLANNING:
+      case ENQUEUED:
+        moveToState(QueryState.CANCELLATION_REQUESTED, null);
+        break;
+
+      default:
+        throw new IllegalStateException("Unable to cancel the query. Unexpected query state -> " + state);
+    }
+  }
+
+  /**
+   * Tells the foreman to move to a new state.<br>
+   * The request is handed to the state switch as an event: it is added to the end of the event
+   * queue and is processed once the foreman is ready to accept external events.
+   *
+   * @param newState the state to move to
+   * @param exception if not null, the exception that drove this state transition (usually a failure)
+   */
+  public void addToEventQueue(final QueryState newState, final Exception exception) {
+    this.stateSwitch.addEvent(newState, exception);
+  }
+
+  /**
+   * Starts processing all events that were enqueued while the fragments were being sent out.
+   * If starting the event processor throws, the query is moved to the FAILED state
+   * with that exception as the cause.
+   */
+  public void startProcessingEvents() {
+    try {
+      this.stateSwitch.start();
+    } catch (Exception e) {
+      moveToState(QueryState.FAILED, e);
+    }
+  }
+
+  /**
+   * Marks the query end time and updates the query counters according to the final query state:
+   * the counter matching a FAILED, CANCELED or COMPLETED outcome is incremented, the running
+   * counter is decremented and the completed counter is incremented.
+   */
+  @Override
+  public void close() {
+    queryManager.markEndTime();
+
+    switch (state) {
+      case COMPLETED:
+        succeededQueries.inc();
+        break;
+      case CANCELED:
+        canceledQueries.inc();
+        break;
+      case FAILED:
+        failedQueries.inc();
+        break;
+    }
+
+    // NOTE(review): the running counter is only incremented on the PREPARING --> PLANNING
+    // transition but is decremented here unconditionally -- confirm close() is never reached
+    // for a query that did not get to PLANNING
+    runningQueries.dec();
+    completedQueries.inc();
+  }
+
+
+  /**
+   * Handles a state change request while the query is in the PREPARING state.
+   * PLANNING and CANCELLATION_REQUESTED are handled here, every other state is
+   * delegated to the common state check.
+   *
+   * @param newState requested query state
+   * @param exception exception if failure occurred
+   */
+  private void preparing(final QueryState newState, final Exception exception) {
+    switch (newState) {
+      case PLANNING:
+        queryManager.markStartTime();
+        runningQueries.inc();
+
+        recordNewState(newState);
+        planningQueries.inc();
+        break;
+      case CANCELLATION_REQUESTED:
+        wrapUpCancellation();
+        break;
+      default:
+        checkCommonStates(newState, exception);
+    }
+  }
+
+  /**
+   * Handles a state change request while the query is in the PLANNING state.
+   * Whatever the requested state is, the planning counter is decremented and the
+   * planning end time is marked first.
+   *
+   * @param newState requested query state
+   * @param exception exception if failure occurred
+   */
+  private void planning(final QueryState newState, final Exception exception) {
+    // leaving PLANNING: done before the requested state is even looked at
+    planningQueries.dec();
+    queryManager.markPlanningEndTime();
+
+    switch (newState) {
+      case ENQUEUED:
+        recordNewState(newState);
+        enqueuedQueries.inc();
+        break;
+      case CANCELLATION_REQUESTED:
+        wrapUpCancellation();
+        break;
+      default:
+        checkCommonStates(newState, exception);
+    }
+  }
+
+  /**
+   * Handles a state change request while the query is in the ENQUEUED state.
+   * Whatever the requested state is, the enqueued counter is decremented and the
+   * queue wait end time is marked first.
+   *
+   * @param newState requested query state
+   * @param exception exception if failure occurred
+   */
+  private void enqueued(final QueryState newState, final Exception exception) {
+    // leaving ENQUEUED: done before the requested state is even looked at
+    enqueuedQueries.dec();
+    queryManager.markQueueWaitEndTime();
+
+    switch (newState) {
+      case STARTING:
+        recordNewState(newState);
+        break;
+      case CANCELLATION_REQUESTED:
+        wrapUpCancellation();
+        break;
+      default:
+        checkCommonStates(newState, exception);
+    }
+  }
+
+  /**
+   * Handles a state change request while the query is in the STARTING state.
+   * A cancellation request is not applied right away but added to the event queue,
+   * any state not handled here is delegated to the common state check.
+   *
+   * @param newState requested query state
+   * @param exception exception if failure occurred
+   */
+  private void starting(final QueryState newState, final Exception exception) {
+    switch (newState) {
+      case RUNNING:
+        recordNewState(QueryState.RUNNING);
+        return;
+      case COMPLETED:
+        wrapUpCompletion();
+        // the query is done: must not fall through and enqueue a cancellation request
+        return;
+      case CANCELLATION_REQUESTED:
+        // since during starting state fragments are sent to the remote nodes,
+        // we don't want to cancel until they all are sent out
+        addToEventQueue(QueryState.CANCELLATION_REQUESTED, null);
+        return;
+    }
+
+    checkCommonStates(newState, exception);
+  }
+
+  /**
+   * Handles a state change request while the query is in the RUNNING state.
+   *
+   * @param newState requested query state
+   * @param exception exception if failure occurred, expected to be null for a cancellation request
+   */
+  private void running(final QueryState newState, final Exception exception) {
+    switch (newState) {
+      case CANCELLATION_REQUESTED:
+        assert exception == null;
+        /*
+         * The new state has to be recorded before the fragments are cancelled,
+         * because the cancellation of the local root fragment will cause this
+         * to be called recursively.
+         */
+        recordNewState(QueryState.CANCELLATION_REQUESTED);
+        queryManager.cancelExecutingFragments(drillbitContext);
+        foremanResult.setCompleted(QueryState.CANCELED);
+        /*
+         * The foremanResult is not closed here: that happens once the
+         * acknowledgments have arrived, in the handler for the
+         * CANCELLATION_REQUESTED state.
+         */
+        break;
+
+      case COMPLETED:
+        wrapUpCompletion();
+        break;
+
+      default:
+        checkCommonStates(newState, exception);
+    }
+  }
+
+  /**
+   * Handles a state change request while the query is in the CANCELLATION_REQUESTED state.
+   * FAILED, CANCELED and COMPLETED all finish the cancellation by closing the foreman result.
+   * A FAILED request is additionally recorded and forced as a failure when the
+   * RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS option is enabled.
+   *
+   * @param newState requested query state
+   * @param exception exception if failure occurred
+   * @throws IllegalStateException if the requested state is none of FAILED, CANCELED, COMPLETED
+   */
+  private void cancellationRequested(final QueryState newState, final Exception exception) {
+    final boolean reportFailure = newState == QueryState.FAILED
+        && drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS);
+    if (reportFailure) {
+      assert exception != null;
+      recordNewState(QueryState.FAILED);
+      foremanResult.setForceFailure(exception);
+    }
+
+    switch (newState) {
+      case FAILED:
+      case CANCELED:
+      case COMPLETED:
+        /*
+         * Any of these means the cleanup of the cancellation requests is over;
+         * now we can clean up and send the result.
+         */
+        foremanResult.close();
+        return;
+
+      default:
+        throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name()));
+    }
+  }
+
+  /**
+   * Records the CANCELLATION_REQUESTED state and marks the foreman result as completed
+   * with the CANCELED state. The foreman result is not closed here.
+   */
+  private void wrapUpCancellation() {
+    this.recordNewState(QueryState.CANCELLATION_REQUESTED);
+    this.foremanResult.setCompleted(QueryState.CANCELED);
+  }
+
+  /**
+   * Records the COMPLETED state, marks the foreman result as completed with
+   * the COMPLETED state and closes it.
+   */
+  private void wrapUpCompletion() {
+    this.recordNewState(QueryState.COMPLETED);
+    this.foremanResult.setCompleted(QueryState.COMPLETED);
+    this.foremanResult.close();
+  }
+
+  /**
+   * Handles the requested states that are treated the same way by several current states.
+   * Only FAILED is accepted: the failure is recorded, executing fragments are cancelled and
+   * the foreman result is marked as failed and closed.
+   *
+   * @param newState requested query state
+   * @param exception exception that caused the failure
+   * @throws IllegalStateException if the requested state is not FAILED
+   */
+  private void checkCommonStates(final QueryState newState, final Exception exception) {
+    if (newState != QueryState.FAILED) {
+      throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name()));
+    }
+
+    assert exception != null;
+    recordNewState(QueryState.FAILED);
+    queryManager.cancelExecutingFragments(drillbitContext);
+    foremanResult.setFailed(exception);
+    foremanResult.close();
+  }
+
+  /**
+   * Immutable holder of a requested state change: the state to move to and the
+   * exception (possibly null) that drove the request. Declared static because it
+   * never touches the enclosing processor instance.
+   */
+  private static class StateEvent {
+    final QueryState newState;
+    final Exception exception;
+
+    /**
+     * @param newState the state to move to
+     * @param exception the exception that drove this state transition, may be null
+     */
+    StateEvent(final QueryState newState, final Exception exception) {
+      this.newState = newState;
+      this.exception = exception;
+    }
+  }
+
+  /**
+   * Event processor for state change requests: every event it processes is
+   * applied to the enclosing query state processor through moveToState.
+   */
+  private class StateSwitch extends EventProcessor<StateEvent> {
+    /**
+     * Wraps the requested state change into an event and sends it to this processor.
+     *
+     * @param newState the state to move to
+     * @param exception the exception that drove this state transition, may be null
+     */
+    public void addEvent(final QueryState newState, final Exception exception) {
+      final StateEvent stateEvent = new StateEvent(newState, exception);
+      sendEvent(stateEvent);
+    }
+
+    @Override
+    protected void processEvent(final StateEvent event) {
+      QueryStateProcessor.this.moveToState(event.newState, event.exception);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index a0cf643..cb66ca3 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -306,7 +306,7 @@ drill.exec: {
size: 2,
// Maximum wait time in the queue before the query times out and
// fails.
- timeout: 5000 // 5 seconds
+ timeout_ms: 5000 // 5 seconds
}
}
memory: {
http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 956cfc4..ec101d8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.math3.util.Pair;
+import org.apache.drill.exec.work.foreman.FragmentsRunner;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.test.QueryTestUtil;
import org.apache.drill.SingleRowListener;
@@ -757,7 +758,7 @@ public class TestDrillbitResilience extends DrillTest {
final String exceptionDesc = "send-fragments";
final Class<? extends Throwable> exceptionClass = ForemanException.class;
final String controls = Controls.newBuilder()
- .addException(Foreman.class, exceptionDesc, exceptionClass)
+ .addException(FragmentsRunner.class, exceptionDesc, exceptionClass)
.build();
assertFailsWithException(controls, exceptionClass, exceptionDesc);
http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 51cdab7..edc401c 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -10377,6 +10377,22 @@ public final class UserBitShared {
* </pre>
*/
ENQUEUED(6, 6),
+ /**
+ * <code>PREPARING = 7;</code>
+ *
+ * <pre>
+ * query is at preparation stage, foreman is initializing
+ * </pre>
+ */
+ PREPARING(7, 7),
+ /**
+ * <code>PLANNING = 8;</code>
+ *
+ * <pre>
+ * query is at planning stage (includes logical or / and physical planning)
+ * </pre>
+ */
+ PLANNING(8, 8),
;
/**
@@ -10427,6 +10443,22 @@ public final class UserBitShared {
* </pre>
*/
public static final int ENQUEUED_VALUE = 6;
+ /**
+ * <code>PREPARING = 7;</code>
+ *
+ * <pre>
+ * query is at preparation stage, foreman is initializing
+ * </pre>
+ */
+ public static final int PREPARING_VALUE = 7;
+ /**
+ * <code>PLANNING = 8;</code>
+ *
+ * <pre>
+ * query is at planning stage (includes logical or / and physical planning)
+ * </pre>
+ */
+ public static final int PLANNING_VALUE = 8;
public final int getNumber() { return value; }
@@ -10440,6 +10472,8 @@ public final class UserBitShared {
case 4: return FAILED;
case 5: return CANCELLATION_REQUESTED;
case 6: return ENQUEUED;
+ case 7: return PREPARING;
+ case 8: return PLANNING;
default: return null;
}
}
@@ -23942,92 +23976,93 @@ public final class UserBitShared {
"ield\022\023\n\013value_count\030\004 \001(\005\022\027\n\017var_byte_le" +
"ngth\030\005 \001(\005\022\025\n\rbuffer_length\030\007 \001(\005\"7\n\nNod" +
"eStatus\022\017\n\007node_id\030\001 \001(\005\022\030\n\020memory_footp" +
- "rint\030\002 \001(\003\"\225\002\n\013QueryResult\0228\n\013query_stat" +
+ "rint\030\002 \001(\003\"\263\002\n\013QueryResult\0228\n\013query_stat" +
"e\030\001 \001(\0162#.exec.shared.QueryResult.QueryS",
"tate\022&\n\010query_id\030\002 \001(\0132\024.exec.shared.Que" +
"ryId\022(\n\005error\030\003 \003(\0132\031.exec.shared.DrillP" +
- "BError\"z\n\nQueryState\022\014\n\010STARTING\020\000\022\013\n\007RU" +
- "NNING\020\001\022\r\n\tCOMPLETED\020\002\022\014\n\010CANCELED\020\003\022\n\n\006" +
- "FAILED\020\004\022\032\n\026CANCELLATION_REQUESTED\020\005\022\014\n\010" +
- "ENQUEUED\020\006\"p\n\tQueryData\022&\n\010query_id\030\001 \001(" +
- "\0132\024.exec.shared.QueryId\022\021\n\trow_count\030\002 \001" +
- "(\005\022(\n\003def\030\003 \001(\0132\033.exec.shared.RecordBatc" +
- "hDef\"\330\001\n\tQueryInfo\022\r\n\005query\030\001 \001(\t\022\r\n\005sta" +
- "rt\030\002 \001(\003\0222\n\005state\030\003 \001(\0162#.exec.shared.Qu",
- "eryResult.QueryState\022\017\n\004user\030\004 \001(\t:\001-\022\'\n" +
- "\007foreman\030\005 \001(\0132\026.exec.DrillbitEndpoint\022\024" +
- "\n\014options_json\030\006 \001(\t\022\022\n\ntotal_cost\030\007 \001(\001" +
- "\022\025\n\nqueue_name\030\010 \001(\t:\001-\"\242\004\n\014QueryProfile" +
- "\022 \n\002id\030\001 \001(\0132\024.exec.shared.QueryId\022$\n\004ty" +
- "pe\030\002 \001(\0162\026.exec.shared.QueryType\022\r\n\005star" +
- "t\030\003 \001(\003\022\013\n\003end\030\004 \001(\003\022\r\n\005query\030\005 \001(\t\022\014\n\004p" +
- "lan\030\006 \001(\t\022\'\n\007foreman\030\007 \001(\0132\026.exec.Drillb" +
- "itEndpoint\0222\n\005state\030\010 \001(\0162#.exec.shared." +
- "QueryResult.QueryState\022\027\n\017total_fragment",
- "s\030\t \001(\005\022\032\n\022finished_fragments\030\n \001(\005\022;\n\020f" +
- "ragment_profile\030\013 \003(\0132!.exec.shared.Majo" +
- "rFragmentProfile\022\017\n\004user\030\014 \001(\t:\001-\022\r\n\005err" +
- "or\030\r \001(\t\022\024\n\014verboseError\030\016 \001(\t\022\020\n\010error_" +
- "id\030\017 \001(\t\022\022\n\nerror_node\030\020 \001(\t\022\024\n\014options_" +
- "json\030\021 \001(\t\022\017\n\007planEnd\030\022 \001(\003\022\024\n\014queueWait" +
- "End\030\023 \001(\003\022\022\n\ntotal_cost\030\024 \001(\001\022\025\n\nqueue_n" +
- "ame\030\025 \001(\t:\001-\"t\n\024MajorFragmentProfile\022\031\n\021" +
- "major_fragment_id\030\001 \001(\005\022A\n\026minor_fragmen" +
- "t_profile\030\002 \003(\0132!.exec.shared.MinorFragm",
- "entProfile\"\350\002\n\024MinorFragmentProfile\022)\n\005s" +
- "tate\030\001 \001(\0162\032.exec.shared.FragmentState\022(" +
- "\n\005error\030\002 \001(\0132\031.exec.shared.DrillPBError" +
- "\022\031\n\021minor_fragment_id\030\003 \001(\005\0226\n\020operator_" +
- "profile\030\004 \003(\0132\034.exec.shared.OperatorProf" +
- "ile\022\022\n\nstart_time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(" +
- "\003\022\023\n\013memory_used\030\007 \001(\003\022\027\n\017max_memory_use" +
- "d\030\010 \001(\003\022(\n\010endpoint\030\t \001(\0132\026.exec.Drillbi" +
- "tEndpoint\022\023\n\013last_update\030\n \001(\003\022\025\n\rlast_p" +
- "rogress\030\013 \001(\003\"\377\001\n\017OperatorProfile\0221\n\rinp",
- "ut_profile\030\001 \003(\0132\032.exec.shared.StreamPro" +
- "file\022\023\n\013operator_id\030\003 \001(\005\022\025\n\roperator_ty" +
- "pe\030\004 \001(\005\022\023\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess" +
- "_nanos\030\006 \001(\003\022#\n\033peak_local_memory_alloca" +
- "ted\030\007 \001(\003\022(\n\006metric\030\010 \003(\0132\030.exec.shared." +
- "MetricValue\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStrea" +
- "mProfile\022\017\n\007records\030\001 \001(\003\022\017\n\007batches\030\002 \001" +
- "(\003\022\017\n\007schemas\030\003 \001(\003\"J\n\013MetricValue\022\021\n\tme" +
- "tric_id\030\001 \001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014dou" +
- "ble_value\030\003 \001(\001\")\n\010Registry\022\035\n\003jar\030\001 \003(\013",
- "2\020.exec.shared.Jar\"/\n\003Jar\022\014\n\004name\030\001 \001(\t\022" +
- "\032\n\022function_signature\030\002 \003(\t\"W\n\013SaslMessa" +
- "ge\022\021\n\tmechanism\030\001 \001(\t\022\014\n\004data\030\002 \001(\014\022\'\n\006s" +
- "tatus\030\003 \001(\0162\027.exec.shared.SaslStatus*5\n\n" +
- "RpcChannel\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BIT_DATA\020" +
- "\001\022\010\n\004USER\020\002*V\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOG" +
- "ICAL\020\002\022\014\n\010PHYSICAL\020\003\022\r\n\tEXECUTION\020\004\022\026\n\022P" +
- "REPARED_STATEMENT\020\005*\207\001\n\rFragmentState\022\013\n" +
- "\007SENDING\020\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007R" +
- "UNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n",
- "\006FAILED\020\005\022\032\n\026CANCELLATION_REQUESTED\020\006*\360\005" +
- "\n\020CoreOperatorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n" +
- "\020BROADCAST_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_" +
- "AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN" +
- "\020\005\022\031\n\025HASH_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007" +
- "\022\024\n\020MERGING_RECEIVER\020\010\022\034\n\030ORDERED_PARTIT" +
- "ION_SENDER\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_R" +
- "ECEIVER\020\013\022\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022" +
- "\034\n\030SELECTION_VECTOR_REMOVER\020\016\022\027\n\023STREAMI" +
- "NG_AGGREGATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERN",
- "AL_SORT\020\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_S" +
- "ORT\020\024\022\032\n\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIV" +
- "E_SUB_SCAN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rM" +
- "OCK_SUB_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DI" +
- "RECT_SUB_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT" +
- "_SUB_SCAN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_S" +
- "CHEMA_SUB_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n" +
- "\021PRODUCER_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!" +
- "\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAV" +
- "RO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_SCAN\020%*g\n\nSasl",
- "Status\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001" +
- "\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003" +
- "\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill.ex" +
- "ec.protoB\rUserBitSharedH\001"
+ "BError\"\227\001\n\nQueryState\022\014\n\010STARTING\020\000\022\013\n\007R" +
+ "UNNING\020\001\022\r\n\tCOMPLETED\020\002\022\014\n\010CANCELED\020\003\022\n\n" +
+ "\006FAILED\020\004\022\032\n\026CANCELLATION_REQUESTED\020\005\022\014\n" +
+ "\010ENQUEUED\020\006\022\r\n\tPREPARING\020\007\022\014\n\010PLANNING\020\010" +
+ "\"p\n\tQueryData\022&\n\010query_id\030\001 \001(\0132\024.exec.s" +
+ "hared.QueryId\022\021\n\trow_count\030\002 \001(\005\022(\n\003def\030" +
+ "\003 \001(\0132\033.exec.shared.RecordBatchDef\"\330\001\n\tQ" +
+ "ueryInfo\022\r\n\005query\030\001 \001(\t\022\r\n\005start\030\002 \001(\003\0222",
+ "\n\005state\030\003 \001(\0162#.exec.shared.QueryResult." +
+ "QueryState\022\017\n\004user\030\004 \001(\t:\001-\022\'\n\007foreman\030\005" +
+ " \001(\0132\026.exec.DrillbitEndpoint\022\024\n\014options_" +
+ "json\030\006 \001(\t\022\022\n\ntotal_cost\030\007 \001(\001\022\025\n\nqueue_" +
+ "name\030\010 \001(\t:\001-\"\242\004\n\014QueryProfile\022 \n\002id\030\001 \001" +
+ "(\0132\024.exec.shared.QueryId\022$\n\004type\030\002 \001(\0162\026" +
+ ".exec.shared.QueryType\022\r\n\005start\030\003 \001(\003\022\013\n" +
+ "\003end\030\004 \001(\003\022\r\n\005query\030\005 \001(\t\022\014\n\004plan\030\006 \001(\t\022" +
+ "\'\n\007foreman\030\007 \001(\0132\026.exec.DrillbitEndpoint" +
+ "\0222\n\005state\030\010 \001(\0162#.exec.shared.QueryResul",
+ "t.QueryState\022\027\n\017total_fragments\030\t \001(\005\022\032\n" +
+ "\022finished_fragments\030\n \001(\005\022;\n\020fragment_pr" +
+ "ofile\030\013 \003(\0132!.exec.shared.MajorFragmentP" +
+ "rofile\022\017\n\004user\030\014 \001(\t:\001-\022\r\n\005error\030\r \001(\t\022\024" +
+ "\n\014verboseError\030\016 \001(\t\022\020\n\010error_id\030\017 \001(\t\022\022" +
+ "\n\nerror_node\030\020 \001(\t\022\024\n\014options_json\030\021 \001(\t" +
+ "\022\017\n\007planEnd\030\022 \001(\003\022\024\n\014queueWaitEnd\030\023 \001(\003\022" +
+ "\022\n\ntotal_cost\030\024 \001(\001\022\025\n\nqueue_name\030\025 \001(\t:" +
+ "\001-\"t\n\024MajorFragmentProfile\022\031\n\021major_frag" +
+ "ment_id\030\001 \001(\005\022A\n\026minor_fragment_profile\030",
+ "\002 \003(\0132!.exec.shared.MinorFragmentProfile" +
+ "\"\350\002\n\024MinorFragmentProfile\022)\n\005state\030\001 \001(\016" +
+ "2\032.exec.shared.FragmentState\022(\n\005error\030\002 " +
+ "\001(\0132\031.exec.shared.DrillPBError\022\031\n\021minor_" +
+ "fragment_id\030\003 \001(\005\0226\n\020operator_profile\030\004 " +
+ "\003(\0132\034.exec.shared.OperatorProfile\022\022\n\nsta" +
+ "rt_time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(\003\022\023\n\013memor" +
+ "y_used\030\007 \001(\003\022\027\n\017max_memory_used\030\010 \001(\003\022(\n" +
+ "\010endpoint\030\t \001(\0132\026.exec.DrillbitEndpoint\022" +
+ "\023\n\013last_update\030\n \001(\003\022\025\n\rlast_progress\030\013 ",
+ "\001(\003\"\377\001\n\017OperatorProfile\0221\n\rinput_profile" +
+ "\030\001 \003(\0132\032.exec.shared.StreamProfile\022\023\n\013op" +
+ "erator_id\030\003 \001(\005\022\025\n\roperator_type\030\004 \001(\005\022\023" +
+ "\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 \001" +
+ "(\003\022#\n\033peak_local_memory_allocated\030\007 \001(\003\022" +
+ "(\n\006metric\030\010 \003(\0132\030.exec.shared.MetricValu" +
+ "e\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStreamProfile\022\017" +
+ "\n\007records\030\001 \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007sche" +
+ "mas\030\003 \001(\003\"J\n\013MetricValue\022\021\n\tmetric_id\030\001 " +
+ "\001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014double_value\030",
+ "\003 \001(\001\")\n\010Registry\022\035\n\003jar\030\001 \003(\0132\020.exec.sh" +
+ "ared.Jar\"/\n\003Jar\022\014\n\004name\030\001 \001(\t\022\032\n\022functio" +
+ "n_signature\030\002 \003(\t\"W\n\013SaslMessage\022\021\n\tmech" +
+ "anism\030\001 \001(\t\022\014\n\004data\030\002 \001(\014\022\'\n\006status\030\003 \001(" +
+ "\0162\027.exec.shared.SaslStatus*5\n\nRpcChannel" +
+ "\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020" +
+ "\002*V\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010" +
+ "PHYSICAL\020\003\022\r\n\tEXECUTION\020\004\022\026\n\022PREPARED_ST" +
+ "ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000" +
+ "\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014",
+ "\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022" +
+ "\032\n\026CANCELLATION_REQUESTED\020\006*\360\005\n\020CoreOper" +
+ "atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST" +
+ "_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020" +
+ "\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH" +
+ "_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGIN" +
+ "G_RECEIVER\020\010\022\034\n\030ORDERED_PARTITION_SENDER" +
+ "\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013\022" +
+ "\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECTI" +
+ "ON_VECTOR_REMOVER\020\016\022\027\n\023STREAMING_AGGREGA",
+ "TE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021\022" +
+ "\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026P" +
+ "ARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCAN" +
+ "\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_SC" +
+ "AN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT_SUB_S" +
+ "CAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020" +
+ "\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_" +
+ "SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCER_" +
+ "CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WINDOW" +
+ "\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_SCA",
+ "N\020$\022\021\n\rPCAP_SUB_SCAN\020%*g\n\nSaslStatus\022\020\n\014" +
+ "SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_I" +
+ "N_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_F" +
+ "AILED\020\004B.\n\033org.apache.drill.exec.protoB\r" +
+ "UserBitSharedH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java
index 7b2a273..a53dc42 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java
@@ -43,7 +43,9 @@ public final class QueryResult implements Externalizable, Message<QueryResult>,
CANCELED(3),
FAILED(4),
CANCELLATION_REQUESTED(5),
- ENQUEUED(6);
+ ENQUEUED(6),
+ PREPARING(7),
+ PLANNING(8);
public final int number;
@@ -68,6 +70,8 @@ public final class QueryResult implements Externalizable, Message<QueryResult>,
case 4: return FAILED;
case 5: return CANCELLATION_REQUESTED;
case 6: return ENQUEUED;
+ case 7: return PREPARING;
+ case 8: return PLANNING;
default: return null;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 086b98a..205611b 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -167,6 +167,8 @@ message QueryResult {
FAILED = 4;
CANCELLATION_REQUESTED = 5; // cancellation has been requested, and is being processed
ENQUEUED = 6; // query has been enqueued. this is pre-starting.
+ PREPARING = 7; // query is at preparation stage, foreman is initializing
+ PLANNING = 8; // query is at planning stage (includes logical or / and physical planning)
}
optional QueryState query_state = 1;