Posted to commits@hive.apache.org by ss...@apache.org on 2017/05/18 19:31:58 UTC

hive git commit: HIVE-14052. Cleanup structures when external clients use LLAP. (Siddharth Seth, reviewed by Jason Dere, Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master 3671668b6 -> 1dfe101a7


HIVE-14052. Cleanup structures when external clients use LLAP.  (Siddharth Seth, reviewed by Jason Dere, Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1dfe101a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1dfe101a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1dfe101a

Branch: refs/heads/master
Commit: 1dfe101a74af59aaa47f08399be76a798682d740
Parents: 3671668
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu May 18 12:30:46 2017 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu May 18 12:30:46 2017 -0700

----------------------------------------------------------------------
 .../daemon/rpc/LlapDaemonProtocolProtos.java    | 250 +++++++++++++------
 .../src/protobuf/LlapDaemonProtocol.proto       |   1 +
 .../hadoop/hive/llap/daemon/impl/QueryInfo.java |   9 +-
 .../hive/llap/daemon/impl/QueryTracker.java     | 198 ++++++++++++---
 .../daemon/impl/TaskExecutorTestHelpers.java    |   2 +-
 .../ql/udf/generic/GenericUDTFGetSplits.java    |  23 +-
 6 files changed, 357 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1dfe101a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
----------------------------------------------------------------------
diff --git a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index ece31ed..c19cf63 100644
--- a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -1,5 +1,5 @@
 // Generated by the protocol buffer compiler.  DO NOT EDIT!
-// source: LlapDaemonProtocol.proto
+// source: llap-common/src/protobuf/LlapDaemonProtocol.proto
 
 package org.apache.hadoop.hive.llap.daemon.rpc;
 
@@ -3454,6 +3454,16 @@ public final class LlapDaemonProtocolProtos {
      * </pre>
      */
     int getVertexParallelism();
+
+    // optional bool is_external_submission = 14 [default = false];
+    /**
+     * <code>optional bool is_external_submission = 14 [default = false];</code>
+     */
+    boolean hasIsExternalSubmission();
+    /**
+     * <code>optional bool is_external_submission = 14 [default = false];</code>
+     */
+    boolean getIsExternalSubmission();
   }
   /**
    * Protobuf type {@code SignableVertexSpec}
@@ -3600,6 +3610,11 @@ public final class LlapDaemonProtocolProtos {
               vertexParallelism_ = input.readInt32();
               break;
             }
+            case 112: {
+              bitField0_ |= 0x00000400;
+              isExternalSubmission_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4096,6 +4111,22 @@ public final class LlapDaemonProtocolProtos {
       return vertexParallelism_;
     }
 
+    // optional bool is_external_submission = 14 [default = false];
+    public static final int IS_EXTERNAL_SUBMISSION_FIELD_NUMBER = 14;
+    private boolean isExternalSubmission_;
+    /**
+     * <code>optional bool is_external_submission = 14 [default = false];</code>
+     */
+    public boolean hasIsExternalSubmission() {
+      return ((bitField0_ & 0x00000400) == 0x00000400);
+    }
+    /**
+     * <code>optional bool is_external_submission = 14 [default = false];</code>
+     */
+    public boolean getIsExternalSubmission() {
+      return isExternalSubmission_;
+    }
+
     private void initFields() {
       user_ = "";
       signatureKeyId_ = 0L;
@@ -4110,6 +4141,7 @@ public final class LlapDaemonProtocolProtos {
       outputSpecs_ = java.util.Collections.emptyList();
       groupedInputSpecs_ = java.util.Collections.emptyList();
       vertexParallelism_ = 0;
+      isExternalSubmission_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4162,6 +4194,9 @@ public final class LlapDaemonProtocolProtos {
       if (((bitField0_ & 0x00000200) == 0x00000200)) {
         output.writeInt32(13, vertexParallelism_);
       }
+      if (((bitField0_ & 0x00000400) == 0x00000400)) {
+        output.writeBool(14, isExternalSubmission_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -4223,6 +4258,10 @@ public final class LlapDaemonProtocolProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeInt32Size(13, vertexParallelism_);
       }
+      if (((bitField0_ & 0x00000400) == 0x00000400)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(14, isExternalSubmission_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -4302,6 +4341,11 @@ public final class LlapDaemonProtocolProtos {
         result = result && (getVertexParallelism()
             == other.getVertexParallelism());
       }
+      result = result && (hasIsExternalSubmission() == other.hasIsExternalSubmission());
+      if (hasIsExternalSubmission()) {
+        result = result && (getIsExternalSubmission()
+            == other.getIsExternalSubmission());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -4367,6 +4411,10 @@ public final class LlapDaemonProtocolProtos {
         hash = (37 * hash) + VERTEX_PARALLELISM_FIELD_NUMBER;
         hash = (53 * hash) + getVertexParallelism();
       }
+      if (hasIsExternalSubmission()) {
+        hash = (37 * hash) + IS_EXTERNAL_SUBMISSION_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getIsExternalSubmission());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -4531,6 +4579,8 @@ public final class LlapDaemonProtocolProtos {
         }
         vertexParallelism_ = 0;
         bitField0_ = (bitField0_ & ~0x00001000);
+        isExternalSubmission_ = false;
+        bitField0_ = (bitField0_ & ~0x00002000);
         return this;
       }
 
@@ -4634,6 +4684,10 @@ public final class LlapDaemonProtocolProtos {
           to_bitField0_ |= 0x00000200;
         }
         result.vertexParallelism_ = vertexParallelism_;
+        if (((from_bitField0_ & 0x00002000) == 0x00002000)) {
+          to_bitField0_ |= 0x00000400;
+        }
+        result.isExternalSubmission_ = isExternalSubmission_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -4768,6 +4822,9 @@ public final class LlapDaemonProtocolProtos {
         if (other.hasVertexParallelism()) {
           setVertexParallelism(other.getVertexParallelism());
         }
+        if (other.hasIsExternalSubmission()) {
+          setIsExternalSubmission(other.getIsExternalSubmission());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -6282,6 +6339,39 @@ public final class LlapDaemonProtocolProtos {
         return this;
       }
 
+      // optional bool is_external_submission = 14 [default = false];
+      private boolean isExternalSubmission_ ;
+      /**
+       * <code>optional bool is_external_submission = 14 [default = false];</code>
+       */
+      public boolean hasIsExternalSubmission() {
+        return ((bitField0_ & 0x00002000) == 0x00002000);
+      }
+      /**
+       * <code>optional bool is_external_submission = 14 [default = false];</code>
+       */
+      public boolean getIsExternalSubmission() {
+        return isExternalSubmission_;
+      }
+      /**
+       * <code>optional bool is_external_submission = 14 [default = false];</code>
+       */
+      public Builder setIsExternalSubmission(boolean value) {
+        bitField0_ |= 0x00002000;
+        isExternalSubmission_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool is_external_submission = 14 [default = false];</code>
+       */
+      public Builder clearIsExternalSubmission() {
+        bitField0_ = (bitField0_ & ~0x00002000);
+        isExternalSubmission_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:SignableVertexSpec)
     }
 
@@ -17411,83 +17501,85 @@ public final class LlapDaemonProtocolProtos {
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n\030LlapDaemonProtocol.proto\"9\n\020UserPayloa" +
-      "dProto\022\024\n\014user_payload\030\001 \001(\014\022\017\n\007version\030" +
-      "\002 \001(\005\"j\n\025EntityDescriptorProto\022\022\n\nclass_" +
-      "name\030\001 \001(\t\022\'\n\014user_payload\030\002 \001(\0132\021.UserP" +
-      "ayloadProto\022\024\n\014history_text\030\003 \001(\014\"x\n\013IOS" +
-      "pecProto\022\035\n\025connected_vertex_name\030\001 \001(\t\022" +
-      "-\n\rio_descriptor\030\002 \001(\0132\026.EntityDescripto" +
-      "rProto\022\033\n\023physical_edge_count\030\003 \001(\005\"z\n\023G" +
-      "roupInputSpecProto\022\022\n\ngroup_name\030\001 \001(\t\022\026" +
-      "\n\016group_vertices\030\002 \003(\t\0227\n\027merged_input_d",
-      "escriptor\030\003 \001(\0132\026.EntityDescriptorProto\"" +
-      "\245\003\n\022SignableVertexSpec\022\014\n\004user\030\001 \001(\t\022\026\n\016" +
-      "signatureKeyId\030\002 \001(\003\022/\n\020query_identifier" +
-      "\030\003 \001(\0132\025.QueryIdentifierProto\022\025\n\rhive_qu" +
-      "ery_id\030\004 \001(\t\022\020\n\010dag_name\030\005 \001(\t\022\023\n\013vertex" +
-      "_name\030\006 \001(\t\022\024\n\014vertex_index\030\007 \001(\005\022\030\n\020tok" +
-      "en_identifier\030\010 \001(\t\0224\n\024processor_descrip" +
-      "tor\030\t \001(\0132\026.EntityDescriptorProto\022!\n\013inp" +
-      "ut_specs\030\n \003(\0132\014.IOSpecProto\022\"\n\014output_s" +
-      "pecs\030\013 \003(\0132\014.IOSpecProto\0221\n\023grouped_inpu",
-      "t_specs\030\014 \003(\0132\024.GroupInputSpecProto\022\032\n\022v" +
-      "ertex_parallelism\030\r \001(\005\"K\n\016VertexOrBinar" +
-      "y\022#\n\006vertex\030\001 \001(\0132\023.SignableVertexSpec\022\024" +
-      "\n\014vertexBinary\030\002 \001(\014\"\344\001\n\023FragmentRuntime" +
-      "Info\022#\n\033num_self_and_upstream_tasks\030\001 \001(" +
-      "\005\022-\n%num_self_and_upstream_completed_tas" +
-      "ks\030\002 \001(\005\022\033\n\023within_dag_priority\030\003 \001(\005\022\026\n" +
-      "\016dag_start_time\030\004 \001(\003\022 \n\030first_attempt_s" +
-      "tart_time\030\005 \001(\003\022\"\n\032current_attempt_start" +
-      "_time\030\006 \001(\003\"d\n\024QueryIdentifierProto\022\035\n\025a",
-      "pplication_id_string\030\001 \001(\t\022\021\n\tdag_index\030" +
-      "\002 \001(\005\022\032\n\022app_attempt_number\030\003 \001(\005\"l\n\013Not" +
-      "TezEvent\022\037\n\027input_event_proto_bytes\030\001 \002(" +
-      "\014\022\023\n\013vertex_name\030\002 \002(\t\022\027\n\017dest_input_nam" +
-      "e\030\003 \002(\t\022\016\n\006key_id\030\004 \001(\005\"\330\002\n\026SubmitWorkRe" +
-      "questProto\022\"\n\twork_spec\030\001 \001(\0132\017.VertexOr" +
-      "Binary\022\033\n\023work_spec_signature\030\002 \001(\014\022\027\n\017f" +
-      "ragment_number\030\003 \001(\005\022\026\n\016attempt_number\030\004" +
-      " \001(\005\022\033\n\023container_id_string\030\005 \001(\t\022\017\n\007am_" +
-      "host\030\006 \001(\t\022\017\n\007am_port\030\007 \001(\005\022\032\n\022credentia",
-      "ls_binary\030\010 \001(\014\0223\n\025fragment_runtime_info" +
-      "\030\t \001(\0132\024.FragmentRuntimeInfo\022\033\n\023initial_" +
-      "event_bytes\030\n \001(\014\022\037\n\027initial_event_signa" +
-      "ture\030\013 \001(\014\"b\n\027SubmitWorkResponseProto\022/\n" +
-      "\020submission_state\030\001 \001(\0162\025.SubmissionStat" +
-      "eProto\022\026\n\016unique_node_id\030\002 \001(\t\"\205\001\n\036Sourc" +
-      "eStateUpdatedRequestProto\022/\n\020query_ident" +
-      "ifier\030\001 \001(\0132\025.QueryIdentifierProto\022\020\n\010sr" +
-      "c_name\030\002 \001(\t\022 \n\005state\030\003 \001(\0162\021.SourceStat" +
-      "eProto\"!\n\037SourceStateUpdatedResponseProt",
-      "o\"e\n\031QueryCompleteRequestProto\022/\n\020query_" +
-      "identifier\030\001 \001(\0132\025.QueryIdentifierProto\022" +
-      "\027\n\014delete_delay\030\002 \001(\003:\0010\"\034\n\032QueryComplet" +
-      "eResponseProto\"t\n\035TerminateFragmentReque" +
-      "stProto\022/\n\020query_identifier\030\001 \001(\0132\025.Quer" +
-      "yIdentifierProto\022\"\n\032fragment_identifier_" +
-      "string\030\002 \001(\t\" \n\036TerminateFragmentRespons" +
-      "eProto\"&\n\024GetTokenRequestProto\022\016\n\006app_id" +
-      "\030\001 \001(\t\"&\n\025GetTokenResponseProto\022\r\n\005token" +
-      "\030\001 \001(\014\"A\n\033LlapOutputSocketInitMessage\022\023\n",
-      "\013fragment_id\030\001 \002(\t\022\r\n\005token\030\002 \001(\014*2\n\020Sou" +
-      "rceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNN" +
-      "ING\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEPTE" +
-      "D\020\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n" +
-      "\022LlapDaemonProtocol\022?\n\nsubmitWork\022\027.Subm" +
-      "itWorkRequestProto\032\030.SubmitWorkResponseP" +
-      "roto\022W\n\022sourceStateUpdated\022\037.SourceState" +
-      "UpdatedRequestProto\032 .SourceStateUpdated" +
-      "ResponseProto\022H\n\rqueryComplete\022\032.QueryCo" +
-      "mpleteRequestProto\032\033.QueryCompleteRespon",
-      "seProto\022T\n\021terminateFragment\022\036.Terminate" +
-      "FragmentRequestProto\032\037.TerminateFragment" +
-      "ResponseProto2]\n\026LlapManagementProtocol\022" +
-      "C\n\022getDelegationToken\022\025.GetTokenRequestP" +
-      "roto\032\026.GetTokenResponseProtoBH\n&org.apac" +
-      "he.hadoop.hive.llap.daemon.rpcB\030LlapDaem" +
-      "onProtocolProtos\210\001\001\240\001\001"
+      "\n1llap-common/src/protobuf/LlapDaemonPro" +
+      "tocol.proto\"9\n\020UserPayloadProto\022\024\n\014user_" +
+      "payload\030\001 \001(\014\022\017\n\007version\030\002 \001(\005\"j\n\025Entity" +
+      "DescriptorProto\022\022\n\nclass_name\030\001 \001(\t\022\'\n\014u" +
+      "ser_payload\030\002 \001(\0132\021.UserPayloadProto\022\024\n\014" +
+      "history_text\030\003 \001(\014\"x\n\013IOSpecProto\022\035\n\025con" +
+      "nected_vertex_name\030\001 \001(\t\022-\n\rio_descripto" +
+      "r\030\002 \001(\0132\026.EntityDescriptorProto\022\033\n\023physi" +
+      "cal_edge_count\030\003 \001(\005\"z\n\023GroupInputSpecPr" +
+      "oto\022\022\n\ngroup_name\030\001 \001(\t\022\026\n\016group_vertice",
+      "s\030\002 \003(\t\0227\n\027merged_input_descriptor\030\003 \001(\013" +
+      "2\026.EntityDescriptorProto\"\314\003\n\022SignableVer" +
+      "texSpec\022\014\n\004user\030\001 \001(\t\022\026\n\016signatureKeyId\030" +
+      "\002 \001(\003\022/\n\020query_identifier\030\003 \001(\0132\025.QueryI" +
+      "dentifierProto\022\025\n\rhive_query_id\030\004 \001(\t\022\020\n" +
+      "\010dag_name\030\005 \001(\t\022\023\n\013vertex_name\030\006 \001(\t\022\024\n\014" +
+      "vertex_index\030\007 \001(\005\022\030\n\020token_identifier\030\010" +
+      " \001(\t\0224\n\024processor_descriptor\030\t \001(\0132\026.Ent" +
+      "ityDescriptorProto\022!\n\013input_specs\030\n \003(\0132" +
+      "\014.IOSpecProto\022\"\n\014output_specs\030\013 \003(\0132\014.IO",
+      "SpecProto\0221\n\023grouped_input_specs\030\014 \003(\0132\024" +
+      ".GroupInputSpecProto\022\032\n\022vertex_paralleli" +
+      "sm\030\r \001(\005\022%\n\026is_external_submission\030\016 \001(\010" +
+      ":\005false\"K\n\016VertexOrBinary\022#\n\006vertex\030\001 \001(" +
+      "\0132\023.SignableVertexSpec\022\024\n\014vertexBinary\030\002" +
+      " \001(\014\"\344\001\n\023FragmentRuntimeInfo\022#\n\033num_self" +
+      "_and_upstream_tasks\030\001 \001(\005\022-\n%num_self_an" +
+      "d_upstream_completed_tasks\030\002 \001(\005\022\033\n\023with" +
+      "in_dag_priority\030\003 \001(\005\022\026\n\016dag_start_time\030" +
+      "\004 \001(\003\022 \n\030first_attempt_start_time\030\005 \001(\003\022",
+      "\"\n\032current_attempt_start_time\030\006 \001(\003\"d\n\024Q" +
+      "ueryIdentifierProto\022\035\n\025application_id_st" +
+      "ring\030\001 \001(\t\022\021\n\tdag_index\030\002 \001(\005\022\032\n\022app_att" +
+      "empt_number\030\003 \001(\005\"l\n\013NotTezEvent\022\037\n\027inpu" +
+      "t_event_proto_bytes\030\001 \002(\014\022\023\n\013vertex_name" +
+      "\030\002 \002(\t\022\027\n\017dest_input_name\030\003 \002(\t\022\016\n\006key_i" +
+      "d\030\004 \001(\005\"\330\002\n\026SubmitWorkRequestProto\022\"\n\two" +
+      "rk_spec\030\001 \001(\0132\017.VertexOrBinary\022\033\n\023work_s" +
+      "pec_signature\030\002 \001(\014\022\027\n\017fragment_number\030\003" +
+      " \001(\005\022\026\n\016attempt_number\030\004 \001(\005\022\033\n\023containe",
+      "r_id_string\030\005 \001(\t\022\017\n\007am_host\030\006 \001(\t\022\017\n\007am" +
+      "_port\030\007 \001(\005\022\032\n\022credentials_binary\030\010 \001(\014\022" +
+      "3\n\025fragment_runtime_info\030\t \001(\0132\024.Fragmen" +
+      "tRuntimeInfo\022\033\n\023initial_event_bytes\030\n \001(" +
+      "\014\022\037\n\027initial_event_signature\030\013 \001(\014\"b\n\027Su" +
+      "bmitWorkResponseProto\022/\n\020submission_stat" +
+      "e\030\001 \001(\0162\025.SubmissionStateProto\022\026\n\016unique" +
+      "_node_id\030\002 \001(\t\"\205\001\n\036SourceStateUpdatedReq" +
+      "uestProto\022/\n\020query_identifier\030\001 \001(\0132\025.Qu" +
+      "eryIdentifierProto\022\020\n\010src_name\030\002 \001(\t\022 \n\005",
+      "state\030\003 \001(\0162\021.SourceStateProto\"!\n\037Source" +
+      "StateUpdatedResponseProto\"e\n\031QueryComple" +
+      "teRequestProto\022/\n\020query_identifier\030\001 \001(\013" +
+      "2\025.QueryIdentifierProto\022\027\n\014delete_delay\030" +
+      "\002 \001(\003:\0010\"\034\n\032QueryCompleteResponseProto\"t" +
+      "\n\035TerminateFragmentRequestProto\022/\n\020query" +
+      "_identifier\030\001 \001(\0132\025.QueryIdentifierProto" +
+      "\022\"\n\032fragment_identifier_string\030\002 \001(\t\" \n\036" +
+      "TerminateFragmentResponseProto\"&\n\024GetTok" +
+      "enRequestProto\022\016\n\006app_id\030\001 \001(\t\"&\n\025GetTok",
+      "enResponseProto\022\r\n\005token\030\001 \001(\014\"A\n\033LlapOu" +
+      "tputSocketInitMessage\022\023\n\013fragment_id\030\001 \002" +
+      "(\t\022\r\n\005token\030\002 \001(\014*2\n\020SourceStateProto\022\017\n" +
+      "\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\002*E\n\024Submiss" +
+      "ionStateProto\022\014\n\010ACCEPTED\020\001\022\014\n\010REJECTED\020" +
+      "\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n\022LlapDaemonProto" +
+      "col\022?\n\nsubmitWork\022\027.SubmitWorkRequestPro" +
+      "to\032\030.SubmitWorkResponseProto\022W\n\022sourceSt" +
+      "ateUpdated\022\037.SourceStateUpdatedRequestPr" +
+      "oto\032 .SourceStateUpdatedResponseProto\022H\n",
+      "\rqueryComplete\022\032.QueryCompleteRequestPro" +
+      "to\032\033.QueryCompleteResponseProto\022T\n\021termi" +
+      "nateFragment\022\036.TerminateFragmentRequestP" +
+      "roto\032\037.TerminateFragmentResponseProto2]\n" +
+      "\026LlapManagementProtocol\022C\n\022getDelegation" +
+      "Token\022\025.GetTokenRequestProto\032\026.GetTokenR" +
+      "esponseProtoBH\n&org.apache.hadoop.hive.l" +
+      "lap.daemon.rpcB\030LlapDaemonProtocolProtos" +
+      "\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -17523,7 +17615,7 @@ public final class LlapDaemonProtocolProtos {
           internal_static_SignableVertexSpec_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_SignableVertexSpec_descriptor,
-              new java.lang.String[] { "User", "SignatureKeyId", "QueryIdentifier", "HiveQueryId", "DagName", "VertexName", "VertexIndex", "TokenIdentifier", "ProcessorDescriptor", "InputSpecs", "OutputSpecs", "GroupedInputSpecs", "VertexParallelism", });
+              new java.lang.String[] { "User", "SignatureKeyId", "QueryIdentifier", "HiveQueryId", "DagName", "VertexName", "VertexIndex", "TokenIdentifier", "ProcessorDescriptor", "InputSpecs", "OutputSpecs", "GroupedInputSpecs", "VertexParallelism", "IsExternalSubmission", });
           internal_static_VertexOrBinary_descriptor =
             getDescriptor().getMessageTypes().get(5);
           internal_static_VertexOrBinary_fieldAccessorTable = new

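For readers tracing the regenerated parser above, the new "case 112" follows from
protobuf tag arithmetic: a field's tag is (field_number << 3) | wire_type, and
bool fields use varint wire type 0, so field 14 decodes under tag 112. As a
one-line illustration (not from the commit):

    // protobuf tag for field 14, wire type 0 (varint/bool): (14 << 3) | 0 == 112
    int isExternalSubmissionTag = (14 << 3) | 0;
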
http://git-wip-us.apache.org/repos/asf/hive/blob/1dfe101a/llap-common/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto
index 3a3a2b8..e0c0070 100644
--- a/llap-common/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto
@@ -67,6 +67,7 @@ message SignableVertexSpec
   repeated GroupInputSpecProto grouped_input_specs = 12;
 
   optional int32 vertex_parallelism = 13; // An internal field required for Tez.
+  optional bool is_external_submission = 14 [default = false];
 }
 
 // Union
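
The regenerated Java code above exposes the new field through the usual
optional-field accessors. As a minimal sketch of client-side usage (not part of
the commit; the other builder values are purely illustrative):

    import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;

    SignableVertexSpec spec = SignableVertexSpec.newBuilder()
        .setVertexName("Map 1")            // illustrative value
        .setIsExternalSubmission(true)     // new field 14; defaults to false
        .build();
    assert spec.hasIsExternalSubmission() && spec.getIsExternalSubmission();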

http://git-wip-us.apache.org/repos/asf/hive/blob/1dfe101a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index ce2f457..a6d9d54 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -60,6 +60,7 @@ public class QueryInfo {
   private final LlapNodeId amNodeId;
   private final String appTokenIdentifier;
   private final Token<JobTokenIdentifier> appToken;
+  private final boolean isExternalQuery;
   // Map of states for different vertices.
 
   private final Set<QueryFragmentInfo> knownFragments =
@@ -77,7 +78,8 @@ public class QueryInfo {
     String[] localDirsBase, FileSystem localFs, String tokenUserName,
     String tokenAppId, final LlapNodeId amNodeId,
     String tokenIdentifier,
-    Token<JobTokenIdentifier> appToken) {
+    Token<JobTokenIdentifier> appToken,
+    boolean isExternalQuery) {
     this.queryIdentifier = queryIdentifier;
     this.appIdString = appIdString;
     this.dagIdString = dagIdString;
@@ -93,6 +95,7 @@ public class QueryInfo {
     this.amNodeId = amNodeId;
     this.appTokenIdentifier = tokenIdentifier;
     this.appToken = appToken;
+    this.isExternalQuery = isExternalQuery;
     final InetSocketAddress address =
         NetUtils.createSocketAddrForHost(amNodeId.getHostname(), amNodeId.getPort());
     SecurityUtil.setTokenService(appToken, address);
@@ -146,6 +149,10 @@ public class QueryInfo {
     return Lists.newArrayList(knownFragments);
   }
 
+  public boolean isExternalQuery() {
+    return isExternalQuery;
+  }
+
   private synchronized void createLocalDirs() throws IOException {
     if (localDirs == null) {
       localDirs = new String[localDirsBase.length];

http://git-wip-us.apache.org/repos/asf/hive/blob/1dfe101a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index daeb555..5c42b27 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -89,7 +89,7 @@ public class QueryTracker extends AbstractService {
 
 
   private final Lock lock = new ReentrantLock();
-  private final ConcurrentMap<QueryIdentifier, ReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>();
+  private final ConcurrentMap<QueryIdentifier, ReentrantReadWriteLock> dagSpecificLocks = new ConcurrentHashMap<>();
 
   // Tracks various maps for dagCompletions. This is setup here since stateChange messages
   // may be processed by a thread which ends up executing before a task.
@@ -119,7 +119,7 @@ public class QueryTracker extends AbstractService {
     int numCleanerThreads = HiveConf.getIntVar(
         conf, ConfVars.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS);
     this.executorService = Executors.newScheduledThreadPool(numCleanerThreads,
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryFileCleaner %d").build());
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryCompletionThread %d").build());
 
     String logger = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_LOGGER);
     if (logger != null && (logger.equalsIgnoreCase(LogHelpers.LLAP_LOGGER_NAME_QUERY_ROUTING))) {
@@ -169,7 +169,8 @@ public class QueryTracker extends AbstractService {
             new QueryInfo(queryIdentifier, appIdString, dagIdString, dagName, hiveQueryIdString,
                 dagIdentifier, user,
                 getSourceCompletionMap(queryIdentifier), localDirsBase, localFs,
-                tokenInfo.userName, tokenInfo.appId, amNodeId, vertex.getTokenIdentifier(), appToken);
+                tokenInfo.userName, tokenInfo.appId, amNodeId, vertex.getTokenIdentifier(), appToken,
+                vertex.getIsExternalSubmission());
         QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
         if (old != null) {
           queryInfo = old;
@@ -188,9 +189,11 @@ public class QueryTracker extends AbstractService {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Registering request for {} with the ShuffleHandler", queryIdentifier);
       }
-      ShuffleHandler.get()
-          .registerDag(appIdString, dagIdentifier, appToken,
-              user, queryInfo.getLocalDirs());
+      if (!vertex.getIsExternalSubmission()) {
+        ShuffleHandler.get()
+            .registerDag(appIdString, dagIdentifier, appToken,
+                user, queryInfo.getLocalDirs());
+      }
 
       return queryInfo.registerFragment(
           vertexName, fragmentNumber, attemptNumber, vertex, fragmentIdString);
@@ -212,6 +215,9 @@ public class QueryTracker extends AbstractService {
       LOG.info("Ignoring fragmentComplete message for unknown query: {}", qId);
     } else {
       queryInfo.unregisterFragment(fragmentInfo);
+
+      // Try marking the query as complete if this is an external submission
+      handleFragmentCompleteExternalQuery(queryInfo);
     }
   }
 
@@ -237,46 +243,45 @@ public class QueryTracker extends AbstractService {
    * @param deleteDelay
    */
   QueryInfo queryComplete(QueryIdentifier queryIdentifier, long deleteDelay,
-      boolean isInternal) throws IOException {
+      boolean isExternalQuery) throws IOException {
     if (deleteDelay == -1) {
       deleteDelay = defaultDeleteDelaySeconds;
     }
     ReadWriteLock dagLock = getDagLock(queryIdentifier);
     dagLock.writeLock().lock();
     try {
-      QueryInfo queryInfo = isInternal
+      // If isExternalQuery -> the call is from within the daemon, so no permission check required
+      // to get access to the queryInfo instance.
+      QueryInfo queryInfo = isExternalQuery
           ? queryInfoMap.get(queryIdentifier) : checkPermissionsAndGetQuery(queryIdentifier);
-      rememberCompletedDag(queryIdentifier);
-      LOG.info("Processing queryComplete for queryIdentifier={} with deleteDelay={} seconds", queryIdentifier,
-          deleteDelay);
-      queryInfoMap.remove(queryIdentifier);
       if (queryInfo == null) {
         // Should not happen.
         LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier);
         return null;
       }
-      String[] localDirs = queryInfo.getLocalDirsNoCreate();
-      if (localDirs != null) {
-        for (String localDir : localDirs) {
-          cleanupDir(localDir, deleteDelay);
-          ShuffleHandler.get().unregisterDag(localDir, queryInfo.getAppIdString(), queryInfo.getDagIdentifier());
-        }
-      }
 
-      if (routeBasedLoggingEnabled) {
-        // Inform the routing purgePolicy.
-        // Send out a fake log message at the ERROR level with the MDC for this query setup. With an
-        // LLAP custom appender this message will not be logged.
-        final String dagId = queryInfo.getDagIdString();
-        final String queryId = queryInfo.getHiveQueryIdString();
-        MDC.put("dagId", dagId);
-        MDC.put("queryId", queryId);
-        try {
-          LOG.error(QUERY_COMPLETE_MARKER, "Ignore this. Log line to interact with logger." +
-              " Query complete: " + queryInfo.getHiveQueryIdString() + ", " +
-              queryInfo.getDagIdString());
-        } finally {
-          MDC.clear();
+      LOG.info(
+          "Processing queryComplete for queryIdentifier={}, isExternalQuery={}, with deleteDelay={} seconds",
+          queryIdentifier, isExternalQuery,
+          deleteDelay);
+
+      queryInfoMap.remove(queryIdentifier);
+      if (!isExternalQuery) {
+        rememberCompletedDag(queryIdentifier);
+        cleanupLocalDirs(queryInfo, deleteDelay);
+        handleLogOnQueryCompletion(queryInfo.getHiveQueryIdString(), queryInfo.getDagIdString());
+      } else {
+        // If there are no pending fragments, queue some of the cleanup - locks, log rolling - for a later point.
+        if (queryInfo.getRegisteredFragments().size() == 0) {
+          LOG.debug("Queueing future cleanup for external queryId: {}", queryInfo.getHiveQueryIdString());
+          executorService.schedule(new ExternalQueryCleanerCallable(queryInfo.getHiveQueryIdString(),
+                  queryInfo.getDagIdString(), queryInfo.getQueryIdentifier()), 1, TimeUnit.MINUTES);
+        } else {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(
+                "NumRegisterFragments={}, Not queuing cleanup for external queryId={}",
+                queryInfo.getRegisteredFragments().size(), queryInfo.getHiveQueryIdString());
+          }
         }
       }
 
@@ -286,7 +291,9 @@ public class QueryTracker extends AbstractService {
       // should not be allowed after a query complete is received.
       sourceCompletionMap.remove(queryIdentifier);
       String savedQueryId = queryIdentifierToHiveQueryId.remove(queryIdentifier);
-      dagSpecificLocks.remove(queryIdentifier);
+      if (!isExternalQuery) {
+        removeQuerySpecificLock(queryIdentifier);
+      }
       if (savedQueryId != null) {
         ObjectCacheFactory.removeLlapQueryCache(savedQueryId);
       }
@@ -297,6 +304,37 @@ public class QueryTracker extends AbstractService {
   }
 
 
+  private void cleanupLocalDirs(QueryInfo queryInfo, long deleteDelay) {
+    String[] localDirs = queryInfo.getLocalDirsNoCreate();
+    if (localDirs != null) {
+      for (String localDir : localDirs) {
+        cleanupDir(localDir, deleteDelay);
+        ShuffleHandler.get().unregisterDag(localDir, queryInfo.getAppIdString(), queryInfo.getDagIdentifier());
+      }
+    }
+  }
+
+  private void handleLogOnQueryCompletion(String queryIdString, String dagIdString) {
+    if (routeBasedLoggingEnabled) {
+      // Inform the routing purgePolicy.
+      // Send out a fake log message at the ERROR level with the MDC for this query setup. With an
+      // LLAP custom appender this message will not be logged.
+      MDC.put("dagId", dagIdString);
+      MDC.put("queryId", queryIdString);
+      try {
+        LOG.error(QUERY_COMPLETE_MARKER, "Ignore this. Log line to interact with logger." +
+            " Query complete: " + queryIdString + ", " +
+            dagIdString);
+      } finally {
+        MDC.clear();
+      }
+    }
+  }
+
+  private void removeQuerySpecificLock(QueryIdentifier queryIdentifier) {
+    dagSpecificLocks.remove(queryIdentifier);
+  }
+
 
   public void rememberCompletedDag(QueryIdentifier queryIdentifier) {
     if (completedDagMap.add(queryIdentifier)) {
@@ -325,11 +363,14 @@ public class QueryTracker extends AbstractService {
     }
   }
 
+  private ReentrantReadWriteLock getDagLockNoCreate(QueryIdentifier queryIdentifier) {
+    return dagSpecificLocks.get(queryIdentifier);
+  }
 
-  private ReadWriteLock getDagLock(QueryIdentifier queryIdentifier) {
+  private ReentrantReadWriteLock getDagLock(QueryIdentifier queryIdentifier) {
     lock.lock();
     try {
-      ReadWriteLock dagLock = dagSpecificLocks.get(queryIdentifier);
+      ReentrantReadWriteLock dagLock = dagSpecificLocks.get(queryIdentifier);
       if (dagLock == null) {
         dagLock = new ReentrantReadWriteLock();
         dagSpecificLocks.put(queryIdentifier, dagLock);
@@ -403,6 +444,58 @@ public class QueryTracker extends AbstractService {
     }
   }
 
+  private class ExternalQueryCleanerCallable extends CallableWithNdc<Void> {
+
+    private final String queryIdString;
+    private final String dagIdString;
+    private final QueryIdentifier queryIdentifier;
+
+    public ExternalQueryCleanerCallable(String queryIdString, String dagIdString,
+                                        QueryIdentifier queryIdentifier) {
+      this.queryIdString = queryIdString;
+      this.dagIdString = dagIdString;
+      this.queryIdentifier = queryIdentifier;
+    }
+
+    @Override
+    protected Void callInternal() {
+      LOG.info("External cleanup callable for {}", queryIdentifier);
+      ReentrantReadWriteLock dagLock = getDagLockNoCreate(queryIdentifier);
+      if (dagLock == null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("null dagLock. No cleanup required at the moment for {}", queryIdString);
+        }
+        return null;
+      }
+      boolean locked = dagLock.writeLock().tryLock();
+      if (!locked) {
+        // Something else holds the lock at the moment. Don't bother cleaning up.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Lock not obtained. Skipping cleanup for {}", queryIdString);
+        }
+        return null;
+      }
+      try {
+        // Check whether the QueryInfo was re-created. If it was, more fragments came in
+        // after this cleanup was scheduled, and there's nothing to be done here.
+        QueryInfo queryInfo = queryInfoMap.get(queryIdentifier);
+        if (queryInfo != null) {
+          // QueryInfo will only exist if more work came in, after this was scheduled.
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("QueryInfo found for {}. Expecting future cleanup", queryIdString);
+          }
+          return null;
+        }
+        LOG.info("Processing cleanup for {}", queryIdString);
+        handleLogOnQueryCompletion(queryIdString, dagIdString);
+        removeQuerySpecificLock(queryIdentifier);
+      } finally {
+        dagLock.writeLock().unlock();
+      }
+      return null;
+    }
+  }
+
   private QueryInfo checkPermissionsAndGetQuery(QueryIdentifier queryId) throws IOException {
     QueryInfo queryInfo = queryInfoMap.get(queryId);
     if (queryInfo == null) return null;
@@ -414,4 +507,37 @@ public class QueryTracker extends AbstractService {
   public boolean checkPermissionsForQuery(QueryIdentifier queryId) throws IOException {
     return checkPermissionsAndGetQuery(queryId) != null;
   }
+
+
+  private void handleFragmentCompleteExternalQuery(QueryInfo queryInfo) {
+    if (queryInfo.isExternalQuery()) {
+      ReentrantReadWriteLock dagLock = getDagLock(queryInfo.getQueryIdentifier());
+      if (dagLock == null) {
+        LOG.warn("Ignoring fragment completion for unknown query: {}",
+            queryInfo.getQueryIdentifier());
+        return;
+      }
+      boolean locked = dagLock.writeLock().tryLock();
+      if (!locked) {
+        // Some other operation in progress using the same lock.
+        // A subsequent fragmentComplete is expected to come in.
+        return;
+      }
+      try {
+        if (queryInfo.getRegisteredFragments().size() == 0) {
+          queryComplete(queryInfo.getQueryIdentifier(), -1, true);
+        } else {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(
+                "Not invoking queryComplete on fragmentComplete for {}, since there are known fragments. count={}",
+                queryInfo.getHiveQueryIdString(), queryInfo.getRegisteredFragments().size());
+          }
+        }
+      } catch (IOException e) {
+        LOG.error("Failed to process query complete for external submission: {}",
+            queryInfo.getQueryIdentifier(), e);
+      } finally {
+        dagLock.writeLock().unlock();
+      }
+    }
+  }
 }
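
The QueryTracker changes above implement a deferred-cleanup pattern for external
submissions: no Tez AM will ever send queryComplete for them, so the daemon
schedules its own cleanup and backs off via tryLock if anything else is working
on the query. A self-contained sketch of the pattern (class and method names
here are hypothetical, not from the commit):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class DeferredCleanupSketch {
      private final ScheduledExecutorService scheduler =
          Executors.newScheduledThreadPool(1);
      private final ReentrantReadWriteLock queryLock = new ReentrantReadWriteLock();

      void scheduleCleanup(Runnable cleanup) {
        scheduler.schedule(() -> {
          // Back off if another operation holds the lock; a subsequent
          // fragmentComplete or scheduled run is expected to finish the job.
          if (queryLock.writeLock().tryLock()) {
            try {
              cleanup.run();
            } finally {
              queryLock.writeLock().unlock();
            }
          }
        }, 1, TimeUnit.MINUTES);
      }
    }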

http://git-wip-us.apache.org/repos/asf/hive/blob/1dfe101a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 27c426c..e3edf79 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -96,7 +96,7 @@ public class TaskExecutorTestHelpers {
         new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_id_string", "fake_dag_name",
             "fakeHiveQueryId", 1, "fakeUser",
             new ConcurrentHashMap<String, LlapDaemonProtocolProtos.SourceStateProto>(),
-            new String[0], null, "fakeUser", null, nodeId, null, null);
+            new String[0], null, "fakeUser", null, nodeId, null, null, false);
     return queryInfo;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/1dfe101a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 868eec7..d4ec44e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -299,6 +299,17 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     FileSystem fs = scratchDir.getFileSystem(job);
     try {
       LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(), utils, job);
+
+      LlapCoordinator coordinator = LlapCoordinator.getInstance();
+      if (coordinator == null) {
+        throw new IOException("LLAP coordinator is not initialized; must be running in HS2 with "
+            + ConfVars.LLAP_HS2_ENABLE_COORDINATOR.varname + " enabled");
+      }
+
+      // Update the queryId to use the generated applicationId. See comment below about
+      // why this is done.
+      ApplicationId applicationId = coordinator.createExtClientAppId();
+      HiveConf.setVar(wxConf, HiveConf.ConfVars.HIVEQUERYID, applicationId.toString());
       Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr,
           new ArrayList<LocalResource>(), fs, ctx, false, work,
           work.getVertexType(mapWork));
@@ -312,6 +323,8 @@ public class GenericUDTFGetSplits extends GenericUDTF {
               ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS));
       Preconditions.checkState(HiveConf.getBoolVar(wxConf,
               ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
+
+
       HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork);
       List<Event> eventList = splitGenerator.initialize();
 
@@ -328,15 +341,6 @@ public class GenericUDTFGetSplits extends GenericUDTF {
         LOG.debug("NumEvents=" + eventList.size() + ", NumSplits=" + result.length);
       }
 
-      LlapCoordinator coordinator = LlapCoordinator.getInstance();
-      if (coordinator == null) {
-        throw new IOException("LLAP coordinator is not initialized; must be running in HS2 with "
-            + ConfVars.LLAP_HS2_ENABLE_COORDINATOR.varname + " enabled");
-      }
-
-      // See the discussion in the implementation as to why we generate app ID.
-      ApplicationId applicationId = coordinator.createExtClientAppId();
-
       // This assumes LLAP cluster owner is always the HS2 user.
       String llapUser = UserGroupInformation.getLoginUser().getShortUserName();
 
@@ -440,6 +444,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
             .setDagIndex(taskSpec.getDagIdentifier()).setAppAttemptNumber(0).build();
     final SignableVertexSpec.Builder svsb = Converters.constructSignableVertexSpec(
         taskSpec, queryIdentifierProto, applicationId.toString(), queryUser, queryIdString);
+    svsb.setIsExternalSubmission(true);
     if (signer == null) {
       SignedMessage result = new SignedMessage();
       result.message = serializeVertexSpec(svsb);