You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/12/07 19:42:06 UTC
hive git commit: HIVE-15296 : AM may lose task failures and not
reschedule when scheduling to LLAP (Sergey Shelukhin, reviewed by Gopal V)
Repository: hive
Updated Branches:
refs/heads/master 70cc5efac -> 8804a7b89
HIVE-15296 : AM may lose task failures and not reschedule when scheduling to LLAP (Sergey Shelukhin, reviewed by Gopal V)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8804a7b8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8804a7b8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8804a7b8
Branch: refs/heads/master
Commit: 8804a7b89bf3d1d6e78d850cd524840e0bd71051
Parents: 70cc5ef
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Dec 7 11:14:57 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Dec 7 11:14:57 2016 -0800
----------------------------------------------------------------------
.../ext/LlapTaskUmbilicalExternalClient.java | 34 +-
.../daemon/rpc/LlapDaemonProtocolProtos.java | 323 ++++++++++++++-----
.../org/apache/hadoop/hive/llap/DaemonId.java | 17 +-
.../protocol/LlapTaskUmbilicalProtocol.java | 2 +-
.../src/protobuf/LlapDaemonProtocol.proto | 1 +
.../hive/llap/daemon/impl/AMReporter.java | 12 +-
.../llap/daemon/impl/ContainerRunnerImpl.java | 7 +-
.../hive/llap/daemon/impl/LlapDaemon.java | 2 +-
.../llap/tezplugins/LlapTaskCommunicator.java | 53 ++-
9 files changed, 336 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/8804a7b8/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 4933fb3..3e0232d 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -81,6 +81,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
private static class TaskHeartbeatInfo {
final String taskAttemptId;
final String hostname;
+ String uniqueNodeId;
final int port;
final AtomicLong lastHeartbeat = new AtomicLong();
@@ -161,8 +162,9 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
vertex.getVertexIndex(), request.getFragmentNumber(), request.getAttemptNumber());
final String fragmentId = attemptId.toString();
- pendingEvents.putIfAbsent(fragmentId, new PendingEventData(
- new TaskHeartbeatInfo(fragmentId, llapHost, llapPort), Lists.<TezEvent>newArrayList()));
+ final TaskHeartbeatInfo thi = new TaskHeartbeatInfo(fragmentId, llapHost, llapPort);
+ pendingEvents.putIfAbsent(
+ fragmentId, new PendingEventData(thi, Lists.<TezEvent>newArrayList()));
// Setup timer task to check for heartbeat timeouts
timer.scheduleAtFixedRate(new HeartbeatCheckTask(),
@@ -185,6 +187,9 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
return;
}
}
+ if (response.hasUniqueNodeId()) {
+ thi.uniqueNodeId = response.getUniqueNodeId();
+ }
}
@Override
@@ -217,26 +222,29 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
}
}
- private void updateHeartbeatInfo(String hostname, int port) {
+ private void updateHeartbeatInfo(String hostname, String uniqueId, int port) {
int updateCount = 0;
for (String key : pendingEvents.keySet()) {
PendingEventData pendingEventData = pendingEvents.get(key);
if (pendingEventData != null) {
- if (pendingEventData.heartbeatInfo.hostname.equals(hostname)
- && pendingEventData.heartbeatInfo.port == port) {
- pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
+ TaskHeartbeatInfo thi = pendingEventData.heartbeatInfo;
+ String thiUniqueId = thi.uniqueNodeId;
+ if (thi.hostname.equals(hostname) && thi.port == port
+ && (thiUniqueId != null && thiUniqueId.equals(uniqueId))) {
+ thi.lastHeartbeat.set(System.currentTimeMillis());
updateCount++;
}
}
}
for (String key : registeredTasks.keySet()) {
- TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key);
- if (heartbeatInfo != null) {
- if (heartbeatInfo.hostname.equals(hostname)
- && heartbeatInfo.port == port) {
- heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
+ TaskHeartbeatInfo thi = registeredTasks.get(key);
+ if (thi != null) {
+ String thiUniqueId = thi.uniqueNodeId;
+ if (thi.hostname.equals(hostname) && thi.port == port
+ && (thiUniqueId != null && thiUniqueId.equals(uniqueId))) {
+ thi.lastHeartbeat.set(System.currentTimeMillis());
updateCount++;
}
}
@@ -387,8 +395,8 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
}
@Override
- public void nodeHeartbeat(Text hostname, int port) throws IOException {
- updateHeartbeatInfo(hostname.toString(), port);
+ public void nodeHeartbeat(Text hostname, Text uniqueId, int port) throws IOException {
+ updateHeartbeatInfo(hostname.toString(), uniqueId.toString(), port);
// No need to propagate to this to the responder
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8804a7b8/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
----------------------------------------------------------------------
diff --git a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index 0581681..ece31ed 100644
--- a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -1,5 +1,5 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
-// source: src/protobuf/LlapDaemonProtocol.proto
+// source: LlapDaemonProtocol.proto
package org.apache.hadoop.hive.llap.daemon.rpc;
@@ -11297,6 +11297,21 @@ public final class LlapDaemonProtocolProtos {
* <code>optional .SubmissionStateProto submission_state = 1;</code>
*/
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto getSubmissionState();
+
+ // optional string unique_node_id = 2;
+ /**
+ * <code>optional string unique_node_id = 2;</code>
+ */
+ boolean hasUniqueNodeId();
+ /**
+ * <code>optional string unique_node_id = 2;</code>
+ */
+ java.lang.String getUniqueNodeId();
+ /**
+ * <code>optional string unique_node_id = 2;</code>
+ */
+ com.google.protobuf.ByteString
+ getUniqueNodeIdBytes();
}
/**
* Protobuf type {@code SubmitWorkResponseProto}
@@ -11360,6 +11375,11 @@ public final class LlapDaemonProtocolProtos {
}
break;
}
+ case 18: {
+ bitField0_ |= 0x00000002;
+ uniqueNodeId_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -11416,8 +11436,52 @@ public final class LlapDaemonProtocolProtos {
return submissionState_;
}
+ // optional string unique_node_id = 2;
+ public static final int UNIQUE_NODE_ID_FIELD_NUMBER = 2;
+ private java.lang.Object uniqueNodeId_;
+ /**
+ * <code>optional string unique_node_id = 2;</code>
+ */
+ public boolean hasUniqueNodeId() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * <code>optional string unique_node_id = 2;</code>
+ */
+ public java.lang.String getUniqueNodeId() {
+ java.lang.Object ref = uniqueNodeId_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ uniqueNodeId_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * <code>optional string unique_node_id = 2;</code>
+ */
+ public com.google.protobuf.ByteString
+ getUniqueNodeIdBytes() {
+ java.lang.Object ref = uniqueNodeId_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ uniqueNodeId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
private void initFields() {
submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED;
+ uniqueNodeId_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -11434,6 +11498,9 @@ public final class LlapDaemonProtocolProtos {
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeEnum(1, submissionState_.getNumber());
}
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeBytes(2, getUniqueNodeIdBytes());
+ }
getUnknownFields().writeTo(output);
}
@@ -11447,6 +11514,10 @@ public final class LlapDaemonProtocolProtos {
size += com.google.protobuf.CodedOutputStream
.computeEnumSize(1, submissionState_.getNumber());
}
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(2, getUniqueNodeIdBytes());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -11475,6 +11546,11 @@ public final class LlapDaemonProtocolProtos {
result = result &&
(getSubmissionState() == other.getSubmissionState());
}
+ result = result && (hasUniqueNodeId() == other.hasUniqueNodeId());
+ if (hasUniqueNodeId()) {
+ result = result && getUniqueNodeId()
+ .equals(other.getUniqueNodeId());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -11492,6 +11568,10 @@ public final class LlapDaemonProtocolProtos {
hash = (37 * hash) + SUBMISSION_STATE_FIELD_NUMBER;
hash = (53 * hash) + hashEnum(getSubmissionState());
}
+ if (hasUniqueNodeId()) {
+ hash = (37 * hash) + UNIQUE_NODE_ID_FIELD_NUMBER;
+ hash = (53 * hash) + getUniqueNodeId().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -11603,6 +11683,8 @@ public final class LlapDaemonProtocolProtos {
super.clear();
submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED;
bitField0_ = (bitField0_ & ~0x00000001);
+ uniqueNodeId_ = "";
+ bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
@@ -11635,6 +11717,10 @@ public final class LlapDaemonProtocolProtos {
to_bitField0_ |= 0x00000001;
}
result.submissionState_ = submissionState_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.uniqueNodeId_ = uniqueNodeId_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -11654,6 +11740,11 @@ public final class LlapDaemonProtocolProtos {
if (other.hasSubmissionState()) {
setSubmissionState(other.getSubmissionState());
}
+ if (other.hasUniqueNodeId()) {
+ bitField0_ |= 0x00000002;
+ uniqueNodeId_ = other.uniqueNodeId_;
+ onChanged();
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -11717,6 +11808,80 @@ public final class LlapDaemonProtocolProtos {
return this;
}
+ // optional string unique_node_id = 2;
+ private java.lang.Object uniqueNodeId_ = "";
+ /**
+ * <code>optional string unique_node_id = 2;</code>
+ */
+ public boolean hasUniqueNodeId() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * <code>optional string unique_node_id = 2;</code>
+ */
+ public java.lang.String getUniqueNodeId() {
+ java.lang.Object ref = uniqueNodeId_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ uniqueNodeId_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * <code>optional string unique_node_id = 2;</code>
+ */
+ public com.google.protobuf.ByteString
+ getUniqueNodeIdBytes() {
+ java.lang.Object ref = uniqueNodeId_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ uniqueNodeId_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * <code>optional string unique_node_id = 2;</code>
+ */
+ public Builder setUniqueNodeId(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ uniqueNodeId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string unique_node_id = 2;</code>
+ */
+ public Builder clearUniqueNodeId() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ uniqueNodeId_ = getDefaultInstance().getUniqueNodeId();
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional string unique_node_id = 2;</code>
+ */
+ public Builder setUniqueNodeIdBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ uniqueNodeId_ = value;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:SubmitWorkResponseProto)
}
@@ -17246,83 +17411,83 @@ public final class LlapDaemonProtocolProtos {
descriptor;
static {
java.lang.String[] descriptorData = {
- "\n%src/protobuf/LlapDaemonProtocol.proto\"" +
- "9\n\020UserPayloadProto\022\024\n\014user_payload\030\001 \001(" +
- "\014\022\017\n\007version\030\002 \001(\005\"j\n\025EntityDescriptorPr" +
- "oto\022\022\n\nclass_name\030\001 \001(\t\022\'\n\014user_payload\030" +
- "\002 \001(\0132\021.UserPayloadProto\022\024\n\014history_text" +
- "\030\003 \001(\014\"x\n\013IOSpecProto\022\035\n\025connected_verte" +
- "x_name\030\001 \001(\t\022-\n\rio_descriptor\030\002 \001(\0132\026.En" +
- "tityDescriptorProto\022\033\n\023physical_edge_cou" +
- "nt\030\003 \001(\005\"z\n\023GroupInputSpecProto\022\022\n\ngroup" +
- "_name\030\001 \001(\t\022\026\n\016group_vertices\030\002 \003(\t\0227\n\027m",
- "erged_input_descriptor\030\003 \001(\0132\026.EntityDes" +
- "criptorProto\"\245\003\n\022SignableVertexSpec\022\014\n\004u" +
- "ser\030\001 \001(\t\022\026\n\016signatureKeyId\030\002 \001(\003\022/\n\020que" +
- "ry_identifier\030\003 \001(\0132\025.QueryIdentifierPro" +
- "to\022\025\n\rhive_query_id\030\004 \001(\t\022\020\n\010dag_name\030\005 " +
- "\001(\t\022\023\n\013vertex_name\030\006 \001(\t\022\024\n\014vertex_index" +
- "\030\007 \001(\005\022\030\n\020token_identifier\030\010 \001(\t\0224\n\024proc" +
- "essor_descriptor\030\t \001(\0132\026.EntityDescripto" +
- "rProto\022!\n\013input_specs\030\n \003(\0132\014.IOSpecProt" +
- "o\022\"\n\014output_specs\030\013 \003(\0132\014.IOSpecProto\0221\n",
- "\023grouped_input_specs\030\014 \003(\0132\024.GroupInputS" +
- "pecProto\022\032\n\022vertex_parallelism\030\r \001(\005\"K\n\016" +
- "VertexOrBinary\022#\n\006vertex\030\001 \001(\0132\023.Signabl" +
- "eVertexSpec\022\024\n\014vertexBinary\030\002 \001(\014\"\344\001\n\023Fr" +
- "agmentRuntimeInfo\022#\n\033num_self_and_upstre" +
- "am_tasks\030\001 \001(\005\022-\n%num_self_and_upstream_" +
- "completed_tasks\030\002 \001(\005\022\033\n\023within_dag_prio" +
- "rity\030\003 \001(\005\022\026\n\016dag_start_time\030\004 \001(\003\022 \n\030fi" +
- "rst_attempt_start_time\030\005 \001(\003\022\"\n\032current_" +
- "attempt_start_time\030\006 \001(\003\"d\n\024QueryIdentif",
- "ierProto\022\035\n\025application_id_string\030\001 \001(\t\022" +
- "\021\n\tdag_index\030\002 \001(\005\022\032\n\022app_attempt_number" +
- "\030\003 \001(\005\"l\n\013NotTezEvent\022\037\n\027input_event_pro" +
- "to_bytes\030\001 \002(\014\022\023\n\013vertex_name\030\002 \002(\t\022\027\n\017d" +
- "est_input_name\030\003 \002(\t\022\016\n\006key_id\030\004 \001(\005\"\330\002\n" +
- "\026SubmitWorkRequestProto\022\"\n\twork_spec\030\001 \001" +
- "(\0132\017.VertexOrBinary\022\033\n\023work_spec_signatu" +
- "re\030\002 \001(\014\022\027\n\017fragment_number\030\003 \001(\005\022\026\n\016att" +
- "empt_number\030\004 \001(\005\022\033\n\023container_id_string" +
- "\030\005 \001(\t\022\017\n\007am_host\030\006 \001(\t\022\017\n\007am_port\030\007 \001(\005",
- "\022\032\n\022credentials_binary\030\010 \001(\014\0223\n\025fragment" +
- "_runtime_info\030\t \001(\0132\024.FragmentRuntimeInf" +
- "o\022\033\n\023initial_event_bytes\030\n \001(\014\022\037\n\027initia" +
- "l_event_signature\030\013 \001(\014\"J\n\027SubmitWorkRes" +
- "ponseProto\022/\n\020submission_state\030\001 \001(\0162\025.S" +
- "ubmissionStateProto\"\205\001\n\036SourceStateUpdat" +
- "edRequestProto\022/\n\020query_identifier\030\001 \001(\013" +
- "2\025.QueryIdentifierProto\022\020\n\010src_name\030\002 \001(" +
- "\t\022 \n\005state\030\003 \001(\0162\021.SourceStateProto\"!\n\037S" +
- "ourceStateUpdatedResponseProto\"e\n\031QueryC",
- "ompleteRequestProto\022/\n\020query_identifier\030" +
- "\001 \001(\0132\025.QueryIdentifierProto\022\027\n\014delete_d" +
- "elay\030\002 \001(\003:\0010\"\034\n\032QueryCompleteResponsePr" +
- "oto\"t\n\035TerminateFragmentRequestProto\022/\n\020" +
- "query_identifier\030\001 \001(\0132\025.QueryIdentifier" +
- "Proto\022\"\n\032fragment_identifier_string\030\002 \001(" +
- "\t\" \n\036TerminateFragmentResponseProto\"&\n\024G" +
- "etTokenRequestProto\022\016\n\006app_id\030\001 \001(\t\"&\n\025G" +
- "etTokenResponseProto\022\r\n\005token\030\001 \001(\014\"A\n\033L" +
- "lapOutputSocketInitMessage\022\023\n\013fragment_i",
- "d\030\001 \002(\t\022\r\n\005token\030\002 \001(\014*2\n\020SourceStatePro" +
- "to\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\002*E\n\024Su" +
- "bmissionStateProto\022\014\n\010ACCEPTED\020\001\022\014\n\010REJE" +
- "CTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n\022LlapDaemon" +
- "Protocol\022?\n\nsubmitWork\022\027.SubmitWorkReque" +
- "stProto\032\030.SubmitWorkResponseProto\022W\n\022sou" +
- "rceStateUpdated\022\037.SourceStateUpdatedRequ" +
- "estProto\032 .SourceStateUpdatedResponsePro" +
- "to\022H\n\rqueryComplete\022\032.QueryCompleteReque" +
- "stProto\032\033.QueryCompleteResponseProto\022T\n\021",
- "terminateFragment\022\036.TerminateFragmentReq" +
- "uestProto\032\037.TerminateFragmentResponsePro" +
- "to2]\n\026LlapManagementProtocol\022C\n\022getDeleg" +
- "ationToken\022\025.GetTokenRequestProto\032\026.GetT" +
- "okenResponseProtoBH\n&org.apache.hadoop.h" +
- "ive.llap.daemon.rpcB\030LlapDaemonProtocolP" +
- "rotos\210\001\001\240\001\001"
+ "\n\030LlapDaemonProtocol.proto\"9\n\020UserPayloa" +
+ "dProto\022\024\n\014user_payload\030\001 \001(\014\022\017\n\007version\030" +
+ "\002 \001(\005\"j\n\025EntityDescriptorProto\022\022\n\nclass_" +
+ "name\030\001 \001(\t\022\'\n\014user_payload\030\002 \001(\0132\021.UserP" +
+ "ayloadProto\022\024\n\014history_text\030\003 \001(\014\"x\n\013IOS" +
+ "pecProto\022\035\n\025connected_vertex_name\030\001 \001(\t\022" +
+ "-\n\rio_descriptor\030\002 \001(\0132\026.EntityDescripto" +
+ "rProto\022\033\n\023physical_edge_count\030\003 \001(\005\"z\n\023G" +
+ "roupInputSpecProto\022\022\n\ngroup_name\030\001 \001(\t\022\026" +
+ "\n\016group_vertices\030\002 \003(\t\0227\n\027merged_input_d",
+ "escriptor\030\003 \001(\0132\026.EntityDescriptorProto\"" +
+ "\245\003\n\022SignableVertexSpec\022\014\n\004user\030\001 \001(\t\022\026\n\016" +
+ "signatureKeyId\030\002 \001(\003\022/\n\020query_identifier" +
+ "\030\003 \001(\0132\025.QueryIdentifierProto\022\025\n\rhive_qu" +
+ "ery_id\030\004 \001(\t\022\020\n\010dag_name\030\005 \001(\t\022\023\n\013vertex" +
+ "_name\030\006 \001(\t\022\024\n\014vertex_index\030\007 \001(\005\022\030\n\020tok" +
+ "en_identifier\030\010 \001(\t\0224\n\024processor_descrip" +
+ "tor\030\t \001(\0132\026.EntityDescriptorProto\022!\n\013inp" +
+ "ut_specs\030\n \003(\0132\014.IOSpecProto\022\"\n\014output_s" +
+ "pecs\030\013 \003(\0132\014.IOSpecProto\0221\n\023grouped_inpu",
+ "t_specs\030\014 \003(\0132\024.GroupInputSpecProto\022\032\n\022v" +
+ "ertex_parallelism\030\r \001(\005\"K\n\016VertexOrBinar" +
+ "y\022#\n\006vertex\030\001 \001(\0132\023.SignableVertexSpec\022\024" +
+ "\n\014vertexBinary\030\002 \001(\014\"\344\001\n\023FragmentRuntime" +
+ "Info\022#\n\033num_self_and_upstream_tasks\030\001 \001(" +
+ "\005\022-\n%num_self_and_upstream_completed_tas" +
+ "ks\030\002 \001(\005\022\033\n\023within_dag_priority\030\003 \001(\005\022\026\n" +
+ "\016dag_start_time\030\004 \001(\003\022 \n\030first_attempt_s" +
+ "tart_time\030\005 \001(\003\022\"\n\032current_attempt_start" +
+ "_time\030\006 \001(\003\"d\n\024QueryIdentifierProto\022\035\n\025a",
+ "pplication_id_string\030\001 \001(\t\022\021\n\tdag_index\030" +
+ "\002 \001(\005\022\032\n\022app_attempt_number\030\003 \001(\005\"l\n\013Not" +
+ "TezEvent\022\037\n\027input_event_proto_bytes\030\001 \002(" +
+ "\014\022\023\n\013vertex_name\030\002 \002(\t\022\027\n\017dest_input_nam" +
+ "e\030\003 \002(\t\022\016\n\006key_id\030\004 \001(\005\"\330\002\n\026SubmitWorkRe" +
+ "questProto\022\"\n\twork_spec\030\001 \001(\0132\017.VertexOr" +
+ "Binary\022\033\n\023work_spec_signature\030\002 \001(\014\022\027\n\017f" +
+ "ragment_number\030\003 \001(\005\022\026\n\016attempt_number\030\004" +
+ " \001(\005\022\033\n\023container_id_string\030\005 \001(\t\022\017\n\007am_" +
+ "host\030\006 \001(\t\022\017\n\007am_port\030\007 \001(\005\022\032\n\022credentia",
+ "ls_binary\030\010 \001(\014\0223\n\025fragment_runtime_info" +
+ "\030\t \001(\0132\024.FragmentRuntimeInfo\022\033\n\023initial_" +
+ "event_bytes\030\n \001(\014\022\037\n\027initial_event_signa" +
+ "ture\030\013 \001(\014\"b\n\027SubmitWorkResponseProto\022/\n" +
+ "\020submission_state\030\001 \001(\0162\025.SubmissionStat" +
+ "eProto\022\026\n\016unique_node_id\030\002 \001(\t\"\205\001\n\036Sourc" +
+ "eStateUpdatedRequestProto\022/\n\020query_ident" +
+ "ifier\030\001 \001(\0132\025.QueryIdentifierProto\022\020\n\010sr" +
+ "c_name\030\002 \001(\t\022 \n\005state\030\003 \001(\0162\021.SourceStat" +
+ "eProto\"!\n\037SourceStateUpdatedResponseProt",
+ "o\"e\n\031QueryCompleteRequestProto\022/\n\020query_" +
+ "identifier\030\001 \001(\0132\025.QueryIdentifierProto\022" +
+ "\027\n\014delete_delay\030\002 \001(\003:\0010\"\034\n\032QueryComplet" +
+ "eResponseProto\"t\n\035TerminateFragmentReque" +
+ "stProto\022/\n\020query_identifier\030\001 \001(\0132\025.Quer" +
+ "yIdentifierProto\022\"\n\032fragment_identifier_" +
+ "string\030\002 \001(\t\" \n\036TerminateFragmentRespons" +
+ "eProto\"&\n\024GetTokenRequestProto\022\016\n\006app_id" +
+ "\030\001 \001(\t\"&\n\025GetTokenResponseProto\022\r\n\005token" +
+ "\030\001 \001(\014\"A\n\033LlapOutputSocketInitMessage\022\023\n",
+ "\013fragment_id\030\001 \002(\t\022\r\n\005token\030\002 \001(\014*2\n\020Sou" +
+ "rceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNN" +
+ "ING\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEPTE" +
+ "D\020\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n" +
+ "\022LlapDaemonProtocol\022?\n\nsubmitWork\022\027.Subm" +
+ "itWorkRequestProto\032\030.SubmitWorkResponseP" +
+ "roto\022W\n\022sourceStateUpdated\022\037.SourceState" +
+ "UpdatedRequestProto\032 .SourceStateUpdated" +
+ "ResponseProto\022H\n\rqueryComplete\022\032.QueryCo" +
+ "mpleteRequestProto\032\033.QueryCompleteRespon",
+ "seProto\022T\n\021terminateFragment\022\036.Terminate" +
+ "FragmentRequestProto\032\037.TerminateFragment" +
+ "ResponseProto2]\n\026LlapManagementProtocol\022" +
+ "C\n\022getDelegationToken\022\025.GetTokenRequestP" +
+ "roto\032\026.GetTokenResponseProtoBH\n&org.apac" +
+ "he.hadoop.hive.llap.daemon.rpcB\030LlapDaem" +
+ "onProtocolProtos\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -17394,7 +17559,7 @@ public final class LlapDaemonProtocolProtos {
internal_static_SubmitWorkResponseProto_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_SubmitWorkResponseProto_descriptor,
- new java.lang.String[] { "SubmissionState", });
+ new java.lang.String[] { "SubmissionState", "UniqueNodeId", });
internal_static_SourceStateUpdatedRequestProto_descriptor =
getDescriptor().getMessageTypes().get(11);
internal_static_SourceStateUpdatedRequestProto_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hive/blob/8804a7b8/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java b/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
index ea47330..89b0152 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
@@ -14,21 +14,24 @@
package org.apache.hadoop.hive.llap;
+import java.util.UUID;
+
public class DaemonId {
private final String userName;
private final String clusterName;
private final String appId;
private final String hostName;
- private final long startTime;
+ private final long startTimeMs;
+ private final String uuidString;
- public DaemonId(String userName, String clusterName, String hostName, String appId,
- long startTime) {
+ public DaemonId(
+ String userName, String clusterName, String hostName, String appId, long startTime) {
this.userName = userName;
this.clusterName = clusterName;
this.appId = appId;
this.hostName = hostName;
- this.startTime = startTime;
- // TODO: we could also get an unique number per daemon.
+ this.startTimeMs = startTime;
+ this.uuidString = UUID.randomUUID().toString();
}
public String getClusterString() {
@@ -45,4 +48,8 @@ public class DaemonId {
public String getApplicationId() {
return appId;
}
+
+ public String getUniqueNodeIdInCluster() {
+ return hostName + "_" + startTimeMs + "_" + uuidString;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/8804a7b8/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java b/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
index 9549567..dbfe54b 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
@@ -35,7 +35,7 @@ public interface LlapTaskUmbilicalProtocol extends VersionedProtocol {
public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
throws IOException, TezException;
- public void nodeHeartbeat(Text hostname, int port) throws IOException;
+ public void nodeHeartbeat(Text hostname, Text uniqueId, int port) throws IOException;
public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException;
http://git-wip-us.apache.org/repos/asf/hive/blob/8804a7b8/llap-common/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto
index 2e74c18..3a3a2b8 100644
--- a/llap-common/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto
@@ -140,6 +140,7 @@ enum SubmissionStateProto {
message SubmitWorkResponseProto {
optional SubmissionStateProto submission_state = 1;
+ optional string unique_node_id = 2;
}
message SourceStateUpdatedRequestProto {
http://git-wip-us.apache.org/repos/asf/hive/blob/8804a7b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 04c28cb..93f8073 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -41,6 +41,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.DaemonId;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
@@ -85,7 +86,7 @@ public class AMReporter extends AbstractService {
private static final Logger LOG = LoggerFactory.getLogger(AMReporter.class);
- private volatile LlapNodeId nodeId;
+ private LlapNodeId nodeId;
private final QueryFailedHandler queryFailedHandler;
private final Configuration conf;
private final ListeningExecutorService queueLookupExecutor;
@@ -101,13 +102,15 @@ public class AMReporter extends AbstractService {
// messages like taskKilled, etc.
private final Map<LlapNodeId, AMNodeInfo> knownAppMasters = new HashMap<>();
volatile ListenableFuture<Void> queueLookupFuture;
+ private final DaemonId daemonId;
public AMReporter(AtomicReference<InetSocketAddress> localAddress,
- QueryFailedHandler queryFailedHandler, Configuration conf) {
+ QueryFailedHandler queryFailedHandler, Configuration conf, DaemonId daemonId) {
super(AMReporter.class.getName());
this.localAddress = localAddress;
this.queryFailedHandler = queryFailedHandler;
this.conf = conf;
+ this.daemonId = daemonId;
ExecutorService rawExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter %d").build());
this.executor = MoreExecutors.listeningDecorator(rawExecutor);
@@ -154,8 +157,9 @@ public class AMReporter extends AbstractService {
}
}
});
+ // TODO: why is this needed? we could just save the host and port?
nodeId = LlapNodeId.getInstance(localAddress.get().getHostName(), localAddress.get().getPort());
- LOG.info("AMReporter running with NodeId: {}", nodeId);
+ LOG.info("AMReporter running with DaemonId: {}, NodeId: {}", daemonId, nodeId);
}
@Override
@@ -336,7 +340,7 @@ public class AMReporter extends AbstractService {
LOG.trace("NodeHeartbeat to: " + amNodeInfo);
}
amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()),
- nodeId.getPort());
+ new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort());
} catch (IOException e) {
QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
amNodeInfo.setAmFailed(true);
http://git-wip-us.apache.org/repos/asf/hive/blob/8804a7b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 91a321d..1346050 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -102,6 +102,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
private final HadoopShim tezHadoopShim;
private final LlapSignerImpl signer;
private final String clusterId;
+ private final DaemonId daemonId;
private final UgiFactory fsUgiFactory;
public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize,
@@ -120,6 +121,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
this.fsUgiFactory = fsUgiFactory;
this.clusterId = daemonId.getClusterString();
+ this.daemonId = daemonId;
this.queryTracker = new QueryTracker(conf, localDirsBase, clusterId);
addIfService(queryTracker);
String waitQueueSchedulerClassName = HiveConf.getVar(
@@ -262,8 +264,9 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
NDC.clear();
}
- responseBuilder.setSubmissionState(SubmissionStateProto.valueOf(submissionState.name()));
- return responseBuilder.build();
+ return responseBuilder.setUniqueNodeId(daemonId.getUniqueNodeIdInCluster())
+ .setSubmissionState(SubmissionStateProto.valueOf(submissionState.name()))
+ .build();
}
private SignableVertexSpec extractVertexSpec(SubmitWorkRequestProto request,
http://git-wip-us.apache.org/repos/asf/hive/blob/8804a7b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 752e6ee..d90b156 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -239,7 +239,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
LOG.info("Started LlapMetricsSystem with displayName: " + displayName +
" sessionId: " + sessionId);
- this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf);
+ this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf, daemonId);
SecretManager sm = null;
if (UserGroupInformation.isSecurityEnabled()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/8804a7b8/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 0deebf9..c692581 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -310,8 +310,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
// Have to register this up front right now. Otherwise, it's possible for the task to start
// sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them.
- getContext()
- .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
+ getContext().taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
communicator.sendSubmitWork(requestProto, host, port,
new LlapProtocolClientProxy.ExecuteRequestCallback<SubmitWorkResponseProto>() {
@Override
@@ -331,6 +330,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
// This should never happen as server always returns a valid status on success
throw new RuntimeException("SubmissionState in response is expected!");
}
+ if (response.hasUniqueNodeId()) {
+ entityTracker.registerTaskSubmittedToNode(
+ taskSpec.getTaskAttemptID(), response.getUniqueNodeId());
+ }
LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID());
}
@@ -562,7 +565,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0);
- void nodePinged(String hostname, int port) {
+ void nodePinged(String hostname, String uniqueId, int port) {
+ // TODO: do we ever need the port? we could just do away with nodeId altogether.
LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port);
registerPingingNode(nodeId);
BiMap<ContainerId, TezTaskAttemptID> biMap =
@@ -570,8 +574,19 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
if (biMap != null) {
synchronized (biMap) {
for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
- getContext().taskAlive(entry.getValue());
- getContext().containerAlive(entry.getKey());
+ // TODO: this is a stopgap fix. We really need to key all the mappings by unique node ID,
+ // or at least (in this case) track the latest unique ID for each LlapNode and proactively
+ // retry tasks that were submitted to older node instances. For now, let the heartbeats fail them.
+ TezTaskAttemptID attemptId = entry.getValue();
+ String taskNodeId = entityTracker.getUniqueNodeId(attemptId);
+ // The unique ID is registered based on the Submit response. Theoretically, we could get a ping
+ // while the task is valid but before its unique ID has been stored, in which case taskNodeId
+ // would be null. However, the next heartbeat(s) should see the stored value and mark the task
+ // as alive. Also, if the entity tracker state is inconsistent, we prefer a missed heartbeat
+ // over a stuck query.
+ if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
+ getContext().taskAlive(entry.getValue());
+ getContext().containerAlive(entry.getKey());
+ }
}
}
} else {
@@ -664,10 +679,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
}
@Override
- public void nodeHeartbeat(Text hostname, int port) throws IOException {
- nodePinged(hostname.toString(), port);
+ public void nodeHeartbeat(Text hostname, Text uniqueId, int port) throws IOException {
+ nodePinged(hostname.toString(), uniqueId.toString(), port);
if (LOG.isDebugEnabled()) {
- LOG.debug("Received heartbeat from [" + hostname + ":" + port +"]");
+ LOG.debug("Received heartbeat from [" + hostname + ":" + port +" (" + uniqueId +")]");
}
}
@@ -697,12 +712,17 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
*/
@VisibleForTesting
static final class EntityTracker {
+ // TODO: need the description of how these maps are kept consistent.
@VisibleForTesting
final ConcurrentMap<TezTaskAttemptID, LlapNodeId> attemptToNodeMap = new ConcurrentHashMap<>();
@VisibleForTesting
final ConcurrentMap<ContainerId, LlapNodeId> containerToNodeMap = new ConcurrentHashMap<>();
@VisibleForTesting
final ConcurrentMap<LlapNodeId, BiMap<ContainerId, TezTaskAttemptID>> nodeMap = new ConcurrentHashMap<>();
+ // TODO: we currently put task info everywhere before we submit it and know the "real" node id.
+ // Therefore, we are going to store this separately. Ideally, we should roll uniqueness
+ // into LlapNodeId. We get node info from registry; that should (or can) include it.
+ private final ConcurrentMap<TezTaskAttemptID, String> uniqueNodeMap = new ConcurrentHashMap<>();
void registerTaskAttempt(ContainerId containerId, TezTaskAttemptID taskAttemptId, String host, int port) {
if (LOG.isDebugEnabled()) {
@@ -726,7 +746,20 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
nodeMap.putIfAbsent(llapNodeId, usedInstance);
}
+ public String getUniqueNodeId(TezTaskAttemptID attemptId) {
+ return uniqueNodeMap.get(attemptId);
+ }
+
+ public void registerTaskSubmittedToNode(
+ TezTaskAttemptID taskAttemptID, String uniqueNodeId) {
+ String prev = uniqueNodeMap.putIfAbsent(taskAttemptID, uniqueNodeId);
+ if (prev != null) {
+ LOG.warn("Replaced the unique node mapping for task from " + prev + " to " + uniqueNodeId);
+ }
+ }
+
void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
+ uniqueNodeMap.remove(attemptId);
LlapNodeId llapNodeId = attemptToNodeMap.remove(attemptId);
if (llapNodeId == null) {
// Possible since either container / task can be unregistered.
@@ -820,6 +853,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
// Remove the container mapping
if (matched != null) {
attemptToNodeMap.remove(matched);
+ uniqueNodeMap.remove(matched);
}
}
@@ -834,8 +868,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
* @return
*/
BiMap<ContainerId, TezTaskAttemptID> getContainerAttemptMapForNode(LlapNodeId llapNodeId) {
- BiMap<ContainerId, TezTaskAttemptID> biMap = nodeMap.get(llapNodeId);
- return biMap;
+ return nodeMap.get(llapNodeId);
}
}