You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by hg...@apache.org on 2015/12/23 01:21:22 UTC
drill git commit: DRILL-4187: introduce a new query state ENQUEUED
and rename the state PENDING to STARTING
Repository: drill
Updated Branches:
refs/heads/master ed0369b0a -> de008810c
DRILL-4187: introduce a new query state ENQUEUED and rename the state PENDING to STARTING
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/de008810
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/de008810
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/de008810
Branch: refs/heads/master
Commit: de008810c815e46e6f6e5d13ad0b9a23e705b13a
Parents: ed0369b
Author: Hanifi Gunes <ha...@gmail.com>
Authored: Tue Dec 15 17:24:27 2015 -0800
Committer: Hanifi Gunes <ha...@gmail.com>
Committed: Tue Dec 22 16:24:07 2015 -0800
----------------------------------------------------------------------
.../drill/exec/rpc/user/QueryResultHandler.java | 2 +-
.../apache/drill/exec/work/foreman/Foreman.java | 102 ++++++----
.../drill/exec/work/foreman/QueryManager.java | 3 +-
.../apache/drill/exec/proto/UserBitShared.java | 199 +++++++++++--------
.../drill/exec/proto/beans/QueryInfo.java | 2 +-
.../drill/exec/proto/beans/QueryProfile.java | 2 +-
.../drill/exec/proto/beans/QueryResult.java | 10 +-
protocol/src/main/protobuf/UserBitShared.proto | 3 +-
8 files changed, 184 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/de008810/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 16d957b..ca73ac8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -92,7 +92,7 @@ public class QueryResultHandler {
// CANCELED queries are handled the same way as COMPLETED
final boolean isTerminalResult;
switch ( queryState ) {
- case PENDING:
+ case STARTING:
isTerminalResult = false;
break;
case FAILED:
http://git-wip-us.apache.org/repos/asf/drill/blob/de008810/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 7b015a5..da775cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -135,6 +135,8 @@ public class Foreman implements Runnable {
private final ForemanResult foremanResult = new ForemanResult();
private final ConnectionClosedListener closeListener = new ConnectionClosedListener();
private final ChannelFuture closeFuture;
+ private final boolean queuingEnabled;
+
private String queryText;
@@ -163,7 +165,11 @@ public class Foreman implements Runnable {
queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getPersistentStoreProvider(),
stateListener, this); // TODO reference escapes before ctor is complete via stateListener, this
- recordNewState(QueryState.PENDING);
+ final OptionManager optionManager = queryContext.getOptions();
+ queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE);
+
+ final QueryState initialState = queuingEnabled ? QueryState.ENQUEUED : QueryState.STARTING;
+ recordNewState(initialState);
}
private class ConnectionClosedListener implements GenericFutureListener<Future<Void>> {
@@ -391,7 +397,10 @@ public class Foreman implements Runnable {
private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException {
validatePlan(plan);
setupSortMemoryAllocations(plan);
- acquireQuerySemaphore(plan);
+ if (queuingEnabled) {
+ acquireQuerySemaphore(plan);
+ moveToState(QueryState.STARTING, null);
+ }
final QueryWorkUnit work = getQueryWorkUnit(plan);
final List<PlanFragment> planFragments = work.getFragments();
@@ -457,49 +466,45 @@ public class Foreman implements Runnable {
*/
private void acquireQuerySemaphore(final PhysicalPlan plan) throws ForemanSetupException {
final OptionManager optionManager = queryContext.getOptions();
- final boolean queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE);
- if (queuingEnabled) {
- final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE);
- double totalCost = 0;
- for (final PhysicalOperator ops : plan.getSortedOperators()) {
- totalCost += ops.getCost();
- }
-
- final long queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT);
- final String queueName;
-
- try {
- @SuppressWarnings("resource")
- final ClusterCoordinator clusterCoordinator = drillbitContext.getClusterCoordinator();
- final DistributedSemaphore distributedSemaphore;
-
- // get the appropriate semaphore
- if (totalCost > queueThreshold) {
- final int largeQueue = (int) optionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE);
- distributedSemaphore = clusterCoordinator.getSemaphore("query.large", largeQueue);
- queueName = "large";
- } else {
- final int smallQueue = (int) optionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE);
- distributedSemaphore = clusterCoordinator.getSemaphore("query.small", smallQueue);
- queueName = "small";
- }
+ final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE);
+ double totalCost = 0;
+ for (final PhysicalOperator ops : plan.getSortedOperators()) {
+ totalCost += ops.getCost();
+ }
+ final long queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT);
+ final String queueName;
- lease = distributedSemaphore.acquire(queueTimeout, TimeUnit.MILLISECONDS);
- } catch (final Exception e) {
- throw new ForemanSetupException("Unable to acquire slot for query.", e);
+ try {
+ @SuppressWarnings("resource")
+ final ClusterCoordinator clusterCoordinator = drillbitContext.getClusterCoordinator();
+ final DistributedSemaphore distributedSemaphore;
+
+ // get the appropriate semaphore
+ if (totalCost > queueThreshold) {
+ final int largeQueue = (int) optionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE);
+ distributedSemaphore = clusterCoordinator.getSemaphore("query.large", largeQueue);
+ queueName = "large";
+ } else {
+ final int smallQueue = (int) optionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE);
+ distributedSemaphore = clusterCoordinator.getSemaphore("query.small", smallQueue);
+ queueName = "small";
}
- if (lease == null) {
- throw UserException
- .resourceError()
- .message(
- "Unable to acquire queue resources for query within timeout. Timeout for %s queue was set at %d seconds.",
- queueName, queueTimeout / 1000)
- .build(logger);
- }
+ lease = distributedSemaphore.acquire(queueTimeout, TimeUnit.MILLISECONDS);
+ } catch (final Exception e) {
+ throw new ForemanSetupException("Unable to acquire slot for query.", e);
+ }
+ if (lease == null) {
+ throw UserException
+ .resourceError()
+ .message(
+ "Unable to acquire queue resources for query within timeout. Timeout for %s queue was set at %d seconds.",
+ queueName, queueTimeout / 1000)
+ .build(logger);
}
+
}
Exception getCurrentException() {
@@ -796,7 +801,20 @@ public class Foreman implements Runnable {
logger.debug(queryIdString + ": State change requested {} --> {}", state, newState,
exception);
switch (state) {
- case PENDING:
+ case ENQUEUED:
+ switch (newState) {
+ case FAILED:
+ Preconditions.checkNotNull(exception, "exception cannot be null when new state is failed");
+ recordNewState(newState);
+ foremanResult.setFailed(exception);
+ foremanResult.close();
+ return;
+ case STARTING:
+ recordNewState(state);
+ return;
+ }
+ break;
+ case STARTING:
if (newState == QueryState.RUNNING) {
recordNewState(QueryState.RUNNING);
return;
@@ -841,10 +859,8 @@ public class Foreman implements Runnable {
return;
}
- default:
- throw new IllegalStateException("illegal transition from RUNNING to "
- + newState);
}
+ break;
}
case CANCELLATION_REQUESTED:
http://git-wip-us.apache.org/repos/asf/drill/blob/de008810/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 60173e2..b07c86b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -277,7 +277,8 @@ public class QueryManager {
QueryState updateEphemeralState(final QueryState queryState) {
switch (queryState) {
- case PENDING:
+ case ENQUEUED:
+ case STARTING:
case RUNNING:
case CANCELLATION_REQUESTED:
profileEStore.put(stringQueryId, getQueryInfo()); // store as ephemeral query profile.
http://git-wip-us.apache.org/repos/asf/drill/blob/de008810/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index e76d748..b0489f7 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -10169,9 +10169,13 @@ public final class UserBitShared {
public enum QueryState
implements com.google.protobuf.ProtocolMessageEnum {
/**
- * <code>PENDING = 0;</code>
+ * <code>STARTING = 0;</code>
+ *
+ * <pre>
+ * query has been scheduled for execution. This is post-enqueued.
+ * </pre>
*/
- PENDING(0, 0),
+ STARTING(0, 0),
/**
* <code>RUNNING = 1;</code>
*/
@@ -10204,12 +10208,24 @@ public final class UserBitShared {
* </pre>
*/
CANCELLATION_REQUESTED(5, 5),
+ /**
+ * <code>ENQUEUED = 6;</code>
+ *
+ * <pre>
+ * query has been enqueued. this is pre-starting.
+ * </pre>
+ */
+ ENQUEUED(6, 6),
;
/**
- * <code>PENDING = 0;</code>
+ * <code>STARTING = 0;</code>
+ *
+ * <pre>
+ * query has been scheduled for execution. This is post-enqueued.
+ * </pre>
*/
- public static final int PENDING_VALUE = 0;
+ public static final int STARTING_VALUE = 0;
/**
* <code>RUNNING = 1;</code>
*/
@@ -10242,18 +10258,27 @@ public final class UserBitShared {
* </pre>
*/
public static final int CANCELLATION_REQUESTED_VALUE = 5;
+ /**
+ * <code>ENQUEUED = 6;</code>
+ *
+ * <pre>
+ * query has been enqueued. this is pre-starting.
+ * </pre>
+ */
+ public static final int ENQUEUED_VALUE = 6;
public final int getNumber() { return value; }
public static QueryState valueOf(int value) {
switch (value) {
- case 0: return PENDING;
+ case 0: return STARTING;
case 1: return RUNNING;
case 2: return COMPLETED;
case 3: return CANCELED;
case 4: return FAILED;
case 5: return CANCELLATION_REQUESTED;
+ case 6: return ENQUEUED;
default: return null;
}
}
@@ -10381,7 +10406,7 @@ public final class UserBitShared {
}
private void initFields() {
- queryState_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.PENDING;
+ queryState_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.STARTING;
queryId_ = org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance();
error_ = java.util.Collections.emptyList();
}
@@ -10550,7 +10575,7 @@ public final class UserBitShared {
public Builder clear() {
super.clear();
- queryState_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.PENDING;
+ queryState_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.STARTING;
bitField0_ = (bitField0_ & ~0x00000001);
if (queryIdBuilder_ == null) {
queryId_ = org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance();
@@ -10689,7 +10714,7 @@ public final class UserBitShared {
private int bitField0_;
// optional .exec.shared.QueryResult.QueryState query_state = 1;
- private org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState queryState_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.PENDING;
+ private org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState queryState_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.STARTING;
/**
* <code>optional .exec.shared.QueryResult.QueryState query_state = 1;</code>
*/
@@ -10719,7 +10744,7 @@ public final class UserBitShared {
*/
public Builder clearQueryState() {
bitField0_ = (bitField0_ & ~0x00000001);
- queryState_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.PENDING;
+ queryState_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.STARTING;
onChanged();
return this;
}
@@ -12222,7 +12247,7 @@ public final class UserBitShared {
private void initFields() {
query_ = "";
start_ = 0L;
- state_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.PENDING;
+ state_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.STARTING;
user_ = "-";
foreman_ = org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.getDefaultInstance();
}
@@ -12403,7 +12428,7 @@ public final class UserBitShared {
bitField0_ = (bitField0_ & ~0x00000001);
start_ = 0L;
bitField0_ = (bitField0_ & ~0x00000002);
- state_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.PENDING;
+ state_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.STARTING;
bitField0_ = (bitField0_ & ~0x00000004);
user_ = "-";
bitField0_ = (bitField0_ & ~0x00000008);
@@ -12635,7 +12660,7 @@ public final class UserBitShared {
}
// optional .exec.shared.QueryResult.QueryState state = 3;
- private org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState state_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.PENDING;
+ private org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState state_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.STARTING;
/**
* <code>optional .exec.shared.QueryResult.QueryState state = 3;</code>
*/
@@ -12665,7 +12690,7 @@ public final class UserBitShared {
*/
public Builder clearState() {
bitField0_ = (bitField0_ & ~0x00000004);
- state_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.PENDING;
+ state_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.STARTING;
onChanged();
return this;
}
@@ -13783,7 +13808,7 @@ public final class UserBitShared {
query_ = "";
plan_ = "";
foreman_ = org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.getDefaultInstance();
- state_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.PENDING;
+ state_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.STARTING;
totalFragments_ = 0;
finishedFragments_ = 0;
fragmentProfile_ = java.util.Collections.emptyList();
@@ -14067,7 +14092,7 @@ public final class UserBitShared {
foremanBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000040);
- state_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.PENDING;
+ state_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.STARTING;
bitField0_ = (bitField0_ & ~0x00000080);
totalFragments_ = 0;
bitField0_ = (bitField0_ & ~0x00000100);
@@ -14807,7 +14832,7 @@ public final class UserBitShared {
}
// optional .exec.shared.QueryResult.QueryState state = 8;
- private org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState state_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.PENDING;
+ private org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState state_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.STARTING;
/**
* <code>optional .exec.shared.QueryResult.QueryState state = 8;</code>
*/
@@ -14837,7 +14862,7 @@ public final class UserBitShared {
*/
public Builder clearState() {
bitField0_ = (bitField0_ & ~0x00000080);
- state_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.PENDING;
+ state_ = org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState.STARTING;
onChanged();
return this;
}
@@ -20788,79 +20813,79 @@ public final class UserBitShared {
"ield\022\023\n\013value_count\030\004 \001(\005\022\027\n\017var_byte_le" +
"ngth\030\005 \001(\005\022\025\n\rbuffer_length\030\007 \001(\005\"7\n\nNod" +
"eStatus\022\017\n\007node_id\030\001 \001(\005\022\030\n\020memory_footp" +
- "rint\030\002 \001(\003\"\206\002\n\013QueryResult\0228\n\013query_stat" +
+ "rint\030\002 \001(\003\"\225\002\n\013QueryResult\0228\n\013query_stat" +
"e\030\001 \001(\0162#.exec.shared.QueryResult.QueryS",
"tate\022&\n\010query_id\030\002 \001(\0132\024.exec.shared.Que" +
"ryId\022(\n\005error\030\003 \003(\0132\031.exec.shared.DrillP" +
- "BError\"k\n\nQueryState\022\013\n\007PENDING\020\000\022\013\n\007RUN" +
- "NING\020\001\022\r\n\tCOMPLETED\020\002\022\014\n\010CANCELED\020\003\022\n\n\006F" +
- "AILED\020\004\022\032\n\026CANCELLATION_REQUESTED\020\005\"p\n\tQ" +
- "ueryData\022&\n\010query_id\030\001 \001(\0132\024.exec.shared" +
- ".QueryId\022\021\n\trow_count\030\002 \001(\005\022(\n\003def\030\003 \001(\013" +
- "2\033.exec.shared.RecordBatchDef\"\227\001\n\tQueryI" +
- "nfo\022\r\n\005query\030\001 \001(\t\022\r\n\005start\030\002 \001(\003\0222\n\005sta" +
- "te\030\003 \001(\0162#.exec.shared.QueryResult.Query",
- "State\022\017\n\004user\030\004 \001(\t:\001-\022\'\n\007foreman\030\005 \001(\0132" +
- "\026.exec.DrillbitEndpoint\"\272\003\n\014QueryProfile" +
- "\022 \n\002id\030\001 \001(\0132\024.exec.shared.QueryId\022$\n\004ty" +
- "pe\030\002 \001(\0162\026.exec.shared.QueryType\022\r\n\005star" +
- "t\030\003 \001(\003\022\013\n\003end\030\004 \001(\003\022\r\n\005query\030\005 \001(\t\022\014\n\004p" +
- "lan\030\006 \001(\t\022\'\n\007foreman\030\007 \001(\0132\026.exec.Drillb" +
- "itEndpoint\0222\n\005state\030\010 \001(\0162#.exec.shared." +
- "QueryResult.QueryState\022\027\n\017total_fragment" +
- "s\030\t \001(\005\022\032\n\022finished_fragments\030\n \001(\005\022;\n\020f" +
- "ragment_profile\030\013 \003(\0132!.exec.shared.Majo",
- "rFragmentProfile\022\017\n\004user\030\014 \001(\t:\001-\022\r\n\005err" +
- "or\030\r \001(\t\022\024\n\014verboseError\030\016 \001(\t\022\020\n\010error_" +
- "id\030\017 \001(\t\022\022\n\nerror_node\030\020 \001(\t\"t\n\024MajorFra" +
- "gmentProfile\022\031\n\021major_fragment_id\030\001 \001(\005\022" +
- "A\n\026minor_fragment_profile\030\002 \003(\0132!.exec.s" +
- "hared.MinorFragmentProfile\"\350\002\n\024MinorFrag" +
- "mentProfile\022)\n\005state\030\001 \001(\0162\032.exec.shared" +
- ".FragmentState\022(\n\005error\030\002 \001(\0132\031.exec.sha" +
- "red.DrillPBError\022\031\n\021minor_fragment_id\030\003 " +
- "\001(\005\0226\n\020operator_profile\030\004 \003(\0132\034.exec.sha",
- "red.OperatorProfile\022\022\n\nstart_time\030\005 \001(\003\022" +
- "\020\n\010end_time\030\006 \001(\003\022\023\n\013memory_used\030\007 \001(\003\022\027" +
- "\n\017max_memory_used\030\010 \001(\003\022(\n\010endpoint\030\t \001(" +
- "\0132\026.exec.DrillbitEndpoint\022\023\n\013last_update" +
- "\030\n \001(\003\022\025\n\rlast_progress\030\013 \001(\003\"\377\001\n\017Operat" +
- "orProfile\0221\n\rinput_profile\030\001 \003(\0132\032.exec." +
- "shared.StreamProfile\022\023\n\013operator_id\030\003 \001(" +
- "\005\022\025\n\roperator_type\030\004 \001(\005\022\023\n\013setup_nanos\030" +
- "\005 \001(\003\022\025\n\rprocess_nanos\030\006 \001(\003\022#\n\033peak_loc" +
- "al_memory_allocated\030\007 \001(\003\022(\n\006metric\030\010 \003(",
- "\0132\030.exec.shared.MetricValue\022\022\n\nwait_nano" +
- "s\030\t \001(\003\"B\n\rStreamProfile\022\017\n\007records\030\001 \001(" +
- "\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007schemas\030\003 \001(\003\"J\n\013M" +
- "etricValue\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nlong_va" +
- "lue\030\002 \001(\003\022\024\n\014double_value\030\003 \001(\001*5\n\nRpcCh" +
- "annel\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004" +
- "USER\020\002*/\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020" +
- "\002\022\014\n\010PHYSICAL\020\003*\207\001\n\rFragmentState\022\013\n\007SEN" +
- "DING\020\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNI" +
- "NG\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAI",
- "LED\020\005\022\032\n\026CANCELLATION_REQUESTED\020\006*\335\005\n\020Co" +
- "reOperatorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BRO" +
- "ADCAST_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGR" +
- "EGATE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031" +
- "\n\025HASH_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020" +
- "MERGING_RECEIVER\020\010\022\034\n\030ORDERED_PARTITION_" +
- "SENDER\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_RECEI" +
- "VER\020\013\022\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030S" +
- "ELECTION_VECTOR_REMOVER\020\016\022\027\n\023STREAMING_A" +
- "GGREGATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_S",
- "ORT\020\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020" +
- "\024\022\032\n\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SU" +
- "B_SCAN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_" +
- "SUB_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT" +
- "_SUB_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB" +
- "_SCAN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEM" +
- "A_SUB_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRO" +
- "DUCER_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006" +
- "WINDOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_S" +
- "UB_SCAN\020$B.\n\033org.apache.drill.exec.proto",
- "B\rUserBitSharedH\001"
+ "BError\"z\n\nQueryState\022\014\n\010STARTING\020\000\022\013\n\007RU" +
+ "NNING\020\001\022\r\n\tCOMPLETED\020\002\022\014\n\010CANCELED\020\003\022\n\n\006" +
+ "FAILED\020\004\022\032\n\026CANCELLATION_REQUESTED\020\005\022\014\n\010" +
+ "ENQUEUED\020\006\"p\n\tQueryData\022&\n\010query_id\030\001 \001(" +
+ "\0132\024.exec.shared.QueryId\022\021\n\trow_count\030\002 \001" +
+ "(\005\022(\n\003def\030\003 \001(\0132\033.exec.shared.RecordBatc" +
+ "hDef\"\227\001\n\tQueryInfo\022\r\n\005query\030\001 \001(\t\022\r\n\005sta" +
+ "rt\030\002 \001(\003\0222\n\005state\030\003 \001(\0162#.exec.shared.Qu",
+ "eryResult.QueryState\022\017\n\004user\030\004 \001(\t:\001-\022\'\n" +
+ "\007foreman\030\005 \001(\0132\026.exec.DrillbitEndpoint\"\272" +
+ "\003\n\014QueryProfile\022 \n\002id\030\001 \001(\0132\024.exec.share" +
+ "d.QueryId\022$\n\004type\030\002 \001(\0162\026.exec.shared.Qu" +
+ "eryType\022\r\n\005start\030\003 \001(\003\022\013\n\003end\030\004 \001(\003\022\r\n\005q" +
+ "uery\030\005 \001(\t\022\014\n\004plan\030\006 \001(\t\022\'\n\007foreman\030\007 \001(" +
+ "\0132\026.exec.DrillbitEndpoint\0222\n\005state\030\010 \001(\016" +
+ "2#.exec.shared.QueryResult.QueryState\022\027\n" +
+ "\017total_fragments\030\t \001(\005\022\032\n\022finished_fragm" +
+ "ents\030\n \001(\005\022;\n\020fragment_profile\030\013 \003(\0132!.e",
+ "xec.shared.MajorFragmentProfile\022\017\n\004user\030" +
+ "\014 \001(\t:\001-\022\r\n\005error\030\r \001(\t\022\024\n\014verboseError\030" +
+ "\016 \001(\t\022\020\n\010error_id\030\017 \001(\t\022\022\n\nerror_node\030\020 " +
+ "\001(\t\"t\n\024MajorFragmentProfile\022\031\n\021major_fra" +
+ "gment_id\030\001 \001(\005\022A\n\026minor_fragment_profile" +
+ "\030\002 \003(\0132!.exec.shared.MinorFragmentProfil" +
+ "e\"\350\002\n\024MinorFragmentProfile\022)\n\005state\030\001 \001(" +
+ "\0162\032.exec.shared.FragmentState\022(\n\005error\030\002" +
+ " \001(\0132\031.exec.shared.DrillPBError\022\031\n\021minor" +
+ "_fragment_id\030\003 \001(\005\0226\n\020operator_profile\030\004",
+ " \003(\0132\034.exec.shared.OperatorProfile\022\022\n\nst" +
+ "art_time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(\003\022\023\n\013memo" +
+ "ry_used\030\007 \001(\003\022\027\n\017max_memory_used\030\010 \001(\003\022(" +
+ "\n\010endpoint\030\t \001(\0132\026.exec.DrillbitEndpoint" +
+ "\022\023\n\013last_update\030\n \001(\003\022\025\n\rlast_progress\030\013" +
+ " \001(\003\"\377\001\n\017OperatorProfile\0221\n\rinput_profil" +
+ "e\030\001 \003(\0132\032.exec.shared.StreamProfile\022\023\n\013o" +
+ "perator_id\030\003 \001(\005\022\025\n\roperator_type\030\004 \001(\005\022" +
+ "\023\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 " +
+ "\001(\003\022#\n\033peak_local_memory_allocated\030\007 \001(\003",
+ "\022(\n\006metric\030\010 \003(\0132\030.exec.shared.MetricVal" +
+ "ue\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStreamProfile\022" +
+ "\017\n\007records\030\001 \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007sch" +
+ "emas\030\003 \001(\003\"J\n\013MetricValue\022\021\n\tmetric_id\030\001" +
+ " \001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014double_value" +
+ "\030\003 \001(\001*5\n\nRpcChannel\022\017\n\013BIT_CONTROL\020\000\022\014\n" +
+ "\010BIT_DATA\020\001\022\010\n\004USER\020\002*/\n\tQueryType\022\007\n\003SQ" +
+ "L\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003*\207\001\n\rFragm" +
+ "entState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALLOCA" +
+ "TION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCAN",
+ "CELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATION_REQ" +
+ "UESTED\020\006*\335\005\n\020CoreOperatorType\022\021\n\rSINGLE_" +
+ "SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001\022\n\n\006FILTER" +
+ "\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n" +
+ "\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITION_SENDER\020\006" +
+ "\022\t\n\005LIMIT\020\007\022\024\n\020MERGING_RECEIVER\020\010\022\034\n\030ORD" +
+ "ERED_PARTITION_SENDER\020\t\022\013\n\007PROJECT\020\n\022\026\n\022" +
+ "UNORDERED_RECEIVER\020\013\022\020\n\014RANGE_SENDER\020\014\022\n" +
+ "\n\006SCREEN\020\r\022\034\n\030SELECTION_VECTOR_REMOVER\020\016" +
+ "\022\027\n\023STREAMING_AGGREGATE\020\017\022\016\n\nTOP_N_SORT\020",
+ "\020\022\021\n\rEXTERNAL_SORT\020\021\022\t\n\005TRACE\020\022\022\t\n\005UNION" +
+ "\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026PARQUET_ROW_GROUP_SC" +
+ "AN\020\025\022\021\n\rHIVE_SUB_SCAN\020\026\022\025\n\021SYSTEM_TABLE_" +
+ "SCAN\020\027\022\021\n\rMOCK_SUB_SCAN\020\030\022\022\n\016PARQUET_WRI" +
+ "TER\020\031\022\023\n\017DIRECT_SUB_SCAN\020\032\022\017\n\013TEXT_WRITE" +
+ "R\020\033\022\021\n\rTEXT_SUB_SCAN\020\034\022\021\n\rJSON_SUB_SCAN\020" +
+ "\035\022\030\n\024INFO_SCHEMA_SUB_SCAN\020\036\022\023\n\017COMPLEX_T" +
+ "O_JSON\020\037\022\025\n\021PRODUCER_CONSUMER\020 \022\022\n\016HBASE" +
+ "_SUB_SCAN\020!\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOOP_J" +
+ "OIN\020#\022\021\n\rAVRO_SUB_SCAN\020$B.\n\033org.apache.d",
+ "rill.exec.protoB\rUserBitSharedH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
http://git-wip-us.apache.org/repos/asf/drill/blob/de008810/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryInfo.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryInfo.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryInfo.java
index 4080e26..612b483 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryInfo.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryInfo.java
@@ -92,7 +92,7 @@ public final class QueryInfo implements Externalizable, Message<QueryInfo>, Sche
public QueryResult.QueryState getState()
{
- return state == null ? QueryResult.QueryState.PENDING : state;
+ return state == null ? QueryResult.QueryState.STARTING : state;
}
public QueryInfo setState(QueryResult.QueryState state)
http://git-wip-us.apache.org/repos/asf/drill/blob/de008810/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryProfile.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryProfile.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryProfile.java
index c610a84..d3fac19 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryProfile.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryProfile.java
@@ -170,7 +170,7 @@ public final class QueryProfile implements Externalizable, Message<QueryProfile>
public QueryResult.QueryState getState()
{
- return state == null ? QueryResult.QueryState.PENDING : state;
+ return state == null ? QueryResult.QueryState.STARTING : state;
}
public QueryProfile setState(QueryResult.QueryState state)
http://git-wip-us.apache.org/repos/asf/drill/blob/de008810/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java
index 474e330..7b2a273 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java
@@ -37,12 +37,13 @@ public final class QueryResult implements Externalizable, Message<QueryResult>,
{
public enum QueryState implements com.dyuproject.protostuff.EnumLite<QueryState>
{
- PENDING(0),
+ STARTING(0),
RUNNING(1),
COMPLETED(2),
CANCELED(3),
FAILED(4),
- CANCELLATION_REQUESTED(5);
+ CANCELLATION_REQUESTED(5),
+ ENQUEUED(6);
public final int number;
@@ -60,12 +61,13 @@ public final class QueryResult implements Externalizable, Message<QueryResult>,
{
switch(number)
{
- case 0: return PENDING;
+ case 0: return STARTING;
case 1: return RUNNING;
case 2: return COMPLETED;
case 3: return CANCELED;
case 4: return FAILED;
case 5: return CANCELLATION_REQUESTED;
+ case 6: return ENQUEUED;
default: return null;
}
}
@@ -100,7 +102,7 @@ public final class QueryResult implements Externalizable, Message<QueryResult>,
public QueryState getQueryState()
{
- return queryState == null ? QueryState.PENDING : queryState;
+ return queryState == null ? QueryState.STARTING : queryState;
}
public QueryResult setQueryState(QueryState queryState)
http://git-wip-us.apache.org/repos/asf/drill/blob/de008810/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 0451fd2..c44d2b2 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -152,12 +152,13 @@ message NodeStatus {
*/
message QueryResult {
enum QueryState {
- PENDING = 0;
+ STARTING = 0; // query has been scheduled for execution. This is post-enqueued.
RUNNING = 1;
COMPLETED = 2; // query has completed successfully
CANCELED = 3; // query has been cancelled, and all cleanup is complete
FAILED = 4;
CANCELLATION_REQUESTED = 5; // cancellation has been requested, and is being processed
+ ENQUEUED = 6; // query has been enqueued. this is pre-starting.
}
optional QueryState query_state = 1;