You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2015/05/20 07:56:36 UTC

hive git commit: HIVE-10763. LLAP: Provide current attempt start time for wait queue ordering. (Siddharth Seth)

Repository: hive
Updated Branches:
  refs/heads/llap ece61d033 -> 95a959ff5


HIVE-10763. LLAP: Provide current attempt start time for wait queue ordering. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/95a959ff
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/95a959ff
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/95a959ff

Branch: refs/heads/llap
Commit: 95a959ff5bd429cba062a0259a975c9d12a85206
Parents: ece61d0
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue May 19 22:55:58 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue May 19 22:55:58 2015 -0700

----------------------------------------------------------------------
 .../daemon/rpc/LlapDaemonProtocolProtos.java    | 160 +++++++++++++++----
 .../llap/daemon/impl/ContainerRunnerImpl.java   |   1 +
 .../llap/daemon/impl/TaskRunnerCallable.java    |   5 +
 .../tezplugins/helpers/SourceStateTracker.java  |   1 +
 .../src/protobuf/LlapDaemonProtocol.proto       |   1 +
 5 files changed, 133 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/95a959ff/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
----------------------------------------------------------------------
diff --git a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index 8748151..d378955 100644
--- a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -5587,6 +5587,16 @@ public final class LlapDaemonProtocolProtos {
      * <code>optional int64 first_attempt_start_time = 5;</code>
      */
     long getFirstAttemptStartTime();
+
+    // optional int64 current_attempt_start_time = 6;
+    /**
+     * <code>optional int64 current_attempt_start_time = 6;</code>
+     */
+    boolean hasCurrentAttemptStartTime();
+    /**
+     * <code>optional int64 current_attempt_start_time = 6;</code>
+     */
+    long getCurrentAttemptStartTime();
   }
   /**
    * Protobuf type {@code FragmentRuntimeInfo}
@@ -5664,6 +5674,11 @@ public final class LlapDaemonProtocolProtos {
               firstAttemptStartTime_ = input.readInt64();
               break;
             }
+            case 48: {
+              bitField0_ |= 0x00000020;
+              currentAttemptStartTime_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -5784,12 +5799,29 @@ public final class LlapDaemonProtocolProtos {
       return firstAttemptStartTime_;
     }
 
+    // optional int64 current_attempt_start_time = 6;
+    public static final int CURRENT_ATTEMPT_START_TIME_FIELD_NUMBER = 6;
+    private long currentAttemptStartTime_;
+    /**
+     * <code>optional int64 current_attempt_start_time = 6;</code>
+     */
+    public boolean hasCurrentAttemptStartTime() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>optional int64 current_attempt_start_time = 6;</code>
+     */
+    public long getCurrentAttemptStartTime() {
+      return currentAttemptStartTime_;
+    }
+
     private void initFields() {
       numSelfAndUpstreamTasks_ = 0;
       numSelfAndUpstreamCompletedTasks_ = 0;
       withinDagPriority_ = 0;
       dagStartTime_ = 0L;
       firstAttemptStartTime_ = 0L;
+      currentAttemptStartTime_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -5818,6 +5850,9 @@ public final class LlapDaemonProtocolProtos {
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeInt64(5, firstAttemptStartTime_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeInt64(6, currentAttemptStartTime_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -5847,6 +5882,10 @@ public final class LlapDaemonProtocolProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeInt64Size(5, firstAttemptStartTime_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(6, currentAttemptStartTime_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -5895,6 +5934,11 @@ public final class LlapDaemonProtocolProtos {
         result = result && (getFirstAttemptStartTime()
             == other.getFirstAttemptStartTime());
       }
+      result = result && (hasCurrentAttemptStartTime() == other.hasCurrentAttemptStartTime());
+      if (hasCurrentAttemptStartTime()) {
+        result = result && (getCurrentAttemptStartTime()
+            == other.getCurrentAttemptStartTime());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -5928,6 +5972,10 @@ public final class LlapDaemonProtocolProtos {
         hash = (37 * hash) + FIRST_ATTEMPT_START_TIME_FIELD_NUMBER;
         hash = (53 * hash) + hashLong(getFirstAttemptStartTime());
       }
+      if (hasCurrentAttemptStartTime()) {
+        hash = (37 * hash) + CURRENT_ATTEMPT_START_TIME_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getCurrentAttemptStartTime());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -6047,6 +6095,8 @@ public final class LlapDaemonProtocolProtos {
         bitField0_ = (bitField0_ & ~0x00000008);
         firstAttemptStartTime_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000010);
+        currentAttemptStartTime_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
 
@@ -6095,6 +6145,10 @@ public final class LlapDaemonProtocolProtos {
           to_bitField0_ |= 0x00000010;
         }
         result.firstAttemptStartTime_ = firstAttemptStartTime_;
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.currentAttemptStartTime_ = currentAttemptStartTime_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -6126,6 +6180,9 @@ public final class LlapDaemonProtocolProtos {
         if (other.hasFirstAttemptStartTime()) {
           setFirstAttemptStartTime(other.getFirstAttemptStartTime());
         }
+        if (other.hasCurrentAttemptStartTime()) {
+          setCurrentAttemptStartTime(other.getCurrentAttemptStartTime());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -6318,6 +6375,39 @@ public final class LlapDaemonProtocolProtos {
         return this;
       }
 
+      // optional int64 current_attempt_start_time = 6;
+      private long currentAttemptStartTime_ ;
+      /**
+       * <code>optional int64 current_attempt_start_time = 6;</code>
+       */
+      public boolean hasCurrentAttemptStartTime() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      /**
+       * <code>optional int64 current_attempt_start_time = 6;</code>
+       */
+      public long getCurrentAttemptStartTime() {
+        return currentAttemptStartTime_;
+      }
+      /**
+       * <code>optional int64 current_attempt_start_time = 6;</code>
+       */
+      public Builder setCurrentAttemptStartTime(long value) {
+        bitField0_ |= 0x00000020;
+        currentAttemptStartTime_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 current_attempt_start_time = 6;</code>
+       */
+      public Builder clearCurrentAttemptStartTime() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        currentAttemptStartTime_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:FragmentRuntimeInfo)
     }
 
@@ -12714,44 +12804,44 @@ public final class LlapDaemonProtocolProtos {
       "\003(\0132\014.IOSpecProto\0221\n\023grouped_input_specs" +
       "\030\007 \003(\0132\024.GroupInputSpecProto\022\032\n\022vertex_p" +
       "arallelism\030\010 \001(\005\022\027\n\017fragment_number\030\t \001(" +
-      "\005\022\026\n\016attempt_number\030\n \001(\005\"\300\001\n\023FragmentRu",
+      "\005\022\026\n\016attempt_number\030\n \001(\005\"\344\001\n\023FragmentRu",
       "ntimeInfo\022#\n\033num_self_and_upstream_tasks" +
       "\030\001 \001(\005\022-\n%num_self_and_upstream_complete" +
       "d_tasks\030\002 \001(\005\022\033\n\023within_dag_priority\030\003 \001" +
       "(\005\022\026\n\016dag_start_time\030\004 \001(\003\022 \n\030first_atte" +
-      "mpt_start_time\030\005 \001(\003\"\266\002\n\026SubmitWorkReque" +
-      "stProto\022\033\n\023container_id_string\030\001 \001(\t\022\017\n\007" +
-      "am_host\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030\n\020token_" +
-      "identifier\030\004 \001(\t\022\032\n\022credentials_binary\030\005" +
-      " \001(\014\022\014\n\004user\030\006 \001(\t\022\035\n\025application_id_str" +
-      "ing\030\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n",
-      "\rfragment_spec\030\t \001(\0132\022.FragmentSpecProto" +
-      "\0223\n\025fragment_runtime_info\030\n \001(\0132\024.Fragme" +
-      "ntRuntimeInfo\"\031\n\027SubmitWorkResponseProto" +
-      "\"f\n\036SourceStateUpdatedRequestProto\022\020\n\010da" +
-      "g_name\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030" +
-      "\003 \001(\0162\021.SourceStateProto\"!\n\037SourceStateU" +
-      "pdatedResponseProto\"X\n\031QueryCompleteRequ" +
-      "estProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030\002" +
-      " \001(\t\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034\n\032QueryCo" +
-      "mpleteResponseProto\"\245\001\n\035TerminateFragmen",
-      "tRequestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_n" +
-      "ame\030\002 \001(\t\022\032\n\022dag_attempt_number\030\003 \001(\005\022\023\n" +
-      "\013vertex_name\030\004 \001(\t\022\027\n\017fragment_number\030\005 " +
-      "\001(\005\022\026\n\016attempt_number\030\006 \001(\005\" \n\036Terminate" +
-      "FragmentResponseProto*2\n\020SourceStateProt" +
-      "o\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\0022\316\002\n\022Ll" +
-      "apDaemonProtocol\022?\n\nsubmitWork\022\027.SubmitW" +
-      "orkRequestProto\032\030.SubmitWorkResponseProt" +
-      "o\022W\n\022sourceStateUpdated\022\037.SourceStateUpd" +
-      "atedRequestProto\032 .SourceStateUpdatedRes",
-      "ponseProto\022H\n\rqueryComplete\022\032.QueryCompl" +
-      "eteRequestProto\032\033.QueryCompleteResponseP" +
-      "roto\022T\n\021terminateFragment\022\036.TerminateFra" +
-      "gmentRequestProto\032\037.TerminateFragmentRes" +
-      "ponseProtoBH\n&org.apache.hadoop.hive.lla" +
-      "p.daemon.rpcB\030LlapDaemonProtocolProtos\210\001" +
-      "\001\240\001\001"
+      "mpt_start_time\030\005 \001(\003\022\"\n\032current_attempt_" +
+      "start_time\030\006 \001(\003\"\266\002\n\026SubmitWorkRequestPr" +
+      "oto\022\033\n\023container_id_string\030\001 \001(\t\022\017\n\007am_h" +
+      "ost\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030\n\020token_iden" +
+      "tifier\030\004 \001(\t\022\032\n\022credentials_binary\030\005 \001(\014" +
+      "\022\014\n\004user\030\006 \001(\t\022\035\n\025application_id_string\030",
+      "\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n\rfra" +
+      "gment_spec\030\t \001(\0132\022.FragmentSpecProto\0223\n\025" +
+      "fragment_runtime_info\030\n \001(\0132\024.FragmentRu" +
+      "ntimeInfo\"\031\n\027SubmitWorkResponseProto\"f\n\036" +
+      "SourceStateUpdatedRequestProto\022\020\n\010dag_na" +
+      "me\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030\003 \001(" +
+      "\0162\021.SourceStateProto\"!\n\037SourceStateUpdat" +
+      "edResponseProto\"X\n\031QueryCompleteRequestP" +
+      "roto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030\002 \001(\t" +
+      "\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034\n\032QueryComple",
+      "teResponseProto\"\245\001\n\035TerminateFragmentReq" +
+      "uestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030" +
+      "\002 \001(\t\022\032\n\022dag_attempt_number\030\003 \001(\005\022\023\n\013ver" +
+      "tex_name\030\004 \001(\t\022\027\n\017fragment_number\030\005 \001(\005\022" +
+      "\026\n\016attempt_number\030\006 \001(\005\" \n\036TerminateFrag" +
+      "mentResponseProto*2\n\020SourceStateProto\022\017\n" +
+      "\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\0022\316\002\n\022LlapDa" +
+      "emonProtocol\022?\n\nsubmitWork\022\027.SubmitWorkR" +
+      "equestProto\032\030.SubmitWorkResponseProto\022W\n" +
+      "\022sourceStateUpdated\022\037.SourceStateUpdated",
+      "RequestProto\032 .SourceStateUpdatedRespons" +
+      "eProto\022H\n\rqueryComplete\022\032.QueryCompleteR" +
+      "equestProto\032\033.QueryCompleteResponseProto" +
+      "\022T\n\021terminateFragment\022\036.TerminateFragmen" +
+      "tRequestProto\032\037.TerminateFragmentRespons" +
+      "eProtoBH\n&org.apache.hadoop.hive.llap.da" +
+      "emon.rpcB\030LlapDaemonProtocolProtos\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -12793,7 +12883,7 @@ public final class LlapDaemonProtocolProtos {
           internal_static_FragmentRuntimeInfo_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_FragmentRuntimeInfo_descriptor,
-              new java.lang.String[] { "NumSelfAndUpstreamTasks", "NumSelfAndUpstreamCompletedTasks", "WithinDagPriority", "DagStartTime", "FirstAttemptStartTime", });
+              new java.lang.String[] { "NumSelfAndUpstreamTasks", "NumSelfAndUpstreamCompletedTasks", "WithinDagPriority", "DagStartTime", "FirstAttemptStartTime", "CurrentAttemptStartTime", });
           internal_static_SubmitWorkRequestProto_descriptor =
             getDescriptor().getMessageTypes().get(6);
           internal_static_SubmitWorkRequestProto_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hive/blob/95a959ff/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index a208bdd..d594d6a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -282,6 +282,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
     sb.append(", completedTaskCount=").append(fragmentRuntimeInfo.getNumSelfAndUpstreamCompletedTasks());
     sb.append(", dagStartTime=").append(fragmentRuntimeInfo.getDagStartTime());
     sb.append(", firstAttemptStartTime=").append(fragmentRuntimeInfo.getFirstAttemptStartTime());
+    sb.append(", currentAttemptStartTime=").append(fragmentRuntimeInfo.getCurrentAttemptStartTime());
     sb.append("}");
     return sb.toString();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/95a959ff/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 166dac5..2ea39b7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -442,4 +442,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   public long getFirstAttemptStartTime() {
     return request.getFragmentRuntimeInfo().getFirstAttemptStartTime();
   }
+
+  public long getCurrentAttemptStartTime() {
+    return request.getFragmentRuntimeInfo().getCurrentAttemptStartTime();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/95a959ff/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
index d83d62b..40b317d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java
@@ -160,6 +160,7 @@ public class SourceStateTracker {
     builder.setDagStartTime(taskCommunicatorContext.getDagStartTime());
     builder.setWithinDagPriority(priority);
     builder.setFirstAttemptStartTime(taskCommunicatorContext.getFirstAttemptStartTime(vertexName, fragmentNumber));
+    builder.setCurrentAttemptStartTime(System.currentTimeMillis());
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/95a959ff/llap-server/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-server/src/protobuf/LlapDaemonProtocol.proto b/llap-server/src/protobuf/LlapDaemonProtocol.proto
index e098e87..d8fd882 100644
--- a/llap-server/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-server/src/protobuf/LlapDaemonProtocol.proto
@@ -66,6 +66,7 @@ message FragmentRuntimeInfo {
   optional int32 within_dag_priority = 3;
   optional int64 dag_start_time = 4;
   optional int64 first_attempt_start_time = 5;
+  optional int64 current_attempt_start_time = 6;
 }
 
 enum SourceStateProto {