You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/06 17:24:53 UTC

[03/39] hive git commit: HIVE-13140. Wire the client to submit execution fragments. (Gunther Hagleitner, Siddharth Seth and Vikram Dixit K)

HIVE-13140. Wire the client to submit execution fragments. (Gunther
Hagleitner, Siddharth Seth and Vikram Dixit K)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2e042cc1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2e042cc1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2e042cc1

Branch: refs/heads/master
Commit: 2e042cc159c0e7e044297fc4d6b177a8841eb7fd
Parents: f272ace
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Feb 23 23:56:38 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Feb 23 23:56:38 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hive/jdbc/LlapInputFormat.java   |   7 +-
 .../hadoop/hive/llap/LlapInputFormat.java       | 198 ++++++++++++
 .../ext/LlapTaskUmbilicalExternalClient.java    |  23 +-
 .../apache/hadoop/hive/llap/LlapInputSplit.java |  77 ++---
 .../apache/hadoop/hive/llap/SubmitWorkInfo.java |  42 ++-
 .../ql/udf/generic/GenericUDFGetSplits.java     | 317 +++++++++----------
 .../org/apache/tez/dag/api/TaskSpecBuilder.java |  27 +-
 7 files changed, 467 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
index c38dd82..e662414 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
@@ -130,8 +130,9 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
 
   @Override
   public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
-    LlapInputSplit llapSplit = (LlapInputSplit)split;
-    return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter);
+    try {
+      return ((InputFormat)Class.forName("org.apache.hadoop.hive.llap.LlapInputFormat").newInstance()).getRecordReader(split, job, reporter);
+    } catch (Exception e) { throw new IOException(e); }
   }
 
   @Override
@@ -160,7 +161,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
       ResultSet res = stmt.executeQuery(sql);
       while (res.next()) {
         // deserialize split
-        DataInput in = new DataInputStream(new ByteArrayInputStream(res.getBytes(3)));
+        DataInput in = new DataInputStream(res.getBinaryStream(3));
         InputSplit is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance(); // todo setAccessible on ctor
         is.readFields(in);
         ins.add(is);

http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
new file mode 100644
index 0000000..cf13c1e
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
+import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class);
+
+
+  public LlapInputFormat() {
+  }
+
+  /*
+   * This proxy record reader has the duty of establishing a connected socket with LLAP, then fire
+   * off the work in the split to LLAP and finally return the connected socket back in an
+   * LlapRecordReader. The LlapRecordReader class reads the results from the socket.
+   */
+  @Override
+  public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job,
+                                                       Reporter reporter) throws IOException {
+
+    LlapInputSplit llapSplit = (LlapInputSplit) split;
+    SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());
+
+    int llapSubmitPort = HiveConf.getIntVar(job, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT);
+
+    LOG.info("ZZZ: DBG: Starting LlapTaskUmbilicalExternalClient");
+
+    LlapTaskUmbilicalExternalClient llapClient =
+        new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
+            submitWorkInfo.getToken());
+    llapClient.init(job);
+    llapClient.start();
+
+    LOG.info("ZZZ: DBG: Crated LlapClient");
+    // TODO KKK Shutdown the llap client.
+
+    SubmitWorkRequestProto submitWorkRequestProto =
+        constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(),
+            llapClient.getAddress(), submitWorkInfo.getToken());
+
+    LOG.info("ZZZ: DBG: Created submitWorkRequest for: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString());
+
+    TezEvent tezEvent = new TezEvent();
+    DataInputBuffer dib = new DataInputBuffer();
+    dib.reset(llapSplit.getFragmentBytes(), 0, llapSplit.getFragmentBytes().length);
+    tezEvent.readFields(dib);
+    List<TezEvent> tezEventList = Lists.newArrayList();
+    tezEventList.add(tezEvent);
+
+    // this is just the portion that sets up the io to receive data
+    String host = split.getLocations()[0];
+
+    llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort, tezEventList);
+
+    String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum();
+
+    HiveConf conf = new HiveConf();
+    Socket socket = new Socket(host,
+        conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT));
+
+    LOG.debug("Socket connected");
+
+    socket.getOutputStream().write(id.getBytes());
+    socket.getOutputStream().write(0);
+    socket.getOutputStream().flush();
+
+    LOG.debug("Registered id: " + id);
+
+    return new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    throw new IOException("These are not the splits you are looking for.");
+  }
+
+  private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
+                                                                 int taskNum,
+                                                                 InetSocketAddress address,
+                                                                 Token<JobTokenIdentifier> token) throws
+      IOException {
+    TaskSpec taskSpec = submitWorkInfo.getTaskSpec();
+    ApplicationId appId = submitWorkInfo.getFakeAppId();
+
+    SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder();
+    // This works, assuming the executor is running within YARN.
+    LOG.info("DBG: Setting user in submitWorkRequest to: " +
+        System.getenv(ApplicationConstants.Environment.USER.name()));
+    builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
+    builder.setApplicationIdString(appId.toString());
+    builder.setAppAttemptNumber(0);
+    builder.setTokenIdentifier(appId.toString());
+
+    ContainerId containerId =
+        ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum);
+    builder.setContainerIdString(containerId.toString());
+
+
+    builder.setAmHost(address.getHostName());
+    builder.setAmPort(address.getPort());
+    Credentials taskCredentials = new Credentials();
+    // Credentials can change across DAGs. Ideally construct only once per DAG.
+    // TODO Figure out where credentials will come from. Normally Hive sets up
+    // URLs on the tez dag, for which Tez acquires credentials.
+
+//    taskCredentials.addAll(getContext().getCredentials());
+
+//    Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
+//        taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
+//    ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
+//    if (credentialsBinary == null) {
+//      credentialsBinary = serializeCredentials(getContext().getCredentials());
+//      credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
+//    } else {
+//      credentialsBinary = credentialsBinary.duplicate();
+//    }
+//    builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+    Credentials credentials = new Credentials();
+    TokenCache.setSessionToken(token, credentials);
+    ByteBuffer credentialsBinary = serializeCredentials(credentials);
+    builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+
+
+    builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec));
+
+    FragmentRuntimeInfo.Builder runtimeInfo = FragmentRuntimeInfo.newBuilder();
+    runtimeInfo.setCurrentAttemptStartTime(System.currentTimeMillis());
+    runtimeInfo.setWithinDagPriority(0);
+    runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime());
+    runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime());
+    runtimeInfo.setNumSelfAndUpstreamTasks(taskSpec.getVertexParallelism());
+    runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0);
+
+
+    builder.setUsingTezAm(false);
+    builder.setFragmentRuntimeInfo(runtimeInfo.build());
+    return builder.build();
+  }
+
+  private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
+    Credentials containerCredentials = new Credentials();
+    containerCredentials.addAll(credentials);
+    DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+    containerCredentials.writeTokenStorageToStream(containerTokens_dob);
+    return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index ecc032d..4305682 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -1,6 +1,8 @@
 package org.apache.hadoop.hive.llap.ext;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -55,6 +57,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
     this.sessionToken = sessionToken;
     // TODO. No support for the LLAP token yet. Add support for configurable threads, however 1 should always be enough.
     this.communicator = new LlapProtocolClientProxy(1, conf, null);
+    this.communicator.init(conf);
   }
 
   @Override
@@ -62,6 +65,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
     int numHandlers = HiveConf.getIntVar(conf,
         HiveConf.ConfVars.LLAP_TMP_EXT_CLIENT_NUM_SERVER_HANDLERS);
     llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilical, numHandlers, tokenIdentifier, sessionToken);
+    communicator.start();
   }
 
   @Override
@@ -72,24 +76,31 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
     }
   }
 
+  public InetSocketAddress getAddress() {
+    return llapTaskUmbilicalServer.getAddress();
+  }
+
 
   /**
    * Submit the work for actual execution. This should always have the usingTezAm flag disabled
    * @param submitWorkRequestProto
    */
-  public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort) {
+  public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort, List<TezEvent> tezEvents) {
     Preconditions.checkArgument(submitWorkRequestProto.getUsingTezAm() == false);
 
-    // Store the actual event first. To be returned on the first heartbeat.
-    Event mrInputEvent = null;
-    // Construct a TezEvent out of this, to send it out on the next heaertbeat
 
+    LOG.warn("ZZZ: DBG: " + " Submitting fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " on host: " + llapHost + ", port=" + llapPort);
+//    LOG.info("ZZZ: DBG: " + " Complete SubmitWorkRequest: " + submitWorkRequestProto);
 //    submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()
 
+    LOG.info("ZZZ: DBG: Received {} events for {}", tezEvents.size(), submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString());
+    // Register the pending events to be sent for this spec.
+    pendingEvents.putIfAbsent(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), tezEvents);
 
     // Send out the actual SubmitWorkRequest
     communicator.sendSubmitWork(submitWorkRequestProto, llapHost, llapPort,
         new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto>() {
+
           @Override
           public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) {
             if (response.hasSubmissionState()) {
@@ -110,6 +121,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
 
 
 
+
 //    // TODO Also send out information saying that the fragment is finishable - if that is not already included in the main fragment.
 //    // This entire call is only required if we're doing more than scans. MRInput has no dependencies and is always finishable
 //    QueryIdentifierProto queryIdentifier = QueryIdentifierProto
@@ -157,6 +169,9 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
       LOG.info("ZZZ: DBG: Received heartbeat from taskAttemptId: " + taskAttemptId.toString());
 
       List<TezEvent> tezEvents = pendingEvents.remove(taskAttemptId.toString());
+      if (tezEvents == null) {
+        tezEvents = Collections.emptyList();
+      }
 
       response.setLastRequestId(request.getRequestId());
       // Irrelevant from eventIds. This can be tracked in the AM itself, instead of polluting the task.

http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
index 4249a16..d26a579 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
@@ -23,44 +23,26 @@ import java.io.IOException;
 import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
 import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.AutoExpandingBuffer;
-import org.apache.thrift.transport.AutoExpandingBufferWriteTransport;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TSerializer;
 
 public class LlapInputSplit implements InputSplitWithLocationInfo {
 
+  int splitNum;
   byte[] planBytes;
   byte[] fragmentBytes;
   SplitLocationInfo[] locations;
   Schema schema;
 
-
-  // // Static
-  // ContainerIdString
-  // DagName
-  // VertexName
-  // FragmentNumber
-  // AttemptNumber - always 0
-  // FragmentIdentifierString - taskAttemptId
-
-  // ProcessorDescsriptor
-  // InputSpec
-  // OutputSpec
-
-  // Tokens
-
-  // // Dynamic
-  //
-
   public LlapInputSplit() {
   }
 
-  public LlapInputSplit(byte[] planBytes, byte[] fragmentBytes, SplitLocationInfo[] locations, Schema schema) {
+  public LlapInputSplit(int splitNum, byte[] planBytes, byte[] fragmentBytes, SplitLocationInfo[] locations, Schema schema) {
     this.planBytes = planBytes;
     this.fragmentBytes = fragmentBytes;
     this.locations = locations;
     this.schema = schema;
+    this.splitNum = splitNum;
   }
 
   public Schema getSchema() {
@@ -81,8 +63,23 @@ public class LlapInputSplit implements InputSplitWithLocationInfo {
     return locs;
   }
 
+  public int getSplitNum() {
+    return splitNum;
+  }
+
+  public byte[] getPlanBytes() {
+    return planBytes;
+  }
+
+  public byte[] getFragmentBytes() {
+    return fragmentBytes;
+  }
+
+
+
   @Override
   public void write(DataOutput out) throws IOException {
+    out.writeInt(splitNum);
     out.writeInt(planBytes.length);
     out.write(planBytes);
 
@@ -97,20 +94,24 @@ public class LlapInputSplit implements InputSplitWithLocationInfo {
     byte[] binarySchema;
 
     try {
-      AutoExpandingBufferWriteTransport transport = new AutoExpandingBufferWriteTransport(1024, 2d);
-      TProtocol protocol = new TBinaryProtocol(transport);
-      schema.write(protocol);
-      binarySchema = transport.getBuf().array();
+      TSerializer serializer = new TSerializer();
+      byte[] serialzied = serializer.serialize(schema);
+      out.writeInt(serialzied.length);
+      out.write(serialzied);
+//      AutoExpandingBufferWriteTransport transport = new AutoExpandingBufferWriteTransport(1024, 2d);
+//      TProtocol protocol = new TBinaryProtocol(transport);
+//      schema.write(protocol);
+//      binarySchema = transport.getBuf().array();
     } catch (Exception e) {
       throw new IOException(e);
     }
 
-    out.writeInt(binarySchema.length);
-    out.write(binarySchema);
+
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
+    splitNum = in.readInt();
     int length = in.readInt();
     planBytes = new byte[length];
     in.readFully(planBytes);
@@ -129,14 +130,18 @@ public class LlapInputSplit implements InputSplitWithLocationInfo {
     length = in.readInt();
 
     try {
-      AutoExpandingBufferWriteTransport transport =
-          new AutoExpandingBufferWriteTransport(length, 2d);
-      AutoExpandingBuffer buf = transport.getBuf();
-      in.readFully(buf.array(), 0, length);
-
-      TProtocol protocol = new TBinaryProtocol(transport);
+      byte[] schemaBytes = new byte[length];
+      in.readFully(schemaBytes);
+      TDeserializer tDeserializer = new TDeserializer();
       schema = new Schema();
-      schema.read(protocol);
+      tDeserializer.deserialize(schema, schemaBytes);
+//      AutoExpandingBufferReadTransport transport = new AutoExpandingBufferReadTransport(length, 2d);
+//      AutoExpandingBuffer buf = transport.getBuf();
+//      in.readFully(buf.array(), 0, length);
+//
+//      TProtocol protocol = new TBinaryProtocol(transport);
+//      schema = new Schema();
+//      schema.read(protocol);
     } catch (Exception e) {
       throw new IOException(e);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java b/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
index a9a3738..83149ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
@@ -6,18 +6,29 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 
 public class SubmitWorkInfo implements Writable {
 
   private TaskSpec taskSpec;
   private ApplicationId fakeAppId;
+  private long creationTime;
 
-  public SubmitWorkInfo(TaskSpec taskSpec, ApplicationId fakeAppId) {
+  // This is used to communicate over the LlapUmbilicalProtocol. Not related to tokens used to
+  // talk to LLAP daemons itself via the securit work.
+  private Token<JobTokenIdentifier> token;
+
+  public SubmitWorkInfo(TaskSpec taskSpec, ApplicationId fakeAppId, long creationTime) {
     this.taskSpec = taskSpec;
     this.fakeAppId = fakeAppId;
+    this.token = createJobToken();
+    this.creationTime = creationTime;
   }
 
   // Empty constructor for writable etc.
@@ -32,11 +43,25 @@ public class SubmitWorkInfo implements Writable {
     return fakeAppId;
   }
 
+  public String getTokenIdentifier() {
+    return fakeAppId.toString();
+  }
+
+  public Token<JobTokenIdentifier> getToken() {
+    return token;
+  }
+
+  public long getCreationTime() {
+    return creationTime;
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     taskSpec.write(out);
     out.writeLong(fakeAppId.getClusterTimestamp());
     out.writeInt(fakeAppId.getId());
+    token.write(out);
+    out.writeLong(creationTime);
   }
 
   @Override
@@ -46,6 +71,9 @@ public class SubmitWorkInfo implements Writable {
     long appIdTs = in.readLong();
     int appIdId = in.readInt();
     fakeAppId = ApplicationId.newInstance(appIdTs, appIdId);
+    token = new Token<>();
+    token.readFields(in);
+    creationTime = in.readLong();
   }
 
   public static byte[] toBytes(SubmitWorkInfo submitWorkInfo) throws IOException {
@@ -54,7 +82,7 @@ public class SubmitWorkInfo implements Writable {
     return dob.getData();
   }
 
-  public SubmitWorkInfo fromBytes(byte[] submitWorkInfoBytes) throws IOException {
+  public static SubmitWorkInfo fromBytes(byte[] submitWorkInfoBytes) throws IOException {
     DataInputBuffer dib = new DataInputBuffer();
     dib.reset(submitWorkInfoBytes, 0, submitWorkInfoBytes.length);
     SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo();
@@ -62,4 +90,14 @@ public class SubmitWorkInfo implements Writable {
     return submitWorkInfo;
   }
 
+
+  private Token<JobTokenIdentifier> createJobToken() {
+    String tokenIdentifier = fakeAppId.toString();
+    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
+        tokenIdentifier));
+    Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
+        new JobTokenSecretManager());
+    sessionToken.setService(identifier.getJobId());
+    return sessionToken;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
index 9c7e1f2..9fa4aa8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
@@ -18,131 +18,79 @@
 
 package org.apache.hadoop.hive.ql.udf.generic;
 
-import org.apache.hadoop.hive.llap.LlapInputSplit;
-import org.apache.hadoop.hive.llap.SubmitWorkInfo;
-
+import javax.security.auth.login.LoginException;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-
-import javax.security.auth.login.LoginException;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.ArrayList;
 import java.util.UUID;
-import java.io.Serializable;
-import java.io.IOException;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.DataOutput;
 
 import com.esotericsoftware.kryo.Kryo;
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStream;
-import java.io.InputStream;
-
-import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TaskSpecBuilder;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.hive.ql.udf.UDFType;
+import com.google.common.base.Preconditions;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.llap.LlapInputSplit;
+import org.apache.hadoop.hive.llap.LlapOutputFormat;
+import org.apache.hadoop.hive.llap.SubmitWorkInfo;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.tez.TezTask;
-import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.MapredContext;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.llap.LlapInputFormat;
-import org.apache.hadoop.hive.llap.LlapOutputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.udf.UDFType;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.metastore.api.Schema;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.io.FileNotFoundException;
-import java.util.UUID;
-
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.runtime.api.Event;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
-import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.metastore.api.Schema;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
-import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+import org.apache.tez.dag.api.TaskSpecBuilder;
+import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.InputInitializer;
-import org.apache.tez.runtime.api.InputInitializerContext;
-import org.apache.tez.runtime.api.InputSpecUpdate;
 import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * GenericUDFGetSplits.
@@ -155,6 +103,8 @@ public class GenericUDFGetSplits extends GenericUDF {
 
   private static final Logger LOG = LoggerFactory.getLogger(GenericUDFGetSplits.class);
 
+  private static final String LLAP_INTERNAL_INPUT_FORMAT_NAME = "org.apache.hadoop.hive.llap.LlapInputFormat";
+
   private transient StringObjectInspector stringOI;
   private transient IntObjectInspector intOI;
   private final ArrayList<Object> retArray = new ArrayList<Object>();
@@ -190,13 +140,13 @@ public class GenericUDFGetSplits extends GenericUDF {
     } else if (!(arguments[0] instanceof StringObjectInspector)) {
       LOG.error("Got "+arguments[0].getTypeName()+" instead of string.");
       throw new UDFArgumentTypeException(0, "\""
-	  + "string\" is expected at function GET_SPLITS, " + "but \""
-	  + arguments[0].getTypeName() + "\" is found");
+          + "string\" is expected at function GET_SPLITS, " + "but \""
+          + arguments[0].getTypeName() + "\" is found");
     } else if (!(arguments[1] instanceof IntObjectInspector)) {
       LOG.error("Got "+arguments[1].getTypeName()+" instead of int.");
       throw new UDFArgumentTypeException(1, "\""
-	  + "int\" is expected at function GET_SPLITS, " + "but \""
-	  + arguments[1].getTypeName() + "\" is found");
+          + "int\" is expected at function GET_SPLITS, " + "but \""
+          + arguments[1].getTypeName() + "\" is found");
     }
 
     stringOI = (StringObjectInspector) arguments[0];
@@ -204,9 +154,9 @@ public class GenericUDFGetSplits extends GenericUDF {
 
     List<String> names = Arrays.asList("if_class","split_class","split");
     List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(
-								    PrimitiveObjectInspectorFactory.javaStringObjectInspector,
-								    PrimitiveObjectInspectorFactory.javaStringObjectInspector,
-								    PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
+                                                                    PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                                                                    PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+                                                                    PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
     ObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs);
     ObjectInspector listOI = ObjectInspectorFactory.getStandardListObjectInspector(outputOI);
     bos = new ByteArrayOutputStream(1024);
@@ -233,80 +183,85 @@ public class GenericUDFGetSplits extends GenericUDF {
       throw new HiveException("Need configuration");
     }
 
-    LOG.info("setting fetch.task.conversion to none and query file format to \""+LlapOutputFormat.class.toString()+"\"");
-    HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none");
-    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, LlapOutputFormat.class.toString());
+    String fetchTaskConversion = HiveConf.getVar(conf, ConfVars.HIVEFETCHTASKCONVERSION);
+    String queryResultFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
 
-    cpr = driver.compileAndRespond(query);
-    if(cpr.getResponseCode() != 0) {
-      throw new HiveException("Failed to compile query: "+cpr.getException());
-    }
+    try {
+      LOG.info("setting fetch.task.conversion to none and query file format to \""+LlapOutputFormat.class.getName()+"\"");
+      HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none");
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, LlapOutputFormat.class.getName());
 
-    QueryPlan plan = driver.getPlan();
-    List<Task<?>> roots = plan.getRootTasks();
-    Schema schema = plan.getResultSchema();
+      cpr = driver.compileAndRespond(query);
+      if(cpr.getResponseCode() != 0) {
+        throw new HiveException("Failed to compile query: "+cpr.getException());
+      }
 
-    if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
-      throw new HiveException("Was expecting a single TezTask.");
-    }
+      QueryPlan plan = driver.getPlan();
+      List<Task<?>> roots = plan.getRootTasks();
+      Schema schema = plan.getResultSchema();
 
-    Path data = null;
-    String ifc = null;
+      if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
+        throw new HiveException("Was expecting a single TezTask.");
+      }
 
-    TezWork tezWork = ((TezTask)roots.get(0)).getWork();
+      Path data = null;
 
-    if (tezWork.getAllWork().size() != 1) {
+      TezWork tezWork = ((TezTask)roots.get(0)).getWork();
 
-      String tableName = "table_"+UUID.randomUUID().toString().replaceAll("[^A-Za-z0-9 ]", "");
+      if (tezWork.getAllWork().size() != 1) {
 
-      String ctas = "create temporary table "+tableName+" as "+query;
-      LOG.info("CTAS: "+ctas);
+        String tableName = "table_"+UUID.randomUUID().toString().replaceAll("[^A-Za-z0-9 ]", "");
 
-      try {
-        cpr = driver.run(ctas, false);
-      } catch(CommandNeedRetryException e) {
-        throw new HiveException(e);
-      }
+        String ctas = "create temporary table "+tableName+" as "+query;
+        LOG.info("CTAS: "+ctas);
 
-      if(cpr.getResponseCode() != 0) {
-        throw new HiveException("Failed to create temp table: " + cpr.getException());
-      }
+        try {
+          cpr = driver.run(ctas, false);
+        } catch(CommandNeedRetryException e) {
+          throw new HiveException(e);
+        }
 
-      query = "select * from " + tableName;
-      cpr = driver.compileAndRespond(query);
-      if(cpr.getResponseCode() != 0) {
-        throw new HiveException("Failed to create temp table: "+cpr.getException());
-      }
+        if(cpr.getResponseCode() != 0) {
+          throw new HiveException("Failed to create temp table: " + cpr.getException());
+        }
 
-      plan = driver.getPlan();
-      roots = plan.getRootTasks();
-      schema = plan.getResultSchema();
+        query = "select * from " + tableName;
+        cpr = driver.compileAndRespond(query);
+        if(cpr.getResponseCode() != 0) {
+          throw new HiveException("Failed to create temp table: "+cpr.getException());
+        }
 
-      if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
-        throw new HiveException("Was expecting a single TezTask.");
-      }
+        plan = driver.getPlan();
+        roots = plan.getRootTasks();
+        schema = plan.getResultSchema();
 
-      tezWork = ((TezTask)roots.get(0)).getWork();
-    }
+        if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
+          throw new HiveException("Was expecting a single TezTask.");
+        }
 
-    MapWork w = (MapWork)tezWork.getAllWork().get(0);
-    ifc = LlapInputFormat.class.toString();
+        tezWork = ((TezTask)roots.get(0)).getWork();
+      }
 
-    try {
-      for (InputSplit s: getSplits(jc, num, tezWork, schema)) {
-        Object[] os = new Object[3];
-        os[0] = ifc;
-        os[1] = s.getClass().toString();
-        bos.reset();
-        s.write(dos);
-        byte[] frozen = bos.toByteArray();
-        os[2] = frozen;
-        retArray.add(os);
+      MapWork w = (MapWork)tezWork.getAllWork().get(0);
+
+      try {
+        for (InputSplit s: getSplits(jc, num, tezWork, schema)) {
+          Object[] os = new Object[3];
+          os[0] = LLAP_INTERNAL_INPUT_FORMAT_NAME;
+          os[1] = s.getClass().getName();
+          bos.reset();
+          s.write(dos);
+          byte[] frozen = bos.toByteArray();
+          os[2] = frozen;
+          retArray.add(os);
+        }
+      } catch(Exception e) {
+        throw new HiveException(e);
       }
-    } catch(Exception e) {
-      throw new HiveException(e);
+    } finally {
+      HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, fetchTaskConversion);
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, queryResultFormat);
     }
-
     return retArray;
   }
 
@@ -332,6 +287,7 @@ public class GenericUDFGetSplits extends GenericUDF {
       dag.addVertex(wx);
       utils.addCredentials(mapWork, dag);
 
+
       // we have the dag now proceed to get the splits:
       HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null);
       Preconditions.checkState(HiveConf.getBoolVar(wxConf,
@@ -342,8 +298,8 @@ public class GenericUDFGetSplits extends GenericUDF {
       List<Event> eventList = splitGenerator.initialize();
 
       // hack - just serializing with kryo for now. This needs to be done properly
-      InputSplit[] result = new InputSplit[eventList.size()];
-      ByteArrayOutputStream bos = new ByteArrayOutputStream(10240);
+      InputSplit[] result = new InputSplit[eventList.size() - 1];
+      DataOutputBuffer dob = new DataOutputBuffer();
 
       InputConfigureVertexTasksEvent configureEvent = (InputConfigureVertexTasksEvent) eventList.get(0);
 
@@ -351,11 +307,25 @@ public class GenericUDFGetSplits extends GenericUDF {
 
       Preconditions.checkState(hints.size() == eventList.size() -1);
 
+      LOG.error("DBG: NumEvents=" + eventList.size());
+      LOG.error("DBG: NumSplits=" + result.length);
+
+      ApplicationId fakeApplicationId = ApplicationId.newInstance(Math.abs(new Random().nextInt()), 0);
+      TaskSpec taskSpec =
+          new TaskSpecBuilder().constructTaskSpec(dag, vertexName, eventList.size() - 1, fakeApplicationId);
+
+      SubmitWorkInfo submitWorkInfo =
+          new SubmitWorkInfo(taskSpec, fakeApplicationId, System.currentTimeMillis());
+      EventMetaData sourceMetaData =
+          new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, vertexName,
+              "NULL_VERTEX", null);
+      EventMetaData destinationMetaInfo = new TaskSpecBuilder().getDestingationMetaData(wx);
+
       LOG.info("DBG: Number of splits: " + (eventList.size() - 1));
-      for (int i = 1 ; i < eventList.size() ; i++) {
+      for (int i = 0; i < eventList.size() - 1; i++) {
         // Creating the TezEvent here itself, since it's easy to serialize.
-        Event event = eventList.get(i);
-        TaskLocationHint hint = hints.get(i-1);
+        Event event = eventList.get(i + 1);
+        TaskLocationHint hint = hints.get(i);
         Set<String> hosts = hint.getHosts();
         LOG.info("DBG: Using locations: " + hosts.toString());
         if (hosts.size() != 1) {
@@ -367,18 +337,17 @@ public class GenericUDFGetSplits extends GenericUDF {
         for (String host : hosts) {
           locations[j++] = new SplitLocationInfo(host, false);
         }
+        TezEvent tezEvent = new TezEvent(event, sourceMetaData, System.currentTimeMillis());
+        tezEvent.setDestinationInfo(destinationMetaInfo);
 
         bos.reset();
-        Kryo kryo = SerializationUtilities.borrowKryo();
-        SerializationUtilities.serializeObjectByKryo(kryo, event, bos);
-        SerializationUtilities.releaseKryo(kryo);
+        dob.reset();
+        tezEvent.write(dob);
 
-        TaskSpec taskSpec = new TaskSpecBuilder().constructTaskSpec(dag, vertexName, eventList.size() - 1);
-        ApplicationId fakeApplicationId = ApplicationId.newInstance(new Random().nextInt(), 0);
-        SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(taskSpec, fakeApplicationId);
         byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo);
 
-        result[i-1] = new LlapInputSplit(submitWorkBytes, bos.toByteArray(), locations, schema);
+        result[i] =
+            new LlapInputSplit(i, submitWorkBytes, dob.getData(), locations, schema);
       }
       return result;
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java b/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
index d0c7c5a..5cabb6a 100644
--- a/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
+++ b/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
@@ -4,6 +4,12 @@ import java.util.ArrayList;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -11,7 +17,7 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
 // Proxy class within the tez.api package to access package private methods.
 public class TaskSpecBuilder {
 
-  public TaskSpec constructTaskSpec(DAG dag, String vertexName, int numSplits) {
+  public TaskSpec constructTaskSpec(DAG dag, String vertexName, int numSplits, ApplicationId appId) {
     Vertex vertex = dag.getVertex(vertexName);
     ProcessorDescriptor processorDescriptor = vertex.getProcessorDescriptor();
     List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs =
@@ -35,11 +41,22 @@ public class TaskSpecBuilder {
       outputSpecs.add(outputSpec);
     }
 
-    TaskSpec taskSpec = TaskSpec
-        .createBaseTaskSpec(dag.getName(), vertexName, numSplits, processorDescriptor, inputSpecs,
-            outputSpecs, null);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 0);
+    TezVertexID vertexId = TezVertexID.getInstance(dagId, 0);
+    TezTaskID taskId = TezTaskID.getInstance(vertexId, 0);
+    TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+    return new TaskSpec(taskAttemptId, dag.getName(), vertexName, numSplits, processorDescriptor, inputSpecs, outputSpecs, null);
+  }
 
-    return taskSpec;
+  public EventMetaData getDestingationMetaData(Vertex vertex) {
+    List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs =
+        vertex.getInputs();
+    Preconditions.checkState(inputs.size() == 1);
+    String inputName = inputs.get(0).getName();
+    EventMetaData destMeta =
+        new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, vertex.getName(),
+            inputName, null);
+    return destMeta;
   }
 
 }