You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2017/05/18 19:31:58 UTC
hive git commit: HIVE-14052. Cleanup structures when external clients
use LLAP. (Siddharth Seth, reviewed by Jason Dere, Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master 3671668b6 -> 1dfe101a7
HIVE-14052. Cleanup structures when external clients use LLAP. (Siddharth Seth, reviewed by Jason Dere, Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1dfe101a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1dfe101a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1dfe101a
Branch: refs/heads/master
Commit: 1dfe101a74af59aaa47f08399be76a798682d740
Parents: 3671668
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu May 18 12:30:46 2017 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu May 18 12:30:46 2017 -0700
----------------------------------------------------------------------
.../daemon/rpc/LlapDaemonProtocolProtos.java | 250 +++++++++++++------
.../src/protobuf/LlapDaemonProtocol.proto | 1 +
.../hadoop/hive/llap/daemon/impl/QueryInfo.java | 9 +-
.../hive/llap/daemon/impl/QueryTracker.java | 198 ++++++++++++---
.../daemon/impl/TaskExecutorTestHelpers.java | 2 +-
.../ql/udf/generic/GenericUDTFGetSplits.java | 23 +-
6 files changed, 357 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1dfe101a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
----------------------------------------------------------------------
diff --git a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index ece31ed..c19cf63 100644
--- a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -1,5 +1,5 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
-// source: LlapDaemonProtocol.proto
+// source: llap-common/src/protobuf/LlapDaemonProtocol.proto
package org.apache.hadoop.hive.llap.daemon.rpc;
@@ -3454,6 +3454,16 @@ public final class LlapDaemonProtocolProtos {
* </pre>
*/
int getVertexParallelism();
+
+ // optional bool is_external_submission = 14 [default = false];
+ /**
+ * <code>optional bool is_external_submission = 14 [default = false];</code>
+ */
+ boolean hasIsExternalSubmission();
+ /**
+ * <code>optional bool is_external_submission = 14 [default = false];</code>
+ */
+ boolean getIsExternalSubmission();
}
/**
* Protobuf type {@code SignableVertexSpec}
@@ -3600,6 +3610,11 @@ public final class LlapDaemonProtocolProtos {
vertexParallelism_ = input.readInt32();
break;
}
+ case 112: {
+ bitField0_ |= 0x00000400;
+ isExternalSubmission_ = input.readBool();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4096,6 +4111,22 @@ public final class LlapDaemonProtocolProtos {
return vertexParallelism_;
}
+ // optional bool is_external_submission = 14 [default = false];
+ public static final int IS_EXTERNAL_SUBMISSION_FIELD_NUMBER = 14;
+ private boolean isExternalSubmission_;
+ /**
+ * <code>optional bool is_external_submission = 14 [default = false];</code>
+ */
+ public boolean hasIsExternalSubmission() {
+ return ((bitField0_ & 0x00000400) == 0x00000400);
+ }
+ /**
+ * <code>optional bool is_external_submission = 14 [default = false];</code>
+ */
+ public boolean getIsExternalSubmission() {
+ return isExternalSubmission_;
+ }
+
private void initFields() {
user_ = "";
signatureKeyId_ = 0L;
@@ -4110,6 +4141,7 @@ public final class LlapDaemonProtocolProtos {
outputSpecs_ = java.util.Collections.emptyList();
groupedInputSpecs_ = java.util.Collections.emptyList();
vertexParallelism_ = 0;
+ isExternalSubmission_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -4162,6 +4194,9 @@ public final class LlapDaemonProtocolProtos {
if (((bitField0_ & 0x00000200) == 0x00000200)) {
output.writeInt32(13, vertexParallelism_);
}
+ if (((bitField0_ & 0x00000400) == 0x00000400)) {
+ output.writeBool(14, isExternalSubmission_);
+ }
getUnknownFields().writeTo(output);
}
@@ -4223,6 +4258,10 @@ public final class LlapDaemonProtocolProtos {
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(13, vertexParallelism_);
}
+ if (((bitField0_ & 0x00000400) == 0x00000400)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(14, isExternalSubmission_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -4302,6 +4341,11 @@ public final class LlapDaemonProtocolProtos {
result = result && (getVertexParallelism()
== other.getVertexParallelism());
}
+ result = result && (hasIsExternalSubmission() == other.hasIsExternalSubmission());
+ if (hasIsExternalSubmission()) {
+ result = result && (getIsExternalSubmission()
+ == other.getIsExternalSubmission());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -4367,6 +4411,10 @@ public final class LlapDaemonProtocolProtos {
hash = (37 * hash) + VERTEX_PARALLELISM_FIELD_NUMBER;
hash = (53 * hash) + getVertexParallelism();
}
+ if (hasIsExternalSubmission()) {
+ hash = (37 * hash) + IS_EXTERNAL_SUBMISSION_FIELD_NUMBER;
+ hash = (53 * hash) + hashBoolean(getIsExternalSubmission());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -4531,6 +4579,8 @@ public final class LlapDaemonProtocolProtos {
}
vertexParallelism_ = 0;
bitField0_ = (bitField0_ & ~0x00001000);
+ isExternalSubmission_ = false;
+ bitField0_ = (bitField0_ & ~0x00002000);
return this;
}
@@ -4634,6 +4684,10 @@ public final class LlapDaemonProtocolProtos {
to_bitField0_ |= 0x00000200;
}
result.vertexParallelism_ = vertexParallelism_;
+ if (((from_bitField0_ & 0x00002000) == 0x00002000)) {
+ to_bitField0_ |= 0x00000400;
+ }
+ result.isExternalSubmission_ = isExternalSubmission_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -4768,6 +4822,9 @@ public final class LlapDaemonProtocolProtos {
if (other.hasVertexParallelism()) {
setVertexParallelism(other.getVertexParallelism());
}
+ if (other.hasIsExternalSubmission()) {
+ setIsExternalSubmission(other.getIsExternalSubmission());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -6282,6 +6339,39 @@ public final class LlapDaemonProtocolProtos {
return this;
}
+ // optional bool is_external_submission = 14 [default = false];
+ private boolean isExternalSubmission_ ;
+ /**
+ * <code>optional bool is_external_submission = 14 [default = false];</code>
+ */
+ public boolean hasIsExternalSubmission() {
+ return ((bitField0_ & 0x00002000) == 0x00002000);
+ }
+ /**
+ * <code>optional bool is_external_submission = 14 [default = false];</code>
+ */
+ public boolean getIsExternalSubmission() {
+ return isExternalSubmission_;
+ }
+ /**
+ * <code>optional bool is_external_submission = 14 [default = false];</code>
+ */
+ public Builder setIsExternalSubmission(boolean value) {
+ bitField0_ |= 0x00002000;
+ isExternalSubmission_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bool is_external_submission = 14 [default = false];</code>
+ */
+ public Builder clearIsExternalSubmission() {
+ bitField0_ = (bitField0_ & ~0x00002000);
+ isExternalSubmission_ = false;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:SignableVertexSpec)
}
@@ -17411,83 +17501,85 @@ public final class LlapDaemonProtocolProtos {
descriptor;
static {
java.lang.String[] descriptorData = {
- "\n\030LlapDaemonProtocol.proto\"9\n\020UserPayloa" +
- "dProto\022\024\n\014user_payload\030\001 \001(\014\022\017\n\007version\030" +
- "\002 \001(\005\"j\n\025EntityDescriptorProto\022\022\n\nclass_" +
- "name\030\001 \001(\t\022\'\n\014user_payload\030\002 \001(\0132\021.UserP" +
- "ayloadProto\022\024\n\014history_text\030\003 \001(\014\"x\n\013IOS" +
- "pecProto\022\035\n\025connected_vertex_name\030\001 \001(\t\022" +
- "-\n\rio_descriptor\030\002 \001(\0132\026.EntityDescripto" +
- "rProto\022\033\n\023physical_edge_count\030\003 \001(\005\"z\n\023G" +
- "roupInputSpecProto\022\022\n\ngroup_name\030\001 \001(\t\022\026" +
- "\n\016group_vertices\030\002 \003(\t\0227\n\027merged_input_d",
- "escriptor\030\003 \001(\0132\026.EntityDescriptorProto\"" +
- "\245\003\n\022SignableVertexSpec\022\014\n\004user\030\001 \001(\t\022\026\n\016" +
- "signatureKeyId\030\002 \001(\003\022/\n\020query_identifier" +
- "\030\003 \001(\0132\025.QueryIdentifierProto\022\025\n\rhive_qu" +
- "ery_id\030\004 \001(\t\022\020\n\010dag_name\030\005 \001(\t\022\023\n\013vertex" +
- "_name\030\006 \001(\t\022\024\n\014vertex_index\030\007 \001(\005\022\030\n\020tok" +
- "en_identifier\030\010 \001(\t\0224\n\024processor_descrip" +
- "tor\030\t \001(\0132\026.EntityDescriptorProto\022!\n\013inp" +
- "ut_specs\030\n \003(\0132\014.IOSpecProto\022\"\n\014output_s" +
- "pecs\030\013 \003(\0132\014.IOSpecProto\0221\n\023grouped_inpu",
- "t_specs\030\014 \003(\0132\024.GroupInputSpecProto\022\032\n\022v" +
- "ertex_parallelism\030\r \001(\005\"K\n\016VertexOrBinar" +
- "y\022#\n\006vertex\030\001 \001(\0132\023.SignableVertexSpec\022\024" +
- "\n\014vertexBinary\030\002 \001(\014\"\344\001\n\023FragmentRuntime" +
- "Info\022#\n\033num_self_and_upstream_tasks\030\001 \001(" +
- "\005\022-\n%num_self_and_upstream_completed_tas" +
- "ks\030\002 \001(\005\022\033\n\023within_dag_priority\030\003 \001(\005\022\026\n" +
- "\016dag_start_time\030\004 \001(\003\022 \n\030first_attempt_s" +
- "tart_time\030\005 \001(\003\022\"\n\032current_attempt_start" +
- "_time\030\006 \001(\003\"d\n\024QueryIdentifierProto\022\035\n\025a",
- "pplication_id_string\030\001 \001(\t\022\021\n\tdag_index\030" +
- "\002 \001(\005\022\032\n\022app_attempt_number\030\003 \001(\005\"l\n\013Not" +
- "TezEvent\022\037\n\027input_event_proto_bytes\030\001 \002(" +
- "\014\022\023\n\013vertex_name\030\002 \002(\t\022\027\n\017dest_input_nam" +
- "e\030\003 \002(\t\022\016\n\006key_id\030\004 \001(\005\"\330\002\n\026SubmitWorkRe" +
- "questProto\022\"\n\twork_spec\030\001 \001(\0132\017.VertexOr" +
- "Binary\022\033\n\023work_spec_signature\030\002 \001(\014\022\027\n\017f" +
- "ragment_number\030\003 \001(\005\022\026\n\016attempt_number\030\004" +
- " \001(\005\022\033\n\023container_id_string\030\005 \001(\t\022\017\n\007am_" +
- "host\030\006 \001(\t\022\017\n\007am_port\030\007 \001(\005\022\032\n\022credentia",
- "ls_binary\030\010 \001(\014\0223\n\025fragment_runtime_info" +
- "\030\t \001(\0132\024.FragmentRuntimeInfo\022\033\n\023initial_" +
- "event_bytes\030\n \001(\014\022\037\n\027initial_event_signa" +
- "ture\030\013 \001(\014\"b\n\027SubmitWorkResponseProto\022/\n" +
- "\020submission_state\030\001 \001(\0162\025.SubmissionStat" +
- "eProto\022\026\n\016unique_node_id\030\002 \001(\t\"\205\001\n\036Sourc" +
- "eStateUpdatedRequestProto\022/\n\020query_ident" +
- "ifier\030\001 \001(\0132\025.QueryIdentifierProto\022\020\n\010sr" +
- "c_name\030\002 \001(\t\022 \n\005state\030\003 \001(\0162\021.SourceStat" +
- "eProto\"!\n\037SourceStateUpdatedResponseProt",
- "o\"e\n\031QueryCompleteRequestProto\022/\n\020query_" +
- "identifier\030\001 \001(\0132\025.QueryIdentifierProto\022" +
- "\027\n\014delete_delay\030\002 \001(\003:\0010\"\034\n\032QueryComplet" +
- "eResponseProto\"t\n\035TerminateFragmentReque" +
- "stProto\022/\n\020query_identifier\030\001 \001(\0132\025.Quer" +
- "yIdentifierProto\022\"\n\032fragment_identifier_" +
- "string\030\002 \001(\t\" \n\036TerminateFragmentRespons" +
- "eProto\"&\n\024GetTokenRequestProto\022\016\n\006app_id" +
- "\030\001 \001(\t\"&\n\025GetTokenResponseProto\022\r\n\005token" +
- "\030\001 \001(\014\"A\n\033LlapOutputSocketInitMessage\022\023\n",
- "\013fragment_id\030\001 \002(\t\022\r\n\005token\030\002 \001(\014*2\n\020Sou" +
- "rceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNN" +
- "ING\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEPTE" +
- "D\020\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n" +
- "\022LlapDaemonProtocol\022?\n\nsubmitWork\022\027.Subm" +
- "itWorkRequestProto\032\030.SubmitWorkResponseP" +
- "roto\022W\n\022sourceStateUpdated\022\037.SourceState" +
- "UpdatedRequestProto\032 .SourceStateUpdated" +
- "ResponseProto\022H\n\rqueryComplete\022\032.QueryCo" +
- "mpleteRequestProto\032\033.QueryCompleteRespon",
- "seProto\022T\n\021terminateFragment\022\036.Terminate" +
- "FragmentRequestProto\032\037.TerminateFragment" +
- "ResponseProto2]\n\026LlapManagementProtocol\022" +
- "C\n\022getDelegationToken\022\025.GetTokenRequestP" +
- "roto\032\026.GetTokenResponseProtoBH\n&org.apac" +
- "he.hadoop.hive.llap.daemon.rpcB\030LlapDaem" +
- "onProtocolProtos\210\001\001\240\001\001"
+ "\n1llap-common/src/protobuf/LlapDaemonPro" +
+ "tocol.proto\"9\n\020UserPayloadProto\022\024\n\014user_" +
+ "payload\030\001 \001(\014\022\017\n\007version\030\002 \001(\005\"j\n\025Entity" +
+ "DescriptorProto\022\022\n\nclass_name\030\001 \001(\t\022\'\n\014u" +
+ "ser_payload\030\002 \001(\0132\021.UserPayloadProto\022\024\n\014" +
+ "history_text\030\003 \001(\014\"x\n\013IOSpecProto\022\035\n\025con" +
+ "nected_vertex_name\030\001 \001(\t\022-\n\rio_descripto" +
+ "r\030\002 \001(\0132\026.EntityDescriptorProto\022\033\n\023physi" +
+ "cal_edge_count\030\003 \001(\005\"z\n\023GroupInputSpecPr" +
+ "oto\022\022\n\ngroup_name\030\001 \001(\t\022\026\n\016group_vertice",
+ "s\030\002 \003(\t\0227\n\027merged_input_descriptor\030\003 \001(\013" +
+ "2\026.EntityDescriptorProto\"\314\003\n\022SignableVer" +
+ "texSpec\022\014\n\004user\030\001 \001(\t\022\026\n\016signatureKeyId\030" +
+ "\002 \001(\003\022/\n\020query_identifier\030\003 \001(\0132\025.QueryI" +
+ "dentifierProto\022\025\n\rhive_query_id\030\004 \001(\t\022\020\n" +
+ "\010dag_name\030\005 \001(\t\022\023\n\013vertex_name\030\006 \001(\t\022\024\n\014" +
+ "vertex_index\030\007 \001(\005\022\030\n\020token_identifier\030\010" +
+ " \001(\t\0224\n\024processor_descriptor\030\t \001(\0132\026.Ent" +
+ "ityDescriptorProto\022!\n\013input_specs\030\n \003(\0132" +
+ "\014.IOSpecProto\022\"\n\014output_specs\030\013 \003(\0132\014.IO",
+ "SpecProto\0221\n\023grouped_input_specs\030\014 \003(\0132\024" +
+ ".GroupInputSpecProto\022\032\n\022vertex_paralleli" +
+ "sm\030\r \001(\005\022%\n\026is_external_submission\030\016 \001(\010" +
+ ":\005false\"K\n\016VertexOrBinary\022#\n\006vertex\030\001 \001(" +
+ "\0132\023.SignableVertexSpec\022\024\n\014vertexBinary\030\002" +
+ " \001(\014\"\344\001\n\023FragmentRuntimeInfo\022#\n\033num_self" +
+ "_and_upstream_tasks\030\001 \001(\005\022-\n%num_self_an" +
+ "d_upstream_completed_tasks\030\002 \001(\005\022\033\n\023with" +
+ "in_dag_priority\030\003 \001(\005\022\026\n\016dag_start_time\030" +
+ "\004 \001(\003\022 \n\030first_attempt_start_time\030\005 \001(\003\022",
+ "\"\n\032current_attempt_start_time\030\006 \001(\003\"d\n\024Q" +
+ "ueryIdentifierProto\022\035\n\025application_id_st" +
+ "ring\030\001 \001(\t\022\021\n\tdag_index\030\002 \001(\005\022\032\n\022app_att" +
+ "empt_number\030\003 \001(\005\"l\n\013NotTezEvent\022\037\n\027inpu" +
+ "t_event_proto_bytes\030\001 \002(\014\022\023\n\013vertex_name" +
+ "\030\002 \002(\t\022\027\n\017dest_input_name\030\003 \002(\t\022\016\n\006key_i" +
+ "d\030\004 \001(\005\"\330\002\n\026SubmitWorkRequestProto\022\"\n\two" +
+ "rk_spec\030\001 \001(\0132\017.VertexOrBinary\022\033\n\023work_s" +
+ "pec_signature\030\002 \001(\014\022\027\n\017fragment_number\030\003" +
+ " \001(\005\022\026\n\016attempt_number\030\004 \001(\005\022\033\n\023containe",
+ "r_id_string\030\005 \001(\t\022\017\n\007am_host\030\006 \001(\t\022\017\n\007am" +
+ "_port\030\007 \001(\005\022\032\n\022credentials_binary\030\010 \001(\014\022" +
+ "3\n\025fragment_runtime_info\030\t \001(\0132\024.Fragmen" +
+ "tRuntimeInfo\022\033\n\023initial_event_bytes\030\n \001(" +
+ "\014\022\037\n\027initial_event_signature\030\013 \001(\014\"b\n\027Su" +
+ "bmitWorkResponseProto\022/\n\020submission_stat" +
+ "e\030\001 \001(\0162\025.SubmissionStateProto\022\026\n\016unique" +
+ "_node_id\030\002 \001(\t\"\205\001\n\036SourceStateUpdatedReq" +
+ "uestProto\022/\n\020query_identifier\030\001 \001(\0132\025.Qu" +
+ "eryIdentifierProto\022\020\n\010src_name\030\002 \001(\t\022 \n\005",
+ "state\030\003 \001(\0162\021.SourceStateProto\"!\n\037Source" +
+ "StateUpdatedResponseProto\"e\n\031QueryComple" +
+ "teRequestProto\022/\n\020query_identifier\030\001 \001(\013" +
+ "2\025.QueryIdentifierProto\022\027\n\014delete_delay\030" +
+ "\002 \001(\003:\0010\"\034\n\032QueryCompleteResponseProto\"t" +
+ "\n\035TerminateFragmentRequestProto\022/\n\020query" +
+ "_identifier\030\001 \001(\0132\025.QueryIdentifierProto" +
+ "\022\"\n\032fragment_identifier_string\030\002 \001(\t\" \n\036" +
+ "TerminateFragmentResponseProto\"&\n\024GetTok" +
+ "enRequestProto\022\016\n\006app_id\030\001 \001(\t\"&\n\025GetTok",
+ "enResponseProto\022\r\n\005token\030\001 \001(\014\"A\n\033LlapOu" +
+ "tputSocketInitMessage\022\023\n\013fragment_id\030\001 \002" +
+ "(\t\022\r\n\005token\030\002 \001(\014*2\n\020SourceStateProto\022\017\n" +
+ "\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\002*E\n\024Submiss" +
+ "ionStateProto\022\014\n\010ACCEPTED\020\001\022\014\n\010REJECTED\020" +
+ "\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n\022LlapDaemonProto" +
+ "col\022?\n\nsubmitWork\022\027.SubmitWorkRequestPro" +
+ "to\032\030.SubmitWorkResponseProto\022W\n\022sourceSt" +
+ "ateUpdated\022\037.SourceStateUpdatedRequestPr" +
+ "oto\032 .SourceStateUpdatedResponseProto\022H\n",
+ "\rqueryComplete\022\032.QueryCompleteRequestPro" +
+ "to\032\033.QueryCompleteResponseProto\022T\n\021termi" +
+ "nateFragment\022\036.TerminateFragmentRequestP" +
+ "roto\032\037.TerminateFragmentResponseProto2]\n" +
+ "\026LlapManagementProtocol\022C\n\022getDelegation" +
+ "Token\022\025.GetTokenRequestProto\032\026.GetTokenR" +
+ "esponseProtoBH\n&org.apache.hadoop.hive.l" +
+ "lap.daemon.rpcB\030LlapDaemonProtocolProtos" +
+ "\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -17523,7 +17615,7 @@ public final class LlapDaemonProtocolProtos {
internal_static_SignableVertexSpec_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_SignableVertexSpec_descriptor,
- new java.lang.String[] { "User", "SignatureKeyId", "QueryIdentifier", "HiveQueryId", "DagName", "VertexName", "VertexIndex", "TokenIdentifier", "ProcessorDescriptor", "InputSpecs", "OutputSpecs", "GroupedInputSpecs", "VertexParallelism", });
+ new java.lang.String[] { "User", "SignatureKeyId", "QueryIdentifier", "HiveQueryId", "DagName", "VertexName", "VertexIndex", "TokenIdentifier", "ProcessorDescriptor", "InputSpecs", "OutputSpecs", "GroupedInputSpecs", "VertexParallelism", "IsExternalSubmission", });
internal_static_VertexOrBinary_descriptor =
getDescriptor().getMessageTypes().get(5);
internal_static_VertexOrBinary_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hive/blob/1dfe101a/llap-common/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto
index 3a3a2b8..e0c0070 100644
--- a/llap-common/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto
@@ -67,6 +67,7 @@ message SignableVertexSpec
repeated GroupInputSpecProto grouped_input_specs = 12;
optional int32 vertex_parallelism = 13; // An internal field required for Tez.
+ optional bool is_external_submission = 14 [default = false];
}
// Union
http://git-wip-us.apache.org/repos/asf/hive/blob/1dfe101a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index ce2f457..a6d9d54 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -60,6 +60,7 @@ public class QueryInfo {
private final LlapNodeId amNodeId;
private final String appTokenIdentifier;
private final Token<JobTokenIdentifier> appToken;
+ private final boolean isExternalQuery;
// Map of states for different vertices.
private final Set<QueryFragmentInfo> knownFragments =
@@ -77,7 +78,8 @@ public class QueryInfo {
String[] localDirsBase, FileSystem localFs, String tokenUserName,
String tokenAppId, final LlapNodeId amNodeId,
String tokenIdentifier,
- Token<JobTokenIdentifier> appToken) {
+ Token<JobTokenIdentifier> appToken,
+ boolean isExternalQuery) {
this.queryIdentifier = queryIdentifier;
this.appIdString = appIdString;
this.dagIdString = dagIdString;
@@ -93,6 +95,7 @@ public class QueryInfo {
this.amNodeId = amNodeId;
this.appTokenIdentifier = tokenIdentifier;
this.appToken = appToken;
+ this.isExternalQuery = isExternalQuery;
final InetSocketAddress address =
NetUtils.createSocketAddrForHost(amNodeId.getHostname(), amNodeId.getPort());
SecurityUtil.setTokenService(appToken, address);
@@ -146,6 +149,10 @@ public class QueryInfo {
return Lists.newArrayList(knownFragments);
}
+ public boolean isExternalQuery() {
+ return isExternalQuery;
+ }
+
private synchronized void createLocalDirs() throws IOException {
if (localDirs == null) {
localDirs = new String[localDirsBase.length];
http://git-wip-us.apache.org/repos/asf/hive/blob/1dfe101a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index daeb555..5c42b27 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -89,7 +89,7 @@ public class QueryTracker extends AbstractService {
private final Lock lock = new ReentrantLock();
- private final ConcurrentMap<QueryIdentifier, ReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>();
+ private final ConcurrentMap<QueryIdentifier, ReentrantReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>();
// Tracks various maps for dagCompletions. This is setup here since stateChange messages
// may be processed by a thread which ends up executing before a task.
@@ -119,7 +119,7 @@ public class QueryTracker extends AbstractService {
int numCleanerThreads = HiveConf.getIntVar(
conf, ConfVars.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS);
this.executorService = Executors.newScheduledThreadPool(numCleanerThreads,
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryFileCleaner %d").build());
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryCompletionThread %d").build());
String logger = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_LOGGER);
if (logger != null && (logger.equalsIgnoreCase(LogHelpers.LLAP_LOGGER_NAME_QUERY_ROUTING))) {
@@ -169,7 +169,8 @@ public class QueryTracker extends AbstractService {
new QueryInfo(queryIdentifier, appIdString, dagIdString, dagName, hiveQueryIdString,
dagIdentifier, user,
getSourceCompletionMap(queryIdentifier), localDirsBase, localFs,
- tokenInfo.userName, tokenInfo.appId, amNodeId, vertex.getTokenIdentifier(), appToken);
+ tokenInfo.userName, tokenInfo.appId, amNodeId, vertex.getTokenIdentifier(), appToken,
+ vertex.getIsExternalSubmission());
QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
if (old != null) {
queryInfo = old;
@@ -188,9 +189,11 @@ public class QueryTracker extends AbstractService {
if (LOG.isDebugEnabled()) {
LOG.debug("Registering request for {} with the ShuffleHandler", queryIdentifier);
}
- ShuffleHandler.get()
- .registerDag(appIdString, dagIdentifier, appToken,
- user, queryInfo.getLocalDirs());
+ if (!vertex.getIsExternalSubmission()) {
+ ShuffleHandler.get()
+ .registerDag(appIdString, dagIdentifier, appToken,
+ user, queryInfo.getLocalDirs());
+ }
return queryInfo.registerFragment(
vertexName, fragmentNumber, attemptNumber, vertex, fragmentIdString);
@@ -212,6 +215,9 @@ public class QueryTracker extends AbstractService {
LOG.info("Ignoring fragmentComplete message for unknown query: {}", qId);
} else {
queryInfo.unregisterFragment(fragmentInfo);
+
+ // Try marking the query as complete if this is an external submission
+ handleFragmentCompleteExternalQuery(queryInfo);
}
}
@@ -237,46 +243,45 @@ public class QueryTracker extends AbstractService {
* @param deleteDelay
*/
QueryInfo queryComplete(QueryIdentifier queryIdentifier, long deleteDelay,
- boolean isInternal) throws IOException {
+ boolean isExternalQuery) throws IOException {
if (deleteDelay == -1) {
deleteDelay = defaultDeleteDelaySeconds;
}
ReadWriteLock dagLock = getDagLock(queryIdentifier);
dagLock.writeLock().lock();
try {
- QueryInfo queryInfo = isInternal
+ // If isExternalQuery -> the call is from within hte daemon, so no permission check required
+ // to get access to the queryInfo instance.
+ QueryInfo queryInfo = isExternalQuery
? queryInfoMap.get(queryIdentifier) : checkPermissionsAndGetQuery(queryIdentifier);
- rememberCompletedDag(queryIdentifier);
- LOG.info("Processing queryComplete for queryIdentifier={} with deleteDelay={} seconds", queryIdentifier,
- deleteDelay);
- queryInfoMap.remove(queryIdentifier);
if (queryInfo == null) {
// Should not happen.
LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier);
return null;
}
- String[] localDirs = queryInfo.getLocalDirsNoCreate();
- if (localDirs != null) {
- for (String localDir : localDirs) {
- cleanupDir(localDir, deleteDelay);
- ShuffleHandler.get().unregisterDag(localDir, queryInfo.getAppIdString(), queryInfo.getDagIdentifier());
- }
- }
- if (routeBasedLoggingEnabled) {
- // Inform the routing purgePolicy.
- // Send out a fake log message at the ERROR level with the MDC for this query setup. With an
- // LLAP custom appender this message will not be logged.
- final String dagId = queryInfo.getDagIdString();
- final String queryId = queryInfo.getHiveQueryIdString();
- MDC.put("dagId", dagId);
- MDC.put("queryId", queryId);
- try {
- LOG.error(QUERY_COMPLETE_MARKER, "Ignore this. Log line to interact with logger." +
- " Query complete: " + queryInfo.getHiveQueryIdString() + ", " +
- queryInfo.getDagIdString());
- } finally {
- MDC.clear();
+ LOG.info(
+ "Processing queryComplete for queryIdentifier={}, isExternalQuery={}, with deleteDelay={} seconds",
+ queryIdentifier, isExternalQuery,
+ deleteDelay);
+
+ queryInfoMap.remove(queryIdentifier);
+ if (!isExternalQuery) {
+ rememberCompletedDag(queryIdentifier);
+ cleanupLocalDirs(queryInfo, deleteDelay);
+ handleLogOnQueryCompletion(queryInfo.getHiveQueryIdString(), queryInfo.getDagIdString());
+ } else {
+ // If there's no pending fragments, queue some of the cleanup for a later point - locks, log rolling.
+ if (queryInfo.getRegisteredFragments().size() == 0) {
+ LOG.debug("Queueing future cleanup for external queryId: {}", queryInfo.getHiveQueryIdString());
+ executorService.schedule(new ExternalQueryCleanerCallable(queryInfo.getHiveQueryIdString(),
+ queryInfo.getDagIdString(), queryInfo.getQueryIdentifier()), 1, TimeUnit.MINUTES);
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "NumRegisterFragments={}, Not queuing cleanup for external queryId={}",
+ queryInfo.getRegisteredFragments().size(), queryInfo.getHiveQueryIdString());
+ }
}
}
@@ -286,7 +291,9 @@ public class QueryTracker extends AbstractService {
// should not be allowed after a query complete is received.
sourceCompletionMap.remove(queryIdentifier);
String savedQueryId = queryIdentifierToHiveQueryId.remove(queryIdentifier);
- dagSpecificLocks.remove(queryIdentifier);
+ if (!isExternalQuery) {
+ removeQuerySpecificLock(queryIdentifier);
+ }
if (savedQueryId != null) {
ObjectCacheFactory.removeLlapQueryCache(savedQueryId);
}
@@ -297,6 +304,37 @@ public class QueryTracker extends AbstractService {
}
+ private void cleanupLocalDirs(QueryInfo queryInfo, long deleteDelay) {
+ String[] localDirs = queryInfo.getLocalDirsNoCreate();
+ if (localDirs != null) {
+ for (String localDir : localDirs) {
+ cleanupDir(localDir, deleteDelay);
+ ShuffleHandler.get().unregisterDag(localDir, queryInfo.getAppIdString(), queryInfo.getDagIdentifier());
+ }
+ }
+ }
+
+ private void handleLogOnQueryCompletion(String queryIdString, String dagIdString) {
+ if (routeBasedLoggingEnabled) {
+ // Inform the routing purgePolicy.
+ // Send out a fake log message at the ERROR level with the MDC for this query setup. With an
+ // LLAP custom appender this message will not be logged.
+ MDC.put("dagId", dagIdString);
+ MDC.put("queryId", queryIdString);
+ try {
+ LOG.error(QUERY_COMPLETE_MARKER, "Ignore this. Log line to interact with logger." +
+ " Query complete: " + queryIdString + ", " +
+ dagIdString);
+ } finally {
+ MDC.clear();
+ }
+ }
+ }
+
+ private void removeQuerySpecificLock(QueryIdentifier queryIdentifier) {
+ dagSpecificLocks.remove(queryIdentifier);
+ }
+
public void rememberCompletedDag(QueryIdentifier queryIdentifier) {
if (completedDagMap.add(queryIdentifier)) {
@@ -325,11 +363,14 @@ public class QueryTracker extends AbstractService {
}
}
+ private ReentrantReadWriteLock getDagLockNoCreate(QueryIdentifier queryIdentifier) {
+ return dagSpecificLocks.get(queryIdentifier);
+ }
- private ReadWriteLock getDagLock(QueryIdentifier queryIdentifier) {
+ private ReentrantReadWriteLock getDagLock(QueryIdentifier queryIdentifier) {
lock.lock();
try {
- ReadWriteLock dagLock = dagSpecificLocks.get(queryIdentifier);
+ ReentrantReadWriteLock dagLock = dagSpecificLocks.get(queryIdentifier);
if (dagLock == null) {
dagLock = new ReentrantReadWriteLock();
dagSpecificLocks.put(queryIdentifier, dagLock);
@@ -403,6 +444,58 @@ public class QueryTracker extends AbstractService {
}
}
+ private class ExternalQueryCleanerCallable extends CallableWithNdc<Void> {
+
+ private final String queryIdString;
+ private final String dagIdString;
+ private final QueryIdentifier queryIdentifier;
+
+ public ExternalQueryCleanerCallable(String queryIdString, String dagIdString,
+ QueryIdentifier queryIdentifier) {
+ this.queryIdString = queryIdString;
+ this.dagIdString = dagIdString;
+ this.queryIdentifier = queryIdentifier;
+ }
+
+ @Override
+ protected Void callInternal() {
+ LOG.info("External cleanup callable for {}", queryIdentifier);
+ ReentrantReadWriteLock dagLock = getDagLockNoCreate(queryIdentifier);
+ if (dagLock == null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("null dagLock. No cleanup required at the moment for {}", queryIdString);
+ }
+ return null;
+ }
+ boolean locked = dagLock.writeLock().tryLock();
+ if (!locked) {
+ // Something else holds the lock at the moment. Don't bother cleaning up.
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Lock not obtained. Skipping cleanup for {}", queryIdString);
+ }
+ return null;
+ }
+ try {
+ // See if there are additional knownFragments. If there are, more fragments came in
+ // after this cleanup was scheduled, and there's nothing to be done.
+ QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
+ if (queryInfo != null) {
+ // QueryInfo will only exist if more work came in, after this was scheduled.
+ if (LOG.isTraceEnabled()) {
+ LOG.info("QueryInfo found for {}. Expecting future cleanup", queryIdString);
+ }
+ return null;
+ }
+ LOG.info("Processing cleanup for {}", queryIdString);
+ handleLogOnQueryCompletion(queryIdString, dagIdString);
+ removeQuerySpecificLock(queryIdentifier);
+ } finally {
+ dagLock.writeLock().unlock();
+ }
+ return null;
+ }
+ }
+
private QueryInfo checkPermissionsAndGetQuery(QueryIdentifier queryId) throws IOException {
QueryInfo queryInfo = queryInfoMap.get(queryId);
if (queryInfo == null) return null;
@@ -414,4 +507,37 @@ public class QueryTracker extends AbstractService {
public boolean checkPermissionsForQuery(QueryIdentifier queryId) throws IOException {
return checkPermissionsAndGetQuery(queryId) != null;
}
+
+
+ private void handleFragmentCompleteExternalQuery(QueryInfo queryInfo) {
+ if (queryInfo.isExternalQuery()) {
+ ReentrantReadWriteLock dagLock = getDagLock(queryInfo.getQueryIdentifier());
+ if (dagLock == null) {
+ LOG.warn("Ignoring fragment completion for unknown query: {}",
+ queryInfo.getQueryIdentifier());
+ }
+ boolean locked = dagLock.writeLock().tryLock();
+ if (!locked) {
+ // Some other operation in progress using the same lock.
+ // A subsequent fragmentComplete is expected to come in.
+ return;
+ }
+ try {
+ if (queryInfo.getRegisteredFragments().size() == 0) {
+ queryComplete(queryInfo.getQueryIdentifier(), -1, true);
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Not invoking queryComplete on fragmentComplete for {}, since there are known fragments. count={}",
+ queryInfo.getHiveQueryIdString(), queryInfo.getRegisteredFragments().size());
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to process query complete for external submission: {}",
+ queryInfo.getQueryIdentifier());
+ } finally {
+ dagLock.writeLock().unlock();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1dfe101a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 27c426c..e3edf79 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -96,7 +96,7 @@ public class TaskExecutorTestHelpers {
new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_id_string", "fake_dag_name",
"fakeHiveQueryId", 1, "fakeUser",
new ConcurrentHashMap<String, LlapDaemonProtocolProtos.SourceStateProto>(),
- new String[0], null, "fakeUser", null, nodeId, null, null);
+ new String[0], null, "fakeUser", null, nodeId, null, null, false);
return queryInfo;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1dfe101a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 868eec7..d4ec44e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -299,6 +299,17 @@ public class GenericUDTFGetSplits extends GenericUDTF {
FileSystem fs = scratchDir.getFileSystem(job);
try {
LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(), utils, job);
+
+ LlapCoordinator coordinator = LlapCoordinator.getInstance();
+ if (coordinator == null) {
+ throw new IOException("LLAP coordinator is not initialized; must be running in HS2 with "
+ + ConfVars.LLAP_HS2_ENABLE_COORDINATOR.varname + " enabled");
+ }
+
+ // Update the queryId to use the generated applicationId. See comment below about
+ // why this is done.
+ ApplicationId applicationId = coordinator.createExtClientAppId();
+ HiveConf.setVar(wxConf, HiveConf.ConfVars.HIVEQUERYID, applicationId.toString());
Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr,
new ArrayList<LocalResource>(), fs, ctx, false, work,
work.getVertexType(mapWork));
@@ -312,6 +323,8 @@ public class GenericUDTFGetSplits extends GenericUDTF {
ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS));
Preconditions.checkState(HiveConf.getBoolVar(wxConf,
ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
+
+
HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork);
List<Event> eventList = splitGenerator.initialize();
@@ -328,15 +341,6 @@ public class GenericUDTFGetSplits extends GenericUDTF {
LOG.debug("NumEvents=" + eventList.size() + ", NumSplits=" + result.length);
}
- LlapCoordinator coordinator = LlapCoordinator.getInstance();
- if (coordinator == null) {
- throw new IOException("LLAP coordinator is not initialized; must be running in HS2 with "
- + ConfVars.LLAP_HS2_ENABLE_COORDINATOR.varname + " enabled");
- }
-
- // See the discussion in the implementation as to why we generate app ID.
- ApplicationId applicationId = coordinator.createExtClientAppId();
-
// This assumes LLAP cluster owner is always the HS2 user.
String llapUser = UserGroupInformation.getLoginUser().getShortUserName();
@@ -440,6 +444,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
.setDagIndex(taskSpec.getDagIdentifier()).setAppAttemptNumber(0).build();
final SignableVertexSpec.Builder svsb = Converters.constructSignableVertexSpec(
taskSpec, queryIdentifierProto, applicationId.toString(), queryUser, queryIdString);
+ svsb.setIsExternalSubmission(true);
if (signer == null) {
SignedMessage result = new SignedMessage();
result.message = serializeVertexSpec(svsb);