You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2015/05/20 07:56:36 UTC
hive git commit: HIVE-10763. LLAP: Provide current attempt start time
for wait queue ordering. (Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/llap ece61d033 -> 95a959ff5
HIVE-10763. LLAP: Provide current attempt start time for wait queue ordering. (Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/95a959ff
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/95a959ff
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/95a959ff
Branch: refs/heads/llap
Commit: 95a959ff5bd429cba062a0259a975c9d12a85206
Parents: ece61d0
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue May 19 22:55:58 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue May 19 22:55:58 2015 -0700
----------------------------------------------------------------------
.../daemon/rpc/LlapDaemonProtocolProtos.java | 160 +++++++++++++++----
.../llap/daemon/impl/ContainerRunnerImpl.java | 1 +
.../llap/daemon/impl/TaskRunnerCallable.java | 5 +
.../tezplugins/helpers/SourceStateTracker.java | 1 +
.../src/protobuf/LlapDaemonProtocol.proto | 1 +
5 files changed, 133 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/95a959ff/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
----------------------------------------------------------------------
diff --git a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index 8748151..d378955 100644
--- a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -5587,6 +5587,16 @@ public final class LlapDaemonProtocolProtos {
* <code>optional int64 first_attempt_start_time = 5;</code>
*/
long getFirstAttemptStartTime();
+
+ // optional int64 current_attempt_start_time = 6;
+ /**
+ * <code>optional int64 current_attempt_start_time = 6;</code>
+ */
+ boolean hasCurrentAttemptStartTime();
+ /**
+ * <code>optional int64 current_attempt_start_time = 6;</code>
+ */
+ long getCurrentAttemptStartTime();
}
/**
* Protobuf type {@code FragmentRuntimeInfo}
@@ -5664,6 +5674,11 @@ public final class LlapDaemonProtocolProtos {
firstAttemptStartTime_ = input.readInt64();
break;
}
+ case 48: {
+ bitField0_ |= 0x00000020;
+ currentAttemptStartTime_ = input.readInt64();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -5784,12 +5799,29 @@ public final class LlapDaemonProtocolProtos {
return firstAttemptStartTime_;
}
+ // optional int64 current_attempt_start_time = 6;
+ public static final int CURRENT_ATTEMPT_START_TIME_FIELD_NUMBER = 6;
+ private long currentAttemptStartTime_;
+ /**
+ * <code>optional int64 current_attempt_start_time = 6;</code>
+ */
+ public boolean hasCurrentAttemptStartTime() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * <code>optional int64 current_attempt_start_time = 6;</code>
+ */
+ public long getCurrentAttemptStartTime() {
+ return currentAttemptStartTime_;
+ }
+
private void initFields() {
numSelfAndUpstreamTasks_ = 0;
numSelfAndUpstreamCompletedTasks_ = 0;
withinDagPriority_ = 0;
dagStartTime_ = 0L;
firstAttemptStartTime_ = 0L;
+ currentAttemptStartTime_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -5818,6 +5850,9 @@ public final class LlapDaemonProtocolProtos {
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeInt64(5, firstAttemptStartTime_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeInt64(6, currentAttemptStartTime_);
+ }
getUnknownFields().writeTo(output);
}
@@ -5847,6 +5882,10 @@ public final class LlapDaemonProtocolProtos {
size += com.google.protobuf.CodedOutputStream
.computeInt64Size(5, firstAttemptStartTime_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(6, currentAttemptStartTime_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -5895,6 +5934,11 @@ public final class LlapDaemonProtocolProtos {
result = result && (getFirstAttemptStartTime()
== other.getFirstAttemptStartTime());
}
+ result = result && (hasCurrentAttemptStartTime() == other.hasCurrentAttemptStartTime());
+ if (hasCurrentAttemptStartTime()) {
+ result = result && (getCurrentAttemptStartTime()
+ == other.getCurrentAttemptStartTime());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -5928,6 +5972,10 @@ public final class LlapDaemonProtocolProtos {
hash = (37 * hash) + FIRST_ATTEMPT_START_TIME_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getFirstAttemptStartTime());
}
+ if (hasCurrentAttemptStartTime()) {
+ hash = (37 * hash) + CURRENT_ATTEMPT_START_TIME_FIELD_NUMBER;
+ hash = (53 * hash) + hashLong(getCurrentAttemptStartTime());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -6047,6 +6095,8 @@ public final class LlapDaemonProtocolProtos {
bitField0_ = (bitField0_ & ~0x00000008);
firstAttemptStartTime_ = 0L;
bitField0_ = (bitField0_ & ~0x00000010);
+ currentAttemptStartTime_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000020);
return this;
}
@@ -6095,6 +6145,10 @@ public final class LlapDaemonProtocolProtos {
to_bitField0_ |= 0x00000010;
}
result.firstAttemptStartTime_ = firstAttemptStartTime_;
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.currentAttemptStartTime_ = currentAttemptStartTime_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -6126,6 +6180,9 @@ public final class LlapDaemonProtocolProtos {
if (other.hasFirstAttemptStartTime()) {
setFirstAttemptStartTime(other.getFirstAttemptStartTime());
}
+ if (other.hasCurrentAttemptStartTime()) {
+ setCurrentAttemptStartTime(other.getCurrentAttemptStartTime());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -6318,6 +6375,39 @@ public final class LlapDaemonProtocolProtos {
return this;
}
+ // optional int64 current_attempt_start_time = 6;
+ private long currentAttemptStartTime_ ;
+ /**
+ * <code>optional int64 current_attempt_start_time = 6;</code>
+ */
+ public boolean hasCurrentAttemptStartTime() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * <code>optional int64 current_attempt_start_time = 6;</code>
+ */
+ public long getCurrentAttemptStartTime() {
+ return currentAttemptStartTime_;
+ }
+ /**
+ * <code>optional int64 current_attempt_start_time = 6;</code>
+ */
+ public Builder setCurrentAttemptStartTime(long value) {
+ bitField0_ |= 0x00000020;
+ currentAttemptStartTime_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int64 current_attempt_start_time = 6;</code>
+ */
+ public Builder clearCurrentAttemptStartTime() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ currentAttemptStartTime_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:FragmentRuntimeInfo)
}
@@ -12714,44 +12804,44 @@ public final class LlapDaemonProtocolProtos {
"\003(\0132\014.IOSpecProto\0221\n\023grouped_input_specs" +
"\030\007 \003(\0132\024.GroupInputSpecProto\022\032\n\022vertex_p" +
"arallelism\030\010 \001(\005\022\027\n\017fragment_number\030\t \001(" +
- "\005\022\026\n\016attempt_number\030\n \001(\005\"\300\001\n\023FragmentRu",
+ "\005\022\026\n\016attempt_number\030\n \001(\005\"\344\001\n\023FragmentRu",
"ntimeInfo\022#\n\033num_self_and_upstream_tasks" +
"\030\001 \001(\005\022-\n%num_self_and_upstream_complete" +
"d_tasks\030\002 \001(\005\022\033\n\023within_dag_priority\030\003 \001" +
"(\005\022\026\n\016dag_start_time\030\004 \001(\003\022 \n\030first_atte" +
- "mpt_start_time\030\005 \001(\003\"\266\002\n\026SubmitWorkReque" +
- "stProto\022\033\n\023container_id_string\030\001 \001(\t\022\017\n\007" +
- "am_host\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030\n\020token_" +
- "identifier\030\004 \001(\t\022\032\n\022credentials_binary\030\005" +
- " \001(\014\022\014\n\004user\030\006 \001(\t\022\035\n\025application_id_str" +
- "ing\030\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n",
- "\rfragment_spec\030\t \001(\0132\022.FragmentSpecProto" +
- "\0223\n\025fragment_runtime_info\030\n \001(\0132\024.Fragme" +
- "ntRuntimeInfo\"\031\n\027SubmitWorkResponseProto" +
- "\"f\n\036SourceStateUpdatedRequestProto\022\020\n\010da" +
- "g_name\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030" +
- "\003 \001(\0162\021.SourceStateProto\"!\n\037SourceStateU" +
- "pdatedResponseProto\"X\n\031QueryCompleteRequ" +
- "estProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030\002" +
- " \001(\t\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034\n\032QueryCo" +
- "mpleteResponseProto\"\245\001\n\035TerminateFragmen",
- "tRequestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_n" +
- "ame\030\002 \001(\t\022\032\n\022dag_attempt_number\030\003 \001(\005\022\023\n" +
- "\013vertex_name\030\004 \001(\t\022\027\n\017fragment_number\030\005 " +
- "\001(\005\022\026\n\016attempt_number\030\006 \001(\005\" \n\036Terminate" +
- "FragmentResponseProto*2\n\020SourceStateProt" +
- "o\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\0022\316\002\n\022Ll" +
- "apDaemonProtocol\022?\n\nsubmitWork\022\027.SubmitW" +
- "orkRequestProto\032\030.SubmitWorkResponseProt" +
- "o\022W\n\022sourceStateUpdated\022\037.SourceStateUpd" +
- "atedRequestProto\032 .SourceStateUpdatedRes",
- "ponseProto\022H\n\rqueryComplete\022\032.QueryCompl" +
- "eteRequestProto\032\033.QueryCompleteResponseP" +
- "roto\022T\n\021terminateFragment\022\036.TerminateFra" +
- "gmentRequestProto\032\037.TerminateFragmentRes" +
- "ponseProtoBH\n&org.apache.hadoop.hive.lla" +
- "p.daemon.rpcB\030LlapDaemonProtocolProtos\210\001" +
- "\001\240\001\001"
+ "mpt_start_time\030\005 \001(\003\022\"\n\032current_attempt_" +
+ "start_time\030\006 \001(\003\"\266\002\n\026SubmitWorkRequestPr" +
+ "oto\022\033\n\023container_id_string\030\001 \001(\t\022\017\n\007am_h" +
+ "ost\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030\n\020token_iden" +
+ "tifier\030\004 \001(\t\022\032\n\022credentials_binary\030\005 \001(\014" +
+ "\022\014\n\004user\030\006 \001(\t\022\035\n\025application_id_string\030",
+ "\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n\rfra" +
+ "gment_spec\030\t \001(\0132\022.FragmentSpecProto\0223\n\025" +
+ "fragment_runtime_info\030\n \001(\0132\024.FragmentRu" +
+ "ntimeInfo\"\031\n\027SubmitWorkResponseProto\"f\n\036" +
+ "SourceStateUpdatedRequestProto\022\020\n\010dag_na" +
+ "me\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030\003 \001(" +
+ "\0162\021.SourceStateProto\"!\n\037SourceStateUpdat" +
+ "edResponseProto\"X\n\031QueryCompleteRequestP" +
+ "roto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030\002 \001(\t" +
+ "\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034\n\032QueryComple",
+ "teResponseProto\"\245\001\n\035TerminateFragmentReq" +
+ "uestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030" +
+ "\002 \001(\t\022\032\n\022dag_attempt_number\030\003 \001(\005\022\023\n\013ver" +
+ "tex_name\030\004 \001(\t\022\027\n\017fragment_number\030\005 \001(\005\022" +
+ "\026\n\016attempt_number\030\006 \001(\005\" \n\036TerminateFrag" +
+ "mentResponseProto*2\n\020SourceStateProto\022\017\n" +
+ "\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\0022\316\002\n\022LlapDa" +
+ "emonProtocol\022?\n\nsubmitWork\022\027.SubmitWorkR" +
+ "equestProto\032\030.SubmitWorkResponseProto\022W\n" +
+ "\022sourceStateUpdated\022\037.SourceStateUpdated",
+ "RequestProto\032 .SourceStateUpdatedRespons" +
+ "eProto\022H\n\rqueryComplete\022\032.QueryCompleteR" +
+ "equestProto\032\033.QueryCompleteResponseProto" +
+ "\022T\n\021terminateFragment\022\036.TerminateFragmen" +
+ "tRequestProto\032\037.TerminateFragmentRespons" +
+ "eProtoBH\n&org.apache.hadoop.hive.llap.da" +
+ "emon.rpcB\030LlapDaemonProtocolProtos\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -12793,7 +12883,7 @@ public final class LlapDaemonProtocolProtos {
internal_static_FragmentRuntimeInfo_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_FragmentRuntimeInfo_descriptor,
- new java.lang.String[] { "NumSelfAndUpstreamTasks", "NumSelfAndUpstreamCompletedTasks", "WithinDagPriority", "DagStartTime", "FirstAttemptStartTime", });
+ new java.lang.String[] { "NumSelfAndUpstreamTasks", "NumSelfAndUpstreamCompletedTasks", "WithinDagPriority", "DagStartTime", "FirstAttemptStartTime", "CurrentAttemptStartTime", });
internal_static_SubmitWorkRequestProto_descriptor =
getDescriptor().getMessageTypes().get(6);
internal_static_SubmitWorkRequestProto_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hive/blob/95a959ff/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index a208bdd..d594d6a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -282,6 +282,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
sb.append(", completedTaskCount=").append(fragmentRuntimeInfo.getNumSelfAndUpstreamCompletedTasks());
sb.append(", dagStartTime=").append(fragmentRuntimeInfo.getDagStartTime());
sb.append(", firstAttemptStartTime=").append(fragmentRuntimeInfo.getFirstAttemptStartTime());
+ sb.append(", currentAttemptStartTime=").append(fragmentRuntimeInfo.getCurrentAttemptStartTime());
sb.append("}");
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/95a959ff/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 166dac5..2ea39b7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -442,4 +442,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
public long getFirstAttemptStartTime() {
return request.getFragmentRuntimeInfo().getFirstAttemptStartTime();
}
+
+ public long getCurrentAttemptStartTime() {
+ return request.getFragmentRuntimeInfo().getCurrentAttemptStartTime();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/95a959ff/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
index d83d62b..40b317d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
@@ -160,6 +160,7 @@ public class SourceStateTracker {
builder.setDagStartTime(taskCommunicatorContext.getDagStartTime());
builder.setWithinDagPriority(priority);
builder.setFirstAttemptStartTime(taskCommunicatorContext.getFirstAttemptStartTime(vertexName, fragmentNumber));
+ builder.setCurrentAttemptStartTime(System.currentTimeMillis());
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/95a959ff/llap-server/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-server/src/protobuf/LlapDaemonProtocol.proto b/llap-server/src/protobuf/LlapDaemonProtocol.proto
index e098e87..d8fd882 100644
--- a/llap-server/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-server/src/protobuf/LlapDaemonProtocol.proto
@@ -66,6 +66,7 @@ message FragmentRuntimeInfo {
optional int32 within_dag_priority = 3;
optional int64 dag_start_time = 4;
optional int64 first_attempt_start_time = 5;
+ optional int64 current_attempt_start_time = 6;
}
enum SourceStateProto {