You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/03 03:25:39 UTC
hive git commit: reverting HIVE-13674: usingTezAm field not required
in LLAP SubmitWorkRequestProto
Repository: hive
Updated Branches:
refs/heads/llap 33c86c45c -> 342668f91
reverting HIVE-13674: usingTezAm field not required in LLAP SubmitWorkRequestProto
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/342668f9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/342668f9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/342668f9
Branch: refs/heads/llap
Commit: 342668f914587aa396a7f74c421c4dcf2037d433
Parents: 33c86c4
Author: Jason Dere <jd...@hortonworks.com>
Authored: Mon May 2 18:23:30 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Mon May 2 18:23:30 2016 -0700
----------------------------------------------------------------------
.../ext/LlapTaskUmbilicalExternalClient.java | 4 +-
.../daemon/rpc/LlapDaemonProtocolProtos.java | 230 +++++++++++++------
.../src/protobuf/LlapDaemonProtocol.proto | 8 +
.../hadoop/hive/llap/LlapBaseInputFormat.java | 1 +
.../llap/daemon/impl/TaskRunnerCallable.java | 3 +
5 files changed, 175 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/342668f9/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index fe2fd7c..8598bc8 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -123,10 +123,12 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
/**
- * Submit the work for actual execution.
+ * Submit the work for actual execution. This should always have the usingTezAm flag disabled
* @param submitWorkRequestProto
*/
public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort, List<TezEvent> tezEvents) {
+ Preconditions.checkArgument(submitWorkRequestProto.getUsingTezAm() == false);
+
// Register the pending events to be sent for this spec.
String fragmentId = submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString();
PendingEventData pendingEventData = new PendingEventData(
http://git-wip-us.apache.org/repos/asf/hive/blob/342668f9/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
----------------------------------------------------------------------
diff --git a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index 6a20031..653e7e0 100644
--- a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -1,5 +1,5 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
-// source: llap-common/src/protobuf/LlapDaemonProtocol.proto
+// source: LlapDaemonProtocol.proto
package org.apache.hadoop.hive.llap.daemon.rpc;
@@ -7334,6 +7334,16 @@ public final class LlapDaemonProtocolProtos {
* <code>optional .FragmentRuntimeInfo fragment_runtime_info = 10;</code>
*/
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfoOrBuilder getFragmentRuntimeInfoOrBuilder();
+
+ // optional bool usingTezAm = 11 [default = true];
+ /**
+ * <code>optional bool usingTezAm = 11 [default = true];</code>
+ */
+ boolean hasUsingTezAm();
+ /**
+ * <code>optional bool usingTezAm = 11 [default = true];</code>
+ */
+ boolean getUsingTezAm();
}
/**
* Protobuf type {@code SubmitWorkRequestProto}
@@ -7452,6 +7462,11 @@ public final class LlapDaemonProtocolProtos {
bitField0_ |= 0x00000200;
break;
}
+ case 88: {
+ bitField0_ |= 0x00000400;
+ usingTezAm_ = input.readBool();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -7799,6 +7814,22 @@ public final class LlapDaemonProtocolProtos {
return fragmentRuntimeInfo_;
}
+ // optional bool usingTezAm = 11 [default = true];
+ public static final int USINGTEZAM_FIELD_NUMBER = 11;
+ private boolean usingTezAm_;
+ /**
+ * <code>optional bool usingTezAm = 11 [default = true];</code>
+ */
+ public boolean hasUsingTezAm() {
+ return ((bitField0_ & 0x00000400) == 0x00000400);
+ }
+ /**
+ * <code>optional bool usingTezAm = 11 [default = true];</code>
+ */
+ public boolean getUsingTezAm() {
+ return usingTezAm_;
+ }
+
private void initFields() {
containerIdString_ = "";
amHost_ = "";
@@ -7810,6 +7841,7 @@ public final class LlapDaemonProtocolProtos {
appAttemptNumber_ = 0;
fragmentSpec_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto.getDefaultInstance();
fragmentRuntimeInfo_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo.getDefaultInstance();
+ usingTezAm_ = true;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -7853,6 +7885,9 @@ public final class LlapDaemonProtocolProtos {
if (((bitField0_ & 0x00000200) == 0x00000200)) {
output.writeMessage(10, fragmentRuntimeInfo_);
}
+ if (((bitField0_ & 0x00000400) == 0x00000400)) {
+ output.writeBool(11, usingTezAm_);
+ }
getUnknownFields().writeTo(output);
}
@@ -7902,6 +7937,10 @@ public final class LlapDaemonProtocolProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(10, fragmentRuntimeInfo_);
}
+ if (((bitField0_ & 0x00000400) == 0x00000400)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(11, usingTezAm_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -7975,6 +8014,11 @@ public final class LlapDaemonProtocolProtos {
result = result && getFragmentRuntimeInfo()
.equals(other.getFragmentRuntimeInfo());
}
+ result = result && (hasUsingTezAm() == other.hasUsingTezAm());
+ if (hasUsingTezAm()) {
+ result = result && (getUsingTezAm()
+ == other.getUsingTezAm());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -8028,6 +8072,10 @@ public final class LlapDaemonProtocolProtos {
hash = (37 * hash) + FRAGMENT_RUNTIME_INFO_FIELD_NUMBER;
hash = (53 * hash) + getFragmentRuntimeInfo().hashCode();
}
+ if (hasUsingTezAm()) {
+ hash = (37 * hash) + USINGTEZAM_FIELD_NUMBER;
+ hash = (53 * hash) + hashBoolean(getUsingTezAm());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -8167,6 +8215,8 @@ public final class LlapDaemonProtocolProtos {
fragmentRuntimeInfoBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000200);
+ usingTezAm_ = true;
+ bitField0_ = (bitField0_ & ~0x00000400);
return this;
}
@@ -8243,6 +8293,10 @@ public final class LlapDaemonProtocolProtos {
} else {
result.fragmentRuntimeInfo_ = fragmentRuntimeInfoBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
+ to_bitField0_ |= 0x00000400;
+ }
+ result.usingTezAm_ = usingTezAm_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -8299,6 +8353,9 @@ public final class LlapDaemonProtocolProtos {
if (other.hasFragmentRuntimeInfo()) {
mergeFragmentRuntimeInfo(other.getFragmentRuntimeInfo());
}
+ if (other.hasUsingTezAm()) {
+ setUsingTezAm(other.getUsingTezAm());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -9032,6 +9089,39 @@ public final class LlapDaemonProtocolProtos {
return fragmentRuntimeInfoBuilder_;
}
+ // optional bool usingTezAm = 11 [default = true];
+ private boolean usingTezAm_ = true;
+ /**
+ * <code>optional bool usingTezAm = 11 [default = true];</code>
+ */
+ public boolean hasUsingTezAm() {
+ return ((bitField0_ & 0x00000400) == 0x00000400);
+ }
+ /**
+ * <code>optional bool usingTezAm = 11 [default = true];</code>
+ */
+ public boolean getUsingTezAm() {
+ return usingTezAm_;
+ }
+ /**
+ * <code>optional bool usingTezAm = 11 [default = true];</code>
+ */
+ public Builder setUsingTezAm(boolean value) {
+ bitField0_ |= 0x00000400;
+ usingTezAm_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bool usingTezAm = 11 [default = true];</code>
+ */
+ public Builder clearUsingTezAm() {
+ bitField0_ = (bitField0_ & ~0x00000400);
+ usingTezAm_ = true;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:SubmitWorkRequestProto)
}
@@ -14365,74 +14455,74 @@ public final class LlapDaemonProtocolProtos {
descriptor;
static {
java.lang.String[] descriptorData = {
- "\n1llap-common/src/protobuf/LlapDaemonPro" +
- "tocol.proto\"9\n\020UserPayloadProto\022\024\n\014user_" +
- "payload\030\001 \001(\014\022\017\n\007version\030\002 \001(\005\"j\n\025Entity" +
- "DescriptorProto\022\022\n\nclass_name\030\001 \001(\t\022\'\n\014u" +
- "ser_payload\030\002 \001(\0132\021.UserPayloadProto\022\024\n\014" +
- "history_text\030\003 \001(\014\"x\n\013IOSpecProto\022\035\n\025con" +
- "nected_vertex_name\030\001 \001(\t\022-\n\rio_descripto" +
- "r\030\002 \001(\0132\026.EntityDescriptorProto\022\033\n\023physi" +
- "cal_edge_count\030\003 \001(\005\"z\n\023GroupInputSpecPr" +
- "oto\022\022\n\ngroup_name\030\001 \001(\t\022\026\n\016group_vertice",
- "s\030\002 \003(\t\0227\n\027merged_input_descriptor\030\003 \001(\013" +
- "2\026.EntityDescriptorProto\"\353\002\n\021FragmentSpe" +
- "cProto\022\"\n\032fragment_identifier_string\030\001 \001" +
- "(\t\022\020\n\010dag_name\030\002 \001(\t\022\016\n\006dag_id\030\013 \001(\005\022\023\n\013" +
- "vertex_name\030\003 \001(\t\0224\n\024processor_descripto" +
- "r\030\004 \001(\0132\026.EntityDescriptorProto\022!\n\013input" +
- "_specs\030\005 \003(\0132\014.IOSpecProto\022\"\n\014output_spe" +
- "cs\030\006 \003(\0132\014.IOSpecProto\0221\n\023grouped_input_" +
- "specs\030\007 \003(\0132\024.GroupInputSpecProto\022\032\n\022ver" +
- "tex_parallelism\030\010 \001(\005\022\027\n\017fragment_number",
- "\030\t \001(\005\022\026\n\016attempt_number\030\n \001(\005\"\344\001\n\023Fragm" +
- "entRuntimeInfo\022#\n\033num_self_and_upstream_" +
- "tasks\030\001 \001(\005\022-\n%num_self_and_upstream_com" +
- "pleted_tasks\030\002 \001(\005\022\033\n\023within_dag_priorit" +
- "y\030\003 \001(\005\022\026\n\016dag_start_time\030\004 \001(\003\022 \n\030first" +
- "_attempt_start_time\030\005 \001(\003\022\"\n\032current_att" +
- "empt_start_time\030\006 \001(\003\"F\n\024QueryIdentifier" +
- "Proto\022\026\n\016app_identifier\030\001 \001(\t\022\026\n\016dag_ide" +
- "ntifier\030\002 \001(\005\"\266\002\n\026SubmitWorkRequestProto" +
- "\022\033\n\023container_id_string\030\001 \001(\t\022\017\n\007am_host",
- "\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030\n\020token_identif" +
- "ier\030\004 \001(\t\022\032\n\022credentials_binary\030\005 \001(\014\022\014\n" +
- "\004user\030\006 \001(\t\022\035\n\025application_id_string\030\007 \001" +
- "(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n\rfragme" +
- "nt_spec\030\t \001(\0132\022.FragmentSpecProto\0223\n\025fra" +
- "gment_runtime_info\030\n \001(\0132\024.FragmentRunti" +
- "meInfo\"J\n\027SubmitWorkResponseProto\022/\n\020sub" +
- "mission_state\030\001 \001(\0162\025.SubmissionStatePro" +
- "to\"\205\001\n\036SourceStateUpdatedRequestProto\022/\n" +
- "\020query_identifier\030\001 \001(\0132\025.QueryIdentifie",
- "rProto\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030\003 \001(\0162" +
- "\021.SourceStateProto\"!\n\037SourceStateUpdated" +
- "ResponseProto\"w\n\031QueryCompleteRequestPro" +
- "to\022\020\n\010query_id\030\001 \001(\t\022/\n\020query_identifier" +
- "\030\002 \001(\0132\025.QueryIdentifierProto\022\027\n\014delete_" +
- "delay\030\004 \001(\003:\0010\"\034\n\032QueryCompleteResponseP" +
- "roto\"t\n\035TerminateFragmentRequestProto\022/\n" +
- "\020query_identifier\030\001 \001(\0132\025.QueryIdentifie" +
- "rProto\022\"\n\032fragment_identifier_string\030\002 \001" +
- "(\t\" \n\036TerminateFragmentResponseProto\"\026\n\024",
- "GetTokenRequestProto\"&\n\025GetTokenResponse" +
- "Proto\022\r\n\005token\030\001 \001(\014*2\n\020SourceStateProto" +
- "\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\002*E\n\024Subm" +
- "issionStateProto\022\014\n\010ACCEPTED\020\001\022\014\n\010REJECT" +
- "ED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n\022LlapDaemonPr" +
- "otocol\022?\n\nsubmitWork\022\027.SubmitWorkRequest" +
- "Proto\032\030.SubmitWorkResponseProto\022W\n\022sourc" +
- "eStateUpdated\022\037.SourceStateUpdatedReques" +
- "tProto\032 .SourceStateUpdatedResponseProto" +
- "\022H\n\rqueryComplete\022\032.QueryCompleteRequest",
- "Proto\032\033.QueryCompleteResponseProto\022T\n\021te" +
- "rminateFragment\022\036.TerminateFragmentReque" +
- "stProto\032\037.TerminateFragmentResponseProto" +
- "2]\n\026LlapManagementProtocol\022C\n\022getDelegat" +
- "ionToken\022\025.GetTokenRequestProto\032\026.GetTok" +
- "enResponseProtoBH\n&org.apache.hadoop.hiv" +
- "e.llap.daemon.rpcB\030LlapDaemonProtocolPro" +
- "tos\210\001\001\240\001\001"
+ "\n\030LlapDaemonProtocol.proto\"9\n\020UserPayloa" +
+ "dProto\022\024\n\014user_payload\030\001 \001(\014\022\017\n\007version\030" +
+ "\002 \001(\005\"j\n\025EntityDescriptorProto\022\022\n\nclass_" +
+ "name\030\001 \001(\t\022\'\n\014user_payload\030\002 \001(\0132\021.UserP" +
+ "ayloadProto\022\024\n\014history_text\030\003 \001(\014\"x\n\013IOS" +
+ "pecProto\022\035\n\025connected_vertex_name\030\001 \001(\t\022" +
+ "-\n\rio_descriptor\030\002 \001(\0132\026.EntityDescripto" +
+ "rProto\022\033\n\023physical_edge_count\030\003 \001(\005\"z\n\023G" +
+ "roupInputSpecProto\022\022\n\ngroup_name\030\001 \001(\t\022\026" +
+ "\n\016group_vertices\030\002 \003(\t\0227\n\027merged_input_d",
+ "escriptor\030\003 \001(\0132\026.EntityDescriptorProto\"" +
+ "\353\002\n\021FragmentSpecProto\022\"\n\032fragment_identi" +
+ "fier_string\030\001 \001(\t\022\020\n\010dag_name\030\002 \001(\t\022\016\n\006d" +
+ "ag_id\030\013 \001(\005\022\023\n\013vertex_name\030\003 \001(\t\0224\n\024proc" +
+ "essor_descriptor\030\004 \001(\0132\026.EntityDescripto" +
+ "rProto\022!\n\013input_specs\030\005 \003(\0132\014.IOSpecProt" +
+ "o\022\"\n\014output_specs\030\006 \003(\0132\014.IOSpecProto\0221\n" +
+ "\023grouped_input_specs\030\007 \003(\0132\024.GroupInputS" +
+ "pecProto\022\032\n\022vertex_parallelism\030\010 \001(\005\022\027\n\017" +
+ "fragment_number\030\t \001(\005\022\026\n\016attempt_number\030",
+ "\n \001(\005\"\344\001\n\023FragmentRuntimeInfo\022#\n\033num_sel" +
+ "f_and_upstream_tasks\030\001 \001(\005\022-\n%num_self_a" +
+ "nd_upstream_completed_tasks\030\002 \001(\005\022\033\n\023wit" +
+ "hin_dag_priority\030\003 \001(\005\022\026\n\016dag_start_time" +
+ "\030\004 \001(\003\022 \n\030first_attempt_start_time\030\005 \001(\003" +
+ "\022\"\n\032current_attempt_start_time\030\006 \001(\003\"F\n\024" +
+ "QueryIdentifierProto\022\026\n\016app_identifier\030\001" +
+ " \001(\t\022\026\n\016dag_identifier\030\002 \001(\005\"\320\002\n\026SubmitW" +
+ "orkRequestProto\022\033\n\023container_id_string\030\001" +
+ " \001(\t\022\017\n\007am_host\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030",
+ "\n\020token_identifier\030\004 \001(\t\022\032\n\022credentials_" +
+ "binary\030\005 \001(\014\022\014\n\004user\030\006 \001(\t\022\035\n\025applicatio" +
+ "n_id_string\030\007 \001(\t\022\032\n\022app_attempt_number\030" +
+ "\010 \001(\005\022)\n\rfragment_spec\030\t \001(\0132\022.FragmentS" +
+ "pecProto\0223\n\025fragment_runtime_info\030\n \001(\0132" +
+ "\024.FragmentRuntimeInfo\022\030\n\nusingTezAm\030\013 \001(" +
+ "\010:\004true\"J\n\027SubmitWorkResponseProto\022/\n\020su" +
+ "bmission_state\030\001 \001(\0162\025.SubmissionStatePr" +
+ "oto\"\205\001\n\036SourceStateUpdatedRequestProto\022/" +
+ "\n\020query_identifier\030\001 \001(\0132\025.QueryIdentifi",
+ "erProto\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030\003 \001(\016" +
+ "2\021.SourceStateProto\"!\n\037SourceStateUpdate" +
+ "dResponseProto\"w\n\031QueryCompleteRequestPr" +
+ "oto\022\020\n\010query_id\030\001 \001(\t\022/\n\020query_identifie" +
+ "r\030\002 \001(\0132\025.QueryIdentifierProto\022\027\n\014delete" +
+ "_delay\030\004 \001(\003:\0010\"\034\n\032QueryCompleteResponse" +
+ "Proto\"t\n\035TerminateFragmentRequestProto\022/" +
+ "\n\020query_identifier\030\001 \001(\0132\025.QueryIdentifi" +
+ "erProto\022\"\n\032fragment_identifier_string\030\002 " +
+ "\001(\t\" \n\036TerminateFragmentResponseProto\"\026\n",
+ "\024GetTokenRequestProto\"&\n\025GetTokenRespons" +
+ "eProto\022\r\n\005token\030\001 \001(\014*2\n\020SourceStateProt" +
+ "o\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\002*E\n\024Sub" +
+ "missionStateProto\022\014\n\010ACCEPTED\020\001\022\014\n\010REJEC" +
+ "TED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n\022LlapDaemonP" +
+ "rotocol\022?\n\nsubmitWork\022\027.SubmitWorkReques" +
+ "tProto\032\030.SubmitWorkResponseProto\022W\n\022sour" +
+ "ceStateUpdated\022\037.SourceStateUpdatedReque" +
+ "stProto\032 .SourceStateUpdatedResponseProt" +
+ "o\022H\n\rqueryComplete\022\032.QueryCompleteReques",
+ "tProto\032\033.QueryCompleteResponseProto\022T\n\021t" +
+ "erminateFragment\022\036.TerminateFragmentRequ" +
+ "estProto\032\037.TerminateFragmentResponseProt" +
+ "o2]\n\026LlapManagementProtocol\022C\n\022getDelega" +
+ "tionToken\022\025.GetTokenRequestProto\032\026.GetTo" +
+ "kenResponseProtoBH\n&org.apache.hadoop.hi" +
+ "ve.llap.daemon.rpcB\030LlapDaemonProtocolPr" +
+ "otos\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -14486,7 +14576,7 @@ public final class LlapDaemonProtocolProtos {
internal_static_SubmitWorkRequestProto_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_SubmitWorkRequestProto_descriptor,
- new java.lang.String[] { "ContainerIdString", "AmHost", "AmPort", "TokenIdentifier", "CredentialsBinary", "User", "ApplicationIdString", "AppAttemptNumber", "FragmentSpec", "FragmentRuntimeInfo", });
+ new java.lang.String[] { "ContainerIdString", "AmHost", "AmPort", "TokenIdentifier", "CredentialsBinary", "User", "ApplicationIdString", "AppAttemptNumber", "FragmentSpec", "FragmentRuntimeInfo", "UsingTezAm", });
internal_static_SubmitWorkResponseProto_descriptor =
getDescriptor().getMessageTypes().get(8);
internal_static_SubmitWorkResponseProto_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hive/blob/342668f9/llap-common/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto
index 944c96c..e964c5f 100644
--- a/llap-common/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto
@@ -91,6 +91,7 @@ message SubmitWorkRequestProto {
optional int32 app_attempt_number = 8;
optional FragmentSpecProto fragment_spec = 9;
optional FragmentRuntimeInfo fragment_runtime_info = 10;
+ optional bool usingTezAm = 11 [default = true];
}
enum SubmissionStateProto {
@@ -136,11 +137,18 @@ message GetTokenResponseProto {
optional bytes token = 1;
}
+message SendEventsRequestProto {
+}
+
+message SendEventsResponseProto {
+}
+
service LlapDaemonProtocol {
rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto);
rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns (SourceStateUpdatedResponseProto);
rpc queryComplete(QueryCompleteRequestProto) returns (QueryCompleteResponseProto);
rpc terminateFragment(TerminateFragmentRequestProto) returns (TerminateFragmentResponseProto);
+ rpc sendEvents(SendEventsRequestProto) return (SendEventsResponseProto);
}
service LlapManagementProtocol {
http://git-wip-us.apache.org/repos/asf/hive/blob/342668f9/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 8db2f88..10d14c0 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -346,6 +346,7 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF
runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0);
+ builder.setUsingTezAm(false);
builder.setFragmentRuntimeInfo(runtimeInfo.build());
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/342668f9/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 4a33373..efd6f0a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -108,6 +108,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
private final String queryId;
private final HadoopShim tezHadoopShim;
private boolean shouldRunTask = true;
+ private final boolean withTezAm;
final Stopwatch runtimeWatch = new Stopwatch();
final Stopwatch killtimerWatch = new Stopwatch();
private final AtomicBoolean isStarted = new AtomicBoolean(false);
@@ -136,6 +137,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
this.jobToken = TokenCache.getSessionToken(credentials);
this.taskSpec = Converters.getTaskSpecfromProto(request.getFragmentSpec());
this.amReporter = amReporter;
+ this.withTezAm = request.getUsingTezAm();
+ LOG.warn("ZZZ: DBG: usingTezAm=" + withTezAm);
// Register with the AMReporter when the callable is setup. Unregister once it starts running.
this.amReporter.registerTask(request.getAmHost(), request.getAmPort(),
request.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier());