You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/06 17:24:52 UTC

[02/39] hive git commit: HIVE-13138. Add client to communicate with interface, initial split setup. (Siddharth Seth and Vikram Dixit K)

HIVE-13138. Add client to communicate with interface, initial split
setup. (Siddharth Seth and Vikram Dixit K)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f272acea
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f272acea
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f272acea

Branch: refs/heads/master
Commit: f272aceaf7da77f9d87f5be42bb1520181035c2c
Parents: bf83407
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Feb 23 23:55:46 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Feb 23 23:55:46 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../org/apache/hive/jdbc/LlapInputFormat.java   |  10 +
 .../daemon/rpc/LlapDaemonProtocolProtos.java    | 159 ++++++++++---
 .../src/protobuf/LlapDaemonProtocol.proto       |   8 +
 .../hive/llap/daemon/impl/LlapDaemon.java       |   3 +
 .../llap/daemon/impl/TaskRunnerCallable.java    |   5 +-
 .../ext/LlapTaskUmbilicalExternalClient.java    | 197 ++++++++++++++++
 .../helpers/LlapTaskUmbilicalServer.java        |  57 +++++
 .../hadoop/hive/llap/LlapInputFormat.java       | 146 +++---------
 .../apache/hadoop/hive/llap/LlapInputSplit.java |  80 ++++---
 .../apache/hadoop/hive/llap/SubmitWorkInfo.java |  65 ++++++
 .../hive/ql/exec/tez/HiveSplitGenerator.java    |  49 +++-
 .../hive/ql/exec/tez/MapRecordProcessor.java    |   2 +
 .../hive/ql/parse/TypeCheckProcFactory.java     |   3 +
 .../ql/udf/generic/GenericUDFGetSplits.java     | 224 +++++++++++++++++--
 .../org/apache/tez/dag/api/TaskSpecBuilder.java |  45 ++++
 16 files changed, 837 insertions(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7fbcbba..6a22890 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2687,6 +2687,9 @@ public class HiveConf extends Configuration {
     LLAP_DAEMON_OUTPUT_SERVICE_PORT("hive.llap.daemon.output.service.port", 15003,
         "LLAP daemon output service port"),
 
+    LLAP_TMP_SUBMITWORK_USING_TEZ_AM("hive.llap.tmp.submit.work.using.tez.am", true,""),
+    LLAP_TMP_EXT_CLIENT_NUM_SERVER_HANDLERS("hive.llap.tmp.ext.client.num.server.handlers", 1, ""),
+
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
       "60s", new TimeValidator(TimeUnit.SECONDS),
       "Timeout for requests from Hive client to remote Spark driver."),

http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
index 97fe2c5..c38dd82 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
@@ -59,6 +59,11 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
   private String pwd;  // ""
   private String query;
 
+  public final String URL_KEY = "llap.if.hs2.connection";
+  public final String QUERY_KEY = "llap.if.query";
+  public final String USER_KEY = "llap.if.user";
+  public final String PWD_KEY = "llap.if.pwd";
+
   private Connection con;
   private Statement stmt;
 
@@ -133,6 +138,11 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     List<InputSplit> ins = new ArrayList<InputSplit>();
 
+    if (url == null) url = job.get(URL_KEY);
+    if (query == null) query = job.get(QUERY_KEY);
+    if (user == null) user = job.get(USER_KEY);
+    if (pwd == null) pwd = job.get(PWD_KEY);
+
     if (url == null || query == null) {
       throw new IllegalStateException();
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
----------------------------------------------------------------------
diff --git a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index 4ab7b32..653e7e0 100644
--- a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -7334,6 +7334,16 @@ public final class LlapDaemonProtocolProtos {
      * <code>optional .FragmentRuntimeInfo fragment_runtime_info = 10;</code>
      */
     org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfoOrBuilder getFragmentRuntimeInfoOrBuilder();
+
+    // optional bool usingTezAm = 11 [default = true];
+    /**
+     * <code>optional bool usingTezAm = 11 [default = true];</code>
+     */
+    boolean hasUsingTezAm();
+    /**
+     * <code>optional bool usingTezAm = 11 [default = true];</code>
+     */
+    boolean getUsingTezAm();
   }
   /**
    * Protobuf type {@code SubmitWorkRequestProto}
@@ -7452,6 +7462,11 @@ public final class LlapDaemonProtocolProtos {
               bitField0_ |= 0x00000200;
               break;
             }
+            case 88: {
+              bitField0_ |= 0x00000400;
+              usingTezAm_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -7799,6 +7814,22 @@ public final class LlapDaemonProtocolProtos {
       return fragmentRuntimeInfo_;
     }
 
+    // optional bool usingTezAm = 11 [default = true];
+    public static final int USINGTEZAM_FIELD_NUMBER = 11;
+    private boolean usingTezAm_;
+    /**
+     * <code>optional bool usingTezAm = 11 [default = true];</code>
+     */
+    public boolean hasUsingTezAm() {
+      return ((bitField0_ & 0x00000400) == 0x00000400);
+    }
+    /**
+     * <code>optional bool usingTezAm = 11 [default = true];</code>
+     */
+    public boolean getUsingTezAm() {
+      return usingTezAm_;
+    }
+
     private void initFields() {
       containerIdString_ = "";
       amHost_ = "";
@@ -7810,6 +7841,7 @@ public final class LlapDaemonProtocolProtos {
       appAttemptNumber_ = 0;
       fragmentSpec_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto.getDefaultInstance();
       fragmentRuntimeInfo_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo.getDefaultInstance();
+      usingTezAm_ = true;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -7853,6 +7885,9 @@ public final class LlapDaemonProtocolProtos {
       if (((bitField0_ & 0x00000200) == 0x00000200)) {
         output.writeMessage(10, fragmentRuntimeInfo_);
       }
+      if (((bitField0_ & 0x00000400) == 0x00000400)) {
+        output.writeBool(11, usingTezAm_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -7902,6 +7937,10 @@ public final class LlapDaemonProtocolProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(10, fragmentRuntimeInfo_);
       }
+      if (((bitField0_ & 0x00000400) == 0x00000400)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(11, usingTezAm_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -7975,6 +8014,11 @@ public final class LlapDaemonProtocolProtos {
         result = result && getFragmentRuntimeInfo()
             .equals(other.getFragmentRuntimeInfo());
       }
+      result = result && (hasUsingTezAm() == other.hasUsingTezAm());
+      if (hasUsingTezAm()) {
+        result = result && (getUsingTezAm()
+            == other.getUsingTezAm());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -8028,6 +8072,10 @@ public final class LlapDaemonProtocolProtos {
         hash = (37 * hash) + FRAGMENT_RUNTIME_INFO_FIELD_NUMBER;
         hash = (53 * hash) + getFragmentRuntimeInfo().hashCode();
       }
+      if (hasUsingTezAm()) {
+        hash = (37 * hash) + USINGTEZAM_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getUsingTezAm());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -8167,6 +8215,8 @@ public final class LlapDaemonProtocolProtos {
           fragmentRuntimeInfoBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000200);
+        usingTezAm_ = true;
+        bitField0_ = (bitField0_ & ~0x00000400);
         return this;
       }
 
@@ -8243,6 +8293,10 @@ public final class LlapDaemonProtocolProtos {
         } else {
           result.fragmentRuntimeInfo_ = fragmentRuntimeInfoBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
+          to_bitField0_ |= 0x00000400;
+        }
+        result.usingTezAm_ = usingTezAm_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -8299,6 +8353,9 @@ public final class LlapDaemonProtocolProtos {
         if (other.hasFragmentRuntimeInfo()) {
           mergeFragmentRuntimeInfo(other.getFragmentRuntimeInfo());
         }
+        if (other.hasUsingTezAm()) {
+          setUsingTezAm(other.getUsingTezAm());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -9032,6 +9089,39 @@ public final class LlapDaemonProtocolProtos {
         return fragmentRuntimeInfoBuilder_;
       }
 
+      // optional bool usingTezAm = 11 [default = true];
+      private boolean usingTezAm_ = true;
+      /**
+       * <code>optional bool usingTezAm = 11 [default = true];</code>
+       */
+      public boolean hasUsingTezAm() {
+        return ((bitField0_ & 0x00000400) == 0x00000400);
+      }
+      /**
+       * <code>optional bool usingTezAm = 11 [default = true];</code>
+       */
+      public boolean getUsingTezAm() {
+        return usingTezAm_;
+      }
+      /**
+       * <code>optional bool usingTezAm = 11 [default = true];</code>
+       */
+      public Builder setUsingTezAm(boolean value) {
+        bitField0_ |= 0x00000400;
+        usingTezAm_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool usingTezAm = 11 [default = true];</code>
+       */
+      public Builder clearUsingTezAm() {
+        bitField0_ = (bitField0_ & ~0x00000400);
+        usingTezAm_ = true;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:SubmitWorkRequestProto)
     }
 
@@ -14392,7 +14482,7 @@ public final class LlapDaemonProtocolProtos {
       "\030\004 \001(\003\022 \n\030first_attempt_start_time\030\005 \001(\003" +
       "\022\"\n\032current_attempt_start_time\030\006 \001(\003\"F\n\024" +
       "QueryIdentifierProto\022\026\n\016app_identifier\030\001" +
-      " \001(\t\022\026\n\016dag_identifier\030\002 \001(\005\"\266\002\n\026SubmitW" +
+      " \001(\t\022\026\n\016dag_identifier\030\002 \001(\005\"\320\002\n\026SubmitW" +
       "orkRequestProto\022\033\n\023container_id_string\030\001" +
       " \001(\t\022\017\n\007am_host\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030",
       "\n\020token_identifier\030\004 \001(\t\022\032\n\022credentials_" +
@@ -14400,38 +14490,39 @@ public final class LlapDaemonProtocolProtos {
       "n_id_string\030\007 \001(\t\022\032\n\022app_attempt_number\030" +
       "\010 \001(\005\022)\n\rfragment_spec\030\t \001(\0132\022.FragmentS" +
       "pecProto\0223\n\025fragment_runtime_info\030\n \001(\0132" +
-      "\024.FragmentRuntimeInfo\"J\n\027SubmitWorkRespo" +
-      "nseProto\022/\n\020submission_state\030\001 \001(\0162\025.Sub" +
-      "missionStateProto\"\205\001\n\036SourceStateUpdated" +
-      "RequestProto\022/\n\020query_identifier\030\001 \001(\0132\025" +
-      ".QueryIdentifierProto\022\020\n\010src_name\030\002 \001(\t\022",
-      " \n\005state\030\003 \001(\0162\021.SourceStateProto\"!\n\037Sou" +
-      "rceStateUpdatedResponseProto\"w\n\031QueryCom" +
-      "pleteRequestProto\022\020\n\010query_id\030\001 \001(\t\022/\n\020q" +
-      "uery_identifier\030\002 \001(\0132\025.QueryIdentifierP" +
-      "roto\022\027\n\014delete_delay\030\004 \001(\003:\0010\"\034\n\032QueryCo" +
-      "mpleteResponseProto\"t\n\035TerminateFragment" +
-      "RequestProto\022/\n\020query_identifier\030\001 \001(\0132\025" +
-      ".QueryIdentifierProto\022\"\n\032fragment_identi" +
-      "fier_string\030\002 \001(\t\" \n\036TerminateFragmentRe" +
-      "sponseProto\"\026\n\024GetTokenRequestProto\"&\n\025G",
-      "etTokenResponseProto\022\r\n\005token\030\001 \001(\014*2\n\020S" +
-      "ourceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RU" +
-      "NNING\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEP" +
-      "TED\020\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316" +
-      "\002\n\022LlapDaemonProtocol\022?\n\nsubmitWork\022\027.Su" +
-      "bmitWorkRequestProto\032\030.SubmitWorkRespons" +
-      "eProto\022W\n\022sourceStateUpdated\022\037.SourceSta" +
-      "teUpdatedRequestProto\032 .SourceStateUpdat" +
-      "edResponseProto\022H\n\rqueryComplete\022\032.Query" +
-      "CompleteRequestProto\032\033.QueryCompleteResp",
-      "onseProto\022T\n\021terminateFragment\022\036.Termina" +
-      "teFragmentRequestProto\032\037.TerminateFragme" +
-      "ntResponseProto2]\n\026LlapManagementProtoco" +
-      "l\022C\n\022getDelegationToken\022\025.GetTokenReques" +
-      "tProto\032\026.GetTokenResponseProtoBH\n&org.ap" +
-      "ache.hadoop.hive.llap.daemon.rpcB\030LlapDa" +
-      "emonProtocolProtos\210\001\001\240\001\001"
+      "\024.FragmentRuntimeInfo\022\030\n\nusingTezAm\030\013 \001(" +
+      "\010:\004true\"J\n\027SubmitWorkResponseProto\022/\n\020su" +
+      "bmission_state\030\001 \001(\0162\025.SubmissionStatePr" +
+      "oto\"\205\001\n\036SourceStateUpdatedRequestProto\022/" +
+      "\n\020query_identifier\030\001 \001(\0132\025.QueryIdentifi",
+      "erProto\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030\003 \001(\016" +
+      "2\021.SourceStateProto\"!\n\037SourceStateUpdate" +
+      "dResponseProto\"w\n\031QueryCompleteRequestPr" +
+      "oto\022\020\n\010query_id\030\001 \001(\t\022/\n\020query_identifie" +
+      "r\030\002 \001(\0132\025.QueryIdentifierProto\022\027\n\014delete" +
+      "_delay\030\004 \001(\003:\0010\"\034\n\032QueryCompleteResponse" +
+      "Proto\"t\n\035TerminateFragmentRequestProto\022/" +
+      "\n\020query_identifier\030\001 \001(\0132\025.QueryIdentifi" +
+      "erProto\022\"\n\032fragment_identifier_string\030\002 " +
+      "\001(\t\" \n\036TerminateFragmentResponseProto\"\026\n",
+      "\024GetTokenRequestProto\"&\n\025GetTokenRespons" +
+      "eProto\022\r\n\005token\030\001 \001(\014*2\n\020SourceStateProt" +
+      "o\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\002*E\n\024Sub" +
+      "missionStateProto\022\014\n\010ACCEPTED\020\001\022\014\n\010REJEC" +
+      "TED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n\022LlapDaemonP" +
+      "rotocol\022?\n\nsubmitWork\022\027.SubmitWorkReques" +
+      "tProto\032\030.SubmitWorkResponseProto\022W\n\022sour" +
+      "ceStateUpdated\022\037.SourceStateUpdatedReque" +
+      "stProto\032 .SourceStateUpdatedResponseProt" +
+      "o\022H\n\rqueryComplete\022\032.QueryCompleteReques",
+      "tProto\032\033.QueryCompleteResponseProto\022T\n\021t" +
+      "erminateFragment\022\036.TerminateFragmentRequ" +
+      "estProto\032\037.TerminateFragmentResponseProt" +
+      "o2]\n\026LlapManagementProtocol\022C\n\022getDelega" +
+      "tionToken\022\025.GetTokenRequestProto\032\026.GetTo" +
+      "kenResponseProtoBH\n&org.apache.hadoop.hi" +
+      "ve.llap.daemon.rpcB\030LlapDaemonProtocolPr" +
+      "otos\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -14485,7 +14576,7 @@ public final class LlapDaemonProtocolProtos {
           internal_static_SubmitWorkRequestProto_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_SubmitWorkRequestProto_descriptor,
-              new java.lang.String[] { "ContainerIdString", "AmHost", "AmPort", "TokenIdentifier", "CredentialsBinary", "User", "ApplicationIdString", "AppAttemptNumber", "FragmentSpec", "FragmentRuntimeInfo", });
+              new java.lang.String[] { "ContainerIdString", "AmHost", "AmPort", "TokenIdentifier", "CredentialsBinary", "User", "ApplicationIdString", "AppAttemptNumber", "FragmentSpec", "FragmentRuntimeInfo", "UsingTezAm", });
           internal_static_SubmitWorkResponseProto_descriptor =
             getDescriptor().getMessageTypes().get(8);
           internal_static_SubmitWorkResponseProto_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/llap-common/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto
index 944c96c..e964c5f 100644
--- a/llap-common/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto
@@ -91,6 +91,7 @@ message SubmitWorkRequestProto {
   optional int32 app_attempt_number = 8;
   optional FragmentSpecProto fragment_spec = 9;
   optional FragmentRuntimeInfo fragment_runtime_info = 10;
+  optional bool usingTezAm = 11 [default = true];
 }
 
 enum SubmissionStateProto {
@@ -136,11 +137,18 @@ message GetTokenResponseProto {
   optional bytes token = 1;
 }
 
+message SendEventsRequestProto {
+}
+
+message SendEventsResponseProto {
+}
+
 service LlapDaemonProtocol {
   rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto);
   rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns (SourceStateUpdatedResponseProto);
   rpc queryComplete(QueryCompleteRequestProto) returns (QueryCompleteResponseProto);
   rpc terminateFragment(TerminateFragmentRequestProto) returns (TerminateFragmentResponseProto);
+  rpc sendEvents(SendEventsRequestProto) returns (SendEventsResponseProto); // placeholder RPC: both message types are declared empty above
 }
 
 service LlapManagementProtocol {

http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 8621826..40a89cb 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -14,6 +14,7 @@
 
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import org.apache.hadoop.hive.llap.LlapOutputFormatService;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryPoolMXBean;
@@ -279,6 +280,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     LOG.info("Setting shuffle port to: " + ShuffleHandler.get().getPort());
     this.shufflePort.set(ShuffleHandler.get().getPort());
     super.serviceStart();
+    LlapOutputFormatService.get();
     LOG.info("LlapDaemon serviceStart complete");
   }
 
@@ -286,6 +288,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     super.serviceStop();
     ShuffleHandler.shutdown();
     shutdown();
+    LlapOutputFormatService.get().stop();
     LOG.info("LlapDaemon shutdown complete");
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index d88d82a..d9d216d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -103,6 +103,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   private final String queryId;
   private final HadoopShim tezHadoopShim;
   private boolean shouldRunTask = true;
+  private final boolean withTezAm;
   final Stopwatch runtimeWatch = new Stopwatch();
   final Stopwatch killtimerWatch = new Stopwatch();
   private final AtomicBoolean isStarted = new AtomicBoolean(false);
@@ -131,11 +132,11 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     this.jobToken = TokenCache.getSessionToken(credentials);
     this.taskSpec = Converters.getTaskSpecfromProto(request.getFragmentSpec());
     this.amReporter = amReporter;
+    this.withTezAm = request.getUsingTezAm();
+    LOG.warn("ZZZ: DBG: usingTezAm=" + withTezAm);
     // Register with the AMReporter when the callable is setup. Unregister once it starts running.
-    if (jobToken != null) {
     this.amReporter.registerTask(request.getAmHost(), request.getAmPort(),
         request.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier());
-    }
     this.metrics = metrics;
     this.requestId = request.getFragmentSpec().getFragmentIdentifierString();
     // TODO Change this to the queryId/Name when that's available.

http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
new file mode 100644
index 0000000..ecc032d
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -0,0 +1,197 @@
+package org.apache.hadoop.hive.llap.ext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.LlapTaskUmbilicalServer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapTaskUmbilicalExternalClient extends AbstractService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class);
+
+  private final LlapProtocolClientProxy communicator;
+  private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer;
+  private final Configuration conf;
+  private final LlapTaskUmbilicalProtocol umbilical;
+
+  protected final String tokenIdentifier;
+  protected final Token<JobTokenIdentifier> sessionToken;
+
+
+  private final ConcurrentMap<String, List<TezEvent>> pendingEvents = new ConcurrentHashMap<>();
+
+
+  // TODO KKK Work out the details of the tokenIdentifier, and the session token.
+  // It may just be possible to create one here - since Shuffle is not involved, and this is only used
+  // for communication from LLAP-Daemons to the server. It will need to be sent in as part
+  // of the job submission request.
+  public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier, Token<JobTokenIdentifier> sessionToken) {
+    super(LlapTaskUmbilicalExternalClient.class.getName());
+    this.conf = conf;
+    this.umbilical = new LlapTaskUmbilicalExternalImpl();
+    this.tokenIdentifier = tokenIdentifier;
+    this.sessionToken = sessionToken;
+    // TODO. No support for the LLAP token yet. Add support for configurable threads, however 1 should always be enough.
+    this.communicator = new LlapProtocolClientProxy(1, conf, null);
+  }
+
+  @Override
+  public void serviceStart() throws IOException {
+    int numHandlers = HiveConf.getIntVar(conf,
+        HiveConf.ConfVars.LLAP_TMP_EXT_CLIENT_NUM_SERVER_HANDLERS);
+    llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilical, numHandlers, tokenIdentifier, sessionToken);
+  }
+
+  @Override
+  public void serviceStop() {
+    llapTaskUmbilicalServer.shutdownServer();
+    if (this.communicator != null) {
+      this.communicator.stop();
+    }
+  }
+
+
+  /**
+   * Submit the work for actual execution. This should always have the usingTezAm flag disabled
+   * @param submitWorkRequestProto
+   */
+  public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort) {
+    Preconditions.checkArgument(submitWorkRequestProto.getUsingTezAm() == false);
+
+    // Store the actual event first. To be returned on the first heartbeat.
+    Event mrInputEvent = null;
+    // Construct a TezEvent out of this, to send it out on the next heartbeat
+
+//    submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()
+
+
+    // Send out the actual SubmitWorkRequest
+    communicator.sendSubmitWork(submitWorkRequestProto, llapHost, llapPort,
+        new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto>() {
+          @Override
+          public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) {
+            if (response.hasSubmissionState()) {
+              if (response.getSubmissionState().equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) {
+                LOG.info("Fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " rejected. Server Busy.");
+                return;
+              }
+            }
+            LOG.info("DBG: Submitted " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString());
+          }
+
+          @Override
+          public void indicateError(Throwable t) {
+            LOG.error("Failed to submit: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), t);
+          }
+        });
+
+
+
+
+//    // TODO Also send out information saying that the fragment is finishable - if that is not already included in the main fragment.
+//    // This entire call is only required if we're doing more than scans. MRInput has no dependencies and is always finishable
+//    QueryIdentifierProto queryIdentifier = QueryIdentifierProto
+//        .newBuilder()
+//        .setAppIdentifier(submitWorkRequestProto.getApplicationIdString()).setDagIdentifier(submitWorkRequestProto.getFragmentSpec().getDagId())
+//        .build();
+//    LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto sourceStateUpdatedRequest =
+//        LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(queryIdentifier).setState(
+//            LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED).
+//            setSrcName(TODO)
+//    communicator.sendSourceStateUpdate(LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()).set);
+
+
+  }
+
+
+
+
+
+
+
+  // TODO Ideally, the server should be shared across all client sessions running on the same node.
+  private class LlapTaskUmbilicalExternalImpl implements  LlapTaskUmbilicalProtocol {
+
+    @Override
+    public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+      // Expecting only a single instance of a task to be running.
+      return true;
+    }
+
+    @Override
+    public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
+        TezException {
+      // Keep-alive information. The client should be informed and will have to take care of re-submitting the work.
+      // Some parts of fault tolerance go here.
+
+      // This also provides completion information, and a possible notification when task actually starts running (first heartbeat)
+
+      // Incoming events can be ignored until the point when shuffle needs to be handled, instead of just scans.
+
+
+      TezHeartbeatResponse response = new TezHeartbeatResponse();
+      // Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this.
+      TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
+      LOG.info("ZZZ: DBG: Received heartbeat from taskAttemptId: " + taskAttemptId.toString());
+
+      List<TezEvent> tezEvents = pendingEvents.remove(taskAttemptId.toString());
+
+      response.setLastRequestId(request.getRequestId());
+      // The "from" event IDs are irrelevant here. This can be tracked in the AM itself, instead of polluting the task.
+      // Also since we have all the MRInput events here - they'll all be sent in together.
+      response.setNextFromEventId(0); // Irrelevant. See comment above.
+      response.setNextPreRoutedEventId(0); //Irrelevant. See comment above.
+      response.setEvents(tezEvents);
+
+      // TODO KKK: Should ideally handle things like Task success notifications.
+      // Can this somehow be hooked into the LlapTaskCommunicator to make testing easy?
+
+      return response;
+    }
+
+    @Override
+    public void nodeHeartbeat(Text hostname, int port) throws IOException {
+      // TODO Eventually implement - to handle keep-alive messages from pending work.
+    }
+
+    @Override
+    public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
+      // TODO Eventually implement - to handle preemptions within LLAP daemons.
+    }
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+      return 0;
+    }
+
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+                                                  int clientMethodsHash) throws IOException {
+      return ProtocolSignature.getProtocolSignature(this, protocol,
+          clientVersion, clientMethodsHash);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
new file mode 100644
index 0000000..dbd591a
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
@@ -0,0 +1,77 @@
+package org.apache.hadoop.hive.llap.tezplugins.helpers;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Hosts an RPC server that exposes a {@link LlapTaskUmbilicalProtocol} implementation.
+ * The server is built and started by the constructor and stopped by {@link #shutdownServer()}.
+ */
+public class LlapTaskUmbilicalServer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalServer.class);
+
+  protected volatile Server server;
+  private final InetSocketAddress address;
+  // True from construction (the constructor starts the server) until the first shutdownServer() call.
+  private final AtomicBoolean started = new AtomicBoolean(true);
+
+  /**
+   * Builds the RPC server bound to 0.0.0.0 with port 0, starts it, and records its connect address.
+   *
+   * @param conf configuration handed to the RPC builder
+   * @param umbilical protocol implementation installed as the server instance
+   * @param numHandlers handler count passed to the RPC builder
+   * @param tokenIdentifier key under which {@code token} is registered with the secret manager
+   * @param token job token registered with the secret manager
+   * @throws IOException if building the server fails
+   */
+  public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umbilical, int numHandlers, String tokenIdentifier, Token<JobTokenIdentifier> token) throws
+      IOException {
+    JobTokenSecretManager jobTokenSecretManager =
+        new JobTokenSecretManager();
+    jobTokenSecretManager.addTokenForJob(tokenIdentifier, token);
+
+    server = new RPC.Builder(conf)
+        .setProtocol(LlapTaskUmbilicalProtocol.class)
+        .setBindAddress("0.0.0.0")
+        .setPort(0)
+        .setInstance(umbilical)
+        .setNumHandlers(numHandlers)
+        .setSecretManager(jobTokenSecretManager).build();
+
+    server.start();
+    this.address = NetUtils.getConnectAddress(server);
+    LOG.info(
+        "Started TaskUmbilicalServer: " + umbilical.getClass().getName() + " at address: " + address +
+            " with numHandlers=" + numHandlers);
+  }
+
+  /** Returns the connect address that was resolved right after the server started. */
+  public InetSocketAddress getAddress() {
+    return this.address;
+  }
+
+  /**
+   * Stops the RPC server. Only the first call has any effect; later calls return immediately.
+   */
+  public void shutdownServer() {
+    // compareAndSet tests and clears the flag in one atomic step, so two racing callers
+    // cannot both observe "started" and both invoke server.stop().
+    if (started.compareAndSet(true, false)) { // Primarily to avoid multiple shutdowns.
+      server.stop();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
index 4db4d32..d308ec8 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
@@ -18,10 +18,17 @@ package org.apache.hadoop.hive.llap;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
 
 import javax.security.auth.login.LoginException;
 
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TaskSpecBuilder;
+import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +45,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URISyntaxException;
 import java.io.FileNotFoundException;
+import java.util.UUID;
+
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -82,15 +91,14 @@ import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 
-
 import com.google.common.base.Preconditions;
 
 public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class);
 
-  private TezWork work;
-  private Schema schema;
+  private final TezWork work;
+  private final Schema schema;
 
   public LlapInputFormat(TezWork tezWork, Schema schema) {
     this.work = tezWork;
@@ -98,22 +106,36 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
   }
 
   // need empty constructor for bean instantiation
-  public LlapInputFormat() {}
+  public LlapInputFormat() {
+    // None of these fields should be required during getRecordReader,
+    // and should not be read.
+    work = null;
+    schema = null;
+  }
 
   /*
    * This proxy record reader has the duty of establishing a connected socket with LLAP, then fire
    * off the work in the split to LLAP and finally return the connected socket back in an
    * LlapRecordReader. The LlapRecordReader class reads the results from the socket.
    */
+  @Override
   public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
 
+    // Calls a static method to ensure none of the object fields are read.
+    return _getRecordReader(split, job, reporter);
+  }
+
+  private static RecordReader _getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws
+      IOException {
     LlapInputSplit llapSplit = (LlapInputSplit)split;
 
     // TODO: push event into LLAP
 
     // this is just the portion that sets up the io to receive data
     String host = split.getLocations()[0];
-    String id = job.get(LlapOutputFormat.LLAP_OF_ID_KEY);
+
+    // TODO: need to construct id here. Format is queryId + "_" + taskIndex
+    String id = "foobar";
 
     HiveConf conf = new HiveConf();
     Socket socket = new Socket(host,
@@ -130,120 +152,8 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
     return new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
   }
 
-  /*
-   * getSplits() gets called as part of the GenericUDFGetSplits call to get splits. Here we create
-   * an array of input splits from the work item we have, figure out the location for llap and pass
-   * that back for the submission. getRecordReader method above uses that split info to assign the
-   * work to llap.
-   */
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    // TODO: need to build proto of plan
-
-    DAG dag = DAG.create(work.getName());
-    dag.setCredentials(job.getCredentials());
-    // TODO: set access control? TezTask.setAccessControlsForCurrentUser(dag);
-
-    DagUtils utils = DagUtils.getInstance();
-    Context ctx = new Context(job);
-    MapWork mapWork = (MapWork) work.getAllWork().get(0);
-    // bunch of things get setup in the context based on conf but we need only the MR tmp directory
-    // for the following method.
-    JobConf wxConf = utils.initializeVertexConf(job, ctx, mapWork);
-    Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), job);
-    FileSystem fs = scratchDir.getFileSystem(job);
-    try {
-      LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(), utils, job);
-      Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr,
-          new ArrayList<LocalResource>(), fs, ctx, false, work,
-          work.getVertexType(mapWork));
-      dag.addVertex(wx);
-      utils.addCredentials(mapWork, dag);
-
-      // we have the dag now proceed to get the splits:
-      HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null);
-      splitGenerator.initializeSplitGenerator(wxConf, mapWork);
-      List<Event> eventList = splitGenerator.initialize();
-
-      // hack - just serializing with kryo for now. This needs to be done properly
-      InputSplit[] result = new InputSplit[eventList.size()];
-      int i = 0;
-      ByteArrayOutputStream bos = new ByteArrayOutputStream(10240);
-
-      InputConfigureVertexTasksEvent configureEvent = (InputConfigureVertexTasksEvent)
-	eventList.remove(0);
-
-      List<TaskLocationHint> hints = configureEvent.getLocationHint().getTaskLocationHints();
-      for (Event event: eventList) {
-	TaskLocationHint hint = hints.remove(0);
-        Set<String> hosts = hint.getHosts();
-	SplitLocationInfo[] locations = new SplitLocationInfo[hosts.size()];
-
-	int j = 0;
-	for (String host: hosts) {
-	  locations[j++] = new SplitLocationInfo(host,false);
-	}
-
-	bos.reset();
-	Kryo kryo = SerializationUtilities.borrowKryo();
-	SerializationUtilities.serializeObjectByKryo(kryo, event, bos);
-	SerializationUtilities.releaseKryo(kryo);
-	result[i++] = new LlapInputSplit(bos.toByteArray(), locations, schema);
-      }
-      return result;
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * Returns a local resource representing a jar. This resource will be used to execute the plan on
-   * the cluster.
-   *
-   * @param localJarPath
-   *          Local path to the jar to be localized.
-   * @return LocalResource corresponding to the localized hive exec resource.
-   * @throws IOException
-   *           when any file system related call fails.
-   * @throws LoginException
-   *           when we are unable to determine the user.
-   * @throws URISyntaxException
-   *           when current jar location cannot be determined.
-   */
-  private LocalResource createJarLocalResource(String localJarPath, DagUtils utils,
-      Configuration conf)
-    throws IOException, LoginException, IllegalArgumentException, FileNotFoundException {
-    FileStatus destDirStatus = utils.getHiveJarDirectory(conf);
-    assert destDirStatus != null;
-    Path destDirPath = destDirStatus.getPath();
-
-    Path localFile = new Path(localJarPath);
-    String sha = getSha(localFile, conf);
-
-    String destFileName = localFile.getName();
-
-    // Now, try to find the file based on SHA and name. Currently we require exact name match.
-    // We could also allow cutting off versions and other stuff provided that SHA matches...
-    destFileName = FilenameUtils.removeExtension(destFileName) + "-" + sha
-      + FilenameUtils.EXTENSION_SEPARATOR + FilenameUtils.getExtension(destFileName);
-
-    // TODO: if this method is ever called on more than one jar, getting the dir and the
-    // list need to be refactored out to be done only once.
-    Path destFile = new Path(destDirPath.toString() + "/" + destFileName);
-    return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf);
-  }
-
-  private String getSha(Path localFile, Configuration conf)
-    throws IOException, IllegalArgumentException {
-    InputStream is = null;
-    try {
-      FileSystem localFs = FileSystem.getLocal(conf);
-      is = localFs.open(localFile);
-      return DigestUtils.sha256Hex(is);
-    } finally {
-      if (is != null) {
-        is.close();
-      }
-    }
+    throw new IOException("These are not the splits you are looking for.");
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
index 78dbb34..4249a16 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
@@ -16,49 +16,49 @@
  */
 package org.apache.hadoop.hive.llap;
 
-import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.io.DataInputStream;
-import java.io.ByteArrayInputStream;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.hive.llap.io.api.LlapProxy;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
+import java.io.IOException;
+
 import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
+import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.AutoExpandingBufferWriteTransport;
 import org.apache.thrift.transport.AutoExpandingBuffer;
-
-import com.google.common.base.Preconditions;
+import org.apache.thrift.transport.AutoExpandingBufferWriteTransport;
 
 public class LlapInputSplit implements InputSplitWithLocationInfo {
 
-  byte[] queryFragment;
+  byte[] planBytes;
+  byte[] fragmentBytes;
   SplitLocationInfo[] locations;
   Schema schema;
 
-  public LlapInputSplit() {}
 
-  public LlapInputSplit(byte[] queryFragment, SplitLocationInfo[] locations, Schema schema) {
-    this.queryFragment = queryFragment;
+  // // Static
+  // ContainerIdString
+  // DagName
+  // VertexName
+  // FragmentNumber
+  // AttemptNumber - always 0
+  // FragmentIdentifierString - taskAttemptId
+
+  // ProcessorDescriptor
+  // InputSpec
+  // OutputSpec
+
+  // Tokens
+
+  // // Dynamic
+  //
+
+  public LlapInputSplit() {
+  }
+
+  public LlapInputSplit(byte[] planBytes, byte[] fragmentBytes, SplitLocationInfo[] locations, Schema schema) {
+    this.planBytes = planBytes;
+    this.fragmentBytes = fragmentBytes;
     this.locations = locations;
     this.schema = schema;
   }
@@ -83,8 +83,11 @@ public class LlapInputSplit implements InputSplitWithLocationInfo {
 
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeInt(queryFragment.length);
-    out.write(queryFragment);
+    out.writeInt(planBytes.length);
+    out.write(planBytes);
+
+    out.writeInt(fragmentBytes.length);
+    out.write(fragmentBytes);
 
     out.writeInt(locations.length);
     for (int i = 0; i < locations.length; ++i) {
@@ -108,11 +111,13 @@ public class LlapInputSplit implements InputSplitWithLocationInfo {
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    byte[] queryFragment;
-
     int length = in.readInt();
-    queryFragment = new byte[length];
-    in.readFully(queryFragment);
+    planBytes = new byte[length];
+    in.readFully(planBytes);
+
+    length = in.readInt();
+    fragmentBytes = new byte[length];
+    in.readFully(fragmentBytes);
 
     length = in.readInt();
     locations = new SplitLocationInfo[length];
@@ -124,7 +129,8 @@ public class LlapInputSplit implements InputSplitWithLocationInfo {
     length = in.readInt();
 
     try {
-      AutoExpandingBufferWriteTransport transport = new AutoExpandingBufferWriteTransport(length, 2d);
+      AutoExpandingBufferWriteTransport transport =
+          new AutoExpandingBufferWriteTransport(length, 2d);
       AutoExpandingBuffer buf = transport.getBuf();
       in.readFully(buf.array(), 0, length);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java b/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
new file mode 100644
index 0000000..a9a3738
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
@@ -0,0 +1,85 @@
+package org.apache.hadoop.hive.llap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+
+/**
+ * Writable that carries a {@link TaskSpec} together with an {@link ApplicationId}.
+ * Serialized form: the task spec, then the app id's cluster timestamp (long) and id (int).
+ */
+public class SubmitWorkInfo implements Writable {
+
+  private TaskSpec taskSpec;
+  private ApplicationId fakeAppId;
+
+  public SubmitWorkInfo(TaskSpec taskSpec, ApplicationId fakeAppId) {
+    this.taskSpec = taskSpec;
+    this.fakeAppId = fakeAppId;
+  }
+
+  // Empty constructor for writable etc.
+  public SubmitWorkInfo() {
+  }
+
+  public TaskSpec getTaskSpec() {
+    return taskSpec;
+  }
+
+  public ApplicationId getFakeAppId() {
+    return fakeAppId;
+  }
+
+  /** Writes the task spec, then the app id's cluster timestamp (long) and id (int). */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskSpec.write(out);
+    out.writeLong(fakeAppId.getClusterTimestamp());
+    out.writeInt(fakeAppId.getId());
+  }
+
+  /** Reads the fields in the order {@link #write(DataOutput)} emits them, replacing current state. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskSpec = new TaskSpec();
+    taskSpec.readFields(in);
+    long appIdTs = in.readLong();
+    int appIdId = in.readInt();
+    fakeAppId = ApplicationId.newInstance(appIdTs, appIdId);
+  }
+
+  /**
+   * Serializes {@code submitWorkInfo} through {@link #write(DataOutput)}.
+   *
+   * @return a new array holding exactly the serialized bytes
+   * @throws IOException if writing fails
+   */
+  public static byte[] toBytes(SubmitWorkInfo submitWorkInfo) throws IOException {
+    DataOutputBuffer dob = new DataOutputBuffer();
+    submitWorkInfo.write(dob);
+    // getData() exposes the whole backing array; only the first getLength() bytes were written.
+    return Arrays.copyOf(dob.getData(), dob.getLength());
+  }
+
+  /**
+   * Rebuilds an instance from bytes produced by {@link #toBytes(SubmitWorkInfo)}.
+   * Static so no throwaway instance is needed; existing instance-qualified calls still compile.
+   *
+   * @throws IOException if the bytes cannot be decoded
+   */
+  public static SubmitWorkInfo fromBytes(byte[] submitWorkInfoBytes) throws IOException {
+    DataInputBuffer dib = new DataInputBuffer();
+    dib.reset(submitWorkInfoBytes, 0, submitWorkInfoBytes.length);
+    SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo();
+    submitWorkInfo.readFields(dib);
+    return submitWorkInfo;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index b0cda82..011e459 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -33,6 +33,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -44,6 +47,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.split.SplitLocationProvider;
 import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -82,10 +86,30 @@ public class HiveSplitGenerator extends InputInitializer {
   private final SplitGrouper splitGrouper = new SplitGrouper();
   private SplitLocationProvider splitLocationProvider = null;
 
-  public void initializeSplitGenerator(Configuration conf, MapWork work) {
+  // Manual setup: stores conf, rebuilds jobConf from it (the calls below read jobConf), then loads credentials and MapWork.
+  // TODO RSHACK This entire method needs to be reworked. Put back final fields, separate into reusable components etc.
+  public void initializeSplitGenerator(Configuration conf, MapWork work) throws IOException {
     this.conf = conf;
     this.work = work;
     this.jobConf = new JobConf(conf);
+
+    // TODO RSHACK - assuming grouping enabled always.
+    userPayloadProto = MRInputUserPayloadProto.newBuilder().setGroupingEnabled(true).build();
+
+    this.splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG);
+    LOG.info("SplitLocationProvider: " + splitLocationProvider);
+
+    // Read all credentials into the credentials instance stored in JobConf.
+    ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
+
+    this.work = Utilities.getMapWork(jobConf);
+
+    // Events can start coming in the moment the InputInitializer is created. The pruner
+    // must be setup and initialized here so that it sets up its structures to start accepting events.
+    // Setting it up in initialize leads to a window where events may come in before the pruner is
+    // initialized, which may cause it to drop events.
+    // TODO RSHACK - No dynamic partition pruning
+    pruner = null;
   }
 
   public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException,
@@ -129,7 +153,9 @@ public class HiveSplitGenerator extends InputInitializer {
           conf.getBoolean("mapreduce.tez.input.initializer.serialize.event.payload", true);
 
       // perform dynamic partition pruning
-      pruner.prune();
+      if (pruner != null) {
+        pruner.prune();
+      }
 
       InputSplitInfoMem inputSplitInfo = null;
       boolean generateConsistentSplits = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS);
@@ -142,9 +168,20 @@ public class HiveSplitGenerator extends InputInitializer {
           (InputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(realInputFormatName),
               jobConf);
 
-        int totalResource = getContext().getTotalAvailableResource().getMemory();
-        int taskResource = getContext().getVertexTaskResource().getMemory();
-        int availableSlots = totalResource / taskResource;
+        int totalResource = 0;
+        int taskResource = 0;
+        int availableSlots = 0;
+        // FIXME. Do the right thing Luke.
+        if (getContext() == null) {
+          // for now, totalResource = taskResource for llap
+          availableSlots = 1;
+        }
+
+        if (getContext() != null) {
+          totalResource = getContext().getTotalAvailableResource().getMemory();
+          taskResource = getContext().getVertexTaskResource().getMemory();
+          availableSlots = totalResource / taskResource;
+        }
 
         if (HiveConf.getLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZE, 1) <= 1) {
           // broken configuration from mapred-default.xml

http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 0584ad8..3fe70ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.llap.LlapOutputFormat;
 import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
@@ -94,6 +95,7 @@ public class MapRecordProcessor extends RecordProcessor {
     super(jconf, context);
     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
     if (LlapProxy.isDaemon()) { // do not cache plan
+      jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, queryId + "_" + context.getTaskIndex());
       cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
     } else {
       cache = ObjectCacheFactory.getCache(jconf, queryId);

http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
index 598520c..0997233 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
@@ -1315,12 +1315,15 @@ public class TypeCheckProcFactory {
       try {
         return getXpathOrFuncExprNodeDesc(expr, isFunction, children, ctx);
       } catch (UDFArgumentTypeException e) {
+        LOG.error("UDFArgumentTypeException: ", e);
         throw new SemanticException(ErrorMsg.INVALID_ARGUMENT_TYPE.getMsg(expr
             .getChild(childrenBegin + e.getArgumentId()), e.getMessage()));
       } catch (UDFArgumentLengthException e) {
+        LOG.error("UDFArgumentLengthException: ", e);
         throw new SemanticException(ErrorMsg.INVALID_ARGUMENT_LENGTH.getMsg(
             expr, e.getMessage()));
       } catch (UDFArgumentException e) {
+        LOG.error("UDFArgumentException: ", e);
         throw new SemanticException(ErrorMsg.INVALID_ARGUMENT.getMsg(expr, e
             .getMessage()));
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
index 3b7dcd9..9c7e1f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
@@ -18,6 +18,20 @@
 
 package org.apache.hadoop.hive.ql.udf.generic;
 
+import org.apache.hadoop.hive.llap.LlapInputSplit;
+import org.apache.hadoop.hive.llap.SubmitWorkInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+
+import javax.security.auth.login.LoginException;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.ArrayList;
@@ -28,6 +42,17 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.DataOutput;
 
+import com.esotericsoftware.kryo.Kryo;
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.io.InputStream;
+
+import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TaskSpecBuilder;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,6 +94,55 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.metastore.api.Schema;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.io.FileNotFoundException;
+import java.util.UUID;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.runtime.api.Event;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
 
 /**
  * GenericUDFGetSplits.
@@ -177,7 +251,6 @@ public class GenericUDFGetSplits extends GenericUDF {
     }
 
     Path data = null;
-    InputFormat inp = null;
     String ifc = null;
 
     TezWork tezWork = ((TezTask)roots.get(0)).getWork();
@@ -214,33 +287,13 @@ public class GenericUDFGetSplits extends GenericUDF {
       }
 
       tezWork = ((TezTask)roots.get(0)).getWork();
-
-      // Table table = db.getTable(tableName);
-      // if (table.isPartitioned()) {
-      //   throw new UDFArgumentException("Table " + tableName + " is partitioned.");
-      // }
-      // data = table.getDataLocation();
-      // LOG.info("looking at: "+data);
-
-      // ifc = table.getInputFormatClass().toString();
-
-      // inp = ReflectionUtils.newInstance(table.getInputFormatClass(), jc);
     }
 
     MapWork w = (MapWork)tezWork.getAllWork().get(0);
-    inp = new LlapInputFormat(tezWork, schema);
     ifc = LlapInputFormat.class.toString();
 
     try {
-      if (inp instanceof JobConfigurable) {
-        ((JobConfigurable) inp).configure(jc);
-      }
-
-      if (inp instanceof FileInputFormat) {
-        ((FileInputFormat) inp).addInputPath(jc, data);
-      }
-
-      for (InputSplit s: inp.getSplits(jc, num)) {
+      for (InputSplit s: getSplits(jc, num, tezWork, schema)) {
         Object[] os = new Object[3];
         os[0] = ifc;
         os[1] = s.getClass().toString();
@@ -257,6 +310,133 @@ public class GenericUDFGetSplits extends GenericUDF {
     return retArray;
   }
 
+  /**
+   * Builds a single-vertex DAG for the first (map) work of {@code work}, runs the Hive split
+   * generator on it and returns one {@link LlapInputSplit} per generated split event.
+   *
+   * @param numSplits currently ignored.
+   * @throws IOException wrapping any failure raised while building the DAG or the splits.
+   */
+  public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema) throws IOException {
+    DAG dag = DAG.create(work.getName());
+    dag.setCredentials(job.getCredentials());
+    // TODO: set access control? TezTask.setAccessControlsForCurrentUser(dag);
+    DagUtils utils = DagUtils.getInstance();
+    Context ctx = new Context(job);
+    MapWork mapWork = (MapWork) work.getAllWork().get(0);
+    // bunch of things get setup in the context based on conf but we need only the MR tmp directory
+    // for the following method.
+    JobConf wxConf = utils.initializeVertexConf(job, ctx, mapWork);
+    Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), job);
+    FileSystem fs = scratchDir.getFileSystem(job);
+    try {
+      LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(), utils, job);
+      Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr,
+          new ArrayList<LocalResource>(), fs, ctx, false, work,
+          work.getVertexType(mapWork));
+      String vertexName = wx.getName();
+      dag.addVertex(wx);
+      utils.addCredentials(mapWork, dag);
+
+      // we have the dag now proceed to get the splits:
+      HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null);
+      Preconditions.checkState(HiveConf.getBoolVar(wxConf,
+          HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS));
+      Preconditions.checkState(HiveConf.getBoolVar(wxConf,
+          HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
+      splitGenerator.initializeSplitGenerator(wxConf, mapWork);
+      List<Event> eventList = splitGenerator.initialize();
+
+      // Event 0 configures the vertex; only the size - 1 events after it describe splits.
+      InputConfigureVertexTasksEvent configureEvent = (InputConfigureVertexTasksEvent) eventList.get(0);
+      List<TaskLocationHint> hints = configureEvent.getLocationHint().getTaskLocationHints();
+      int splitCount = eventList.size() - 1;
+      Preconditions.checkState(hints.size() == splitCount);
+      InputSplit[] result = new InputSplit[splitCount];
+      // hack - just serializing with kryo for now. This needs to be done properly
+      ByteArrayOutputStream bos = new ByteArrayOutputStream(10240);
+      LOG.info("DBG: Number of splits: " + splitCount);
+      for (int i = 1 ; i < eventList.size() ; i++) {
+        Set<String> hosts = hints.get(i - 1).getHosts();
+        LOG.info("DBG: Using locations: " + hosts.toString());
+        if (hosts.size() != 1) {
+          LOG.warn("DBG: Bad # of locations: " + hosts.size());
+        }
+        SplitLocationInfo[] locations = new SplitLocationInfo[hosts.size()];
+        int j = 0;
+        for (String host : hosts) {
+          locations[j++] = new SplitLocationInfo(host, false);
+        }
+        // Creating the TezEvent here itself, since it's easy to serialize.
+        bos.reset();
+        Kryo kryo = SerializationUtilities.borrowKryo();
+        try {
+          SerializationUtilities.serializeObjectByKryo(kryo, eventList.get(i), bos);
+        } finally {
+          SerializationUtilities.releaseKryo(kryo);  // return it even if serialization fails
+        }
+        TaskSpec taskSpec = new TaskSpecBuilder().constructTaskSpec(dag, vertexName, splitCount);
+        ApplicationId fakeAppId = ApplicationId.newInstance(new Random().nextInt(), 0);
+        byte[] submitWorkBytes = SubmitWorkInfo.toBytes(new SubmitWorkInfo(taskSpec, fakeAppId));
+        result[i-1] = new LlapInputSplit(submitWorkBytes, bos.toByteArray(), locations, schema);
+      }
+      return result;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Localizes a jar into the Hive jar directory. The destination file keeps the jar's name
+   * but gets the SHA-256 of its contents inserted before the extension.
+   *
+   * @param localJarPath
+   *          Local path to the jar to be localized.
+   * @param utils
+   *          Helper used to look up the Hive jar directory and to localize the file.
+   * @param conf
+   *          Configuration handed to the file system and localization calls.
+   * @return LocalResource produced by the localization call for the jar.
+   * @throws IOException
+   *           when any file system related call fails.
+   * @throws LoginException
+   *           when we are unable to determine the user.
+   */
+  private LocalResource createJarLocalResource(String localJarPath, DagUtils utils,
+      Configuration conf)
+    throws IOException, LoginException, IllegalArgumentException, FileNotFoundException {
+    FileStatus jarDirStatus = utils.getHiveJarDirectory(conf);
+    assert jarDirStatus != null;
+    Path jarDir = jarDirStatus.getPath();
+    Path sourceJar = new Path(localJarPath);
+    String digest = getSha(sourceJar, conf);
+    String jarName = sourceJar.getName();
+
+    // Look the file up by name plus SHA: <name>-<sha>.<ext>. An exact name match is required for
+    // now; version suffixes and the like could be tolerated later as long as the SHA matches.
+    String hashedName = FilenameUtils.removeExtension(jarName) + "-" + digest
+      + FilenameUtils.EXTENSION_SEPARATOR + FilenameUtils.getExtension(jarName);
+
+    // TODO: if this method is ever called on more than one jar, getting the dir and the
+    // list need to be refactored out to be done only once.
+    Path destJar = new Path(jarDir.toString() + "/" + hashedName);
+    return utils.localizeResource(sourceJar, destJar, LocalResourceType.FILE, conf);
+  }
+
+  /**
+   * Returns the hex-encoded SHA-256 digest of a file on the local file system.
+   *
+   * @throws IOException if the file cannot be opened or read.
+   */
+  private String getSha(Path localFile, Configuration conf)
+    throws IOException, IllegalArgumentException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    // try-with-resources closes the stream without masking a failure from the read.
+    try (InputStream is = localFs.open(localFile)) {
+      return DigestUtils.sha256Hex(is);
+    }
+  }
+
   @Override
   public String getDisplayString(String[] children) {
     assert children.length == 2;

http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java b/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
new file mode 100644
index 0000000..d0c7c5a
--- /dev/null
+++ b/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
@@ -0,0 +1,45 @@
+package org.apache.tez.dag.api;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+
+/**
+ * Proxy class placed in the {@code org.apache.tez.dag.api} package so it can call package
+ * private methods while assembling a {@link TaskSpec}.
+ */
+public class TaskSpecBuilder {
+
+  /**
+   * Creates a base {@link TaskSpec} for the named vertex of {@code dag}, forwarding
+   * {@code numSplits} unchanged. The vertex must have exactly one input and one output.
+   */
+  public TaskSpec constructTaskSpec(DAG dag, String vertexName, int numSplits) {
+    Vertex v = dag.getVertex(vertexName);
+    ProcessorDescriptor processor = v.getProcessorDescriptor();
+    List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> rootInputs =
+        v.getInputs();
+    List<RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> leafOutputs =
+        v.getOutputs();
+
+    // TODO RSHACK - for now these must be of size 1.
+    Preconditions.checkState(rootInputs.size() == 1);
+    Preconditions.checkState(leafOutputs.size() == 1);
+
+    List<InputSpec> inSpecs = new ArrayList<>();
+    for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : rootInputs) {
+      inSpecs.add(new InputSpec(input.getName(), input.getIODescriptor(), 1));
+    }
+    List<OutputSpec> outSpecs = new ArrayList<>();
+    for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output : leafOutputs) {
+      outSpecs.add(new OutputSpec(output.getName(), output.getIODescriptor(), 1));
+    }
+    return TaskSpec.createBaseTaskSpec(dag.getName(), vertexName, numSplits, processor,
+        inSpecs, outSpecs, null);
+  }
+
+}