You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/06 17:25:09 UTC

[19/39] hive git commit: HIVE-13529: Move around some of the classes created during llap branch work

HIVE-13529: Move around some of the classes created during llap branch work


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7b9096a9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7b9096a9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7b9096a9

Branch: refs/heads/master
Commit: 7b9096a922f9706909ba0e52d8188d182a79612f
Parents: fc7343d
Author: Jason Dere <jd...@hortonworks.com>
Authored: Fri Apr 15 16:45:32 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Fri Apr 15 16:45:32 2016 -0700

----------------------------------------------------------------------
 itests/hive-unit/pom.xml                        |   5 +
 .../hadoop/hive/jdbc/TestLlapInputSplit.java    | 100 -----
 .../hive/llap/ext/TestLlapInputSplit.java       | 100 +++++
 .../apache/hive/jdbc/TestJdbcWithMiniLlap.java  |   4 +-
 .../apache/hive/jdbc/LlapBaseInputFormat.java   | 135 ------
 .../src/java/org/apache/hive/jdbc/LlapDump.java | 164 --------
 .../org/apache/hive/jdbc/LlapInputSplit.java    |  73 ----
 .../apache/hive/jdbc/LlapRowInputFormat.java    |  34 --
 llap-client/pom.xml                             |  32 ++
 .../hadoop/hive/llap/LlapBaseRecordReader.java  | 205 +++++++++
 .../hadoop/hive/llap/LlapInputFormat.java       | 392 ++++++++++++++++++
 .../apache/hadoop/hive/llap/LlapInputSplit.java | 131 ++++++
 .../hadoop/hive/llap/LlapRowRecordReader.java   | 155 +++++++
 .../apache/hadoop/hive/llap/SubmitWorkInfo.java | 103 +++++
 .../ext/LlapTaskUmbilicalExternalClient.java    | 415 +++++++++++++++++++
 .../helpers/LlapTaskUmbilicalServer.java        |  57 +++
 .../hadoop/hive/llap/LlapRowRecordReader.java   | 155 -------
 llap-ext-client/pom.xml                         | 140 +++++++
 .../hadoop/hive/llap/LlapBaseInputFormat.java   | 136 ++++++
 .../org/apache/hadoop/hive/llap/LlapDump.java   | 165 ++++++++
 .../hadoop/hive/llap/LlapRowInputFormat.java    |  36 ++
 .../apache/hive/llap/ext/LlapInputSplit.java    |  73 ++++
 .../hadoop/hive/llap/LlapInputFormat.java       | 392 ------------------
 .../ext/LlapTaskUmbilicalExternalClient.java    | 415 -------------------
 .../helpers/LlapTaskUmbilicalServer.java        |  57 ---
 pom.xml                                         |   1 +
 .../hadoop/hive/llap/LlapBaseRecordReader.java  | 205 ---------
 .../apache/hadoop/hive/llap/LlapInputSplit.java | 131 ------
 .../apache/hadoop/hive/llap/SubmitWorkInfo.java | 103 -----
 29 files changed, 2148 insertions(+), 1966 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/itests/hive-unit/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index ae231de..b248673 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -61,6 +61,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
+      <artifactId>hive-llap-ext-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
       <artifactId>hive-llap-server</artifactId>
       <version>${project.version}</version>
       <type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java
deleted file mode 100644
index 366e326..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package org.apache.hive.jdbc;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.apache.hadoop.io.Text;
-
-import org.apache.hadoop.hive.llap.Schema;
-import org.apache.hadoop.hive.llap.FieldDesc;
-import org.apache.hadoop.hive.llap.TypeDesc;
-
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import static org.junit.Assert.*;
-
-public class TestLlapInputSplit {
-
-  @Test
-  public void testWritable() throws Exception {
-    int splitNum = 88;
-    byte[] planBytes = "0123456789987654321".getBytes();
-    byte[] fragmentBytes = "abcdefghijklmnopqrstuvwxyz".getBytes();
-    SplitLocationInfo[] locations = {
-        new SplitLocationInfo("location1", false),
-        new SplitLocationInfo("location2", false),
-    };
-
-    ArrayList<FieldDesc> colDescs = new ArrayList<FieldDesc>();
-    colDescs.add(new FieldDesc("col1", new TypeDesc(TypeDesc.Type.STRING)));
-    colDescs.add(new FieldDesc("col2", new TypeDesc(TypeDesc.Type.INT)));
-    Schema schema = new Schema(colDescs);
-
-    org.apache.hadoop.hive.llap.LlapInputSplit split1 = new org.apache.hadoop.hive.llap.LlapInputSplit(
-        splitNum,
-        planBytes,
-        fragmentBytes,
-        locations,
-        schema,
-        "hive");
-    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
-    DataOutputStream dataOut = new DataOutputStream(byteOutStream);
-    split1.write(dataOut);
-    ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
-    DataInputStream dataIn = new DataInputStream(byteInStream);
-    org.apache.hadoop.hive.llap.LlapInputSplit split2 = new org.apache.hadoop.hive.llap.LlapInputSplit();
-    split2.readFields(dataIn);
-
-    // Did we read all the data?
-    assertEquals(0, byteInStream.available());
-
-    checkLlapSplits(split1, split2);
-
-    // Try JDBC LlapInputSplits
-    org.apache.hive.jdbc.LlapInputSplit<Text> jdbcSplit1 =
-        new org.apache.hive.jdbc.LlapInputSplit<Text>(split1, "org.apache.hadoop.hive.llap.LlapInputFormat");
-    byteOutStream.reset();
-    jdbcSplit1.write(dataOut);
-    byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
-    dataIn = new DataInputStream(byteInStream);
-    org.apache.hive.jdbc.LlapInputSplit<Text> jdbcSplit2 = new org.apache.hive.jdbc.LlapInputSplit<Text>();
-    jdbcSplit2.readFields(dataIn);
-
-    assertEquals(0, byteInStream.available());
-
-    checkLlapSplits(
-        (org.apache.hadoop.hive.llap.LlapInputSplit) jdbcSplit1.getSplit(),
-        (org.apache.hadoop.hive.llap.LlapInputSplit) jdbcSplit2.getSplit());
-    assertEquals(jdbcSplit1.getInputFormat().getClass(), jdbcSplit2.getInputFormat().getClass());
-  }
-
-  static void checkLlapSplits(
-      org.apache.hadoop.hive.llap.LlapInputSplit split1,
-      org.apache.hadoop.hive.llap.LlapInputSplit split2) throws Exception {
-
-    assertEquals(split1.getSplitNum(), split2.getSplitNum());
-    assertArrayEquals(split1.getPlanBytes(), split2.getPlanBytes());
-    assertArrayEquals(split1.getFragmentBytes(), split2.getFragmentBytes());
-    SplitLocationInfo[] locationInfo1 = split1.getLocationInfo();
-    SplitLocationInfo[] locationInfo2 = split2.getLocationInfo();
-    for (int idx = 0; idx < locationInfo1.length; ++idx) {
-      assertEquals(locationInfo1[idx].getLocation(), locationInfo2[idx].getLocation());
-      assertEquals(locationInfo1[idx].isInMemory(), locationInfo2[idx].isInMemory());
-      assertEquals(locationInfo1[idx].isOnDisk(), locationInfo2[idx].isOnDisk());
-    }
-    assertArrayEquals(split1.getLocations(), split2.getLocations());
-    assertEquals(split1.getSchema().toString(), split2.getSchema().toString());
-    assertEquals(split1.getLlapUser(), split2.getLlapUser());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java
new file mode 100644
index 0000000..04da17e
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java
@@ -0,0 +1,100 @@
+package org.apache.hadoop.hive.llap.ext;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.io.Text;
+
+import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.TypeDesc;
+
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import static org.junit.Assert.*;
+
+public class TestLlapInputSplit {
+
+  @Test
+  public void testWritable() throws Exception {
+    int splitNum = 88;
+    byte[] planBytes = "0123456789987654321".getBytes();
+    byte[] fragmentBytes = "abcdefghijklmnopqrstuvwxyz".getBytes();
+    SplitLocationInfo[] locations = {
+        new SplitLocationInfo("location1", false),
+        new SplitLocationInfo("location2", false),
+    };
+
+    ArrayList<FieldDesc> colDescs = new ArrayList<FieldDesc>();
+    colDescs.add(new FieldDesc("col1", new TypeDesc(TypeDesc.Type.STRING)));
+    colDescs.add(new FieldDesc("col2", new TypeDesc(TypeDesc.Type.INT)));
+    Schema schema = new Schema(colDescs);
+
+    org.apache.hadoop.hive.llap.LlapInputSplit split1 = new org.apache.hadoop.hive.llap.LlapInputSplit(
+        splitNum,
+        planBytes,
+        fragmentBytes,
+        locations,
+        schema,
+        "hive");
+    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+    DataOutputStream dataOut = new DataOutputStream(byteOutStream);
+    split1.write(dataOut);
+    ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
+    DataInputStream dataIn = new DataInputStream(byteInStream);
+    org.apache.hadoop.hive.llap.LlapInputSplit split2 = new org.apache.hadoop.hive.llap.LlapInputSplit();
+    split2.readFields(dataIn);
+
+    // Did we read all the data?
+    assertEquals(0, byteInStream.available());
+
+    checkLlapSplits(split1, split2);
+
+    // Try the wrapper LlapInputSplit (org.apache.hive.llap.ext, formerly in the jdbc package)
+    org.apache.hive.llap.ext.LlapInputSplit<Text> jdbcSplit1 =
+        new org.apache.hive.llap.ext.LlapInputSplit<Text>(split1, "org.apache.hadoop.hive.llap.LlapInputFormat");
+    byteOutStream.reset();
+    jdbcSplit1.write(dataOut);
+    byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
+    dataIn = new DataInputStream(byteInStream);
+    org.apache.hive.llap.ext.LlapInputSplit<Text> jdbcSplit2 = new org.apache.hive.llap.ext.LlapInputSplit<Text>();
+    jdbcSplit2.readFields(dataIn);
+
+    assertEquals(0, byteInStream.available());
+
+    checkLlapSplits(
+        (org.apache.hadoop.hive.llap.LlapInputSplit) jdbcSplit1.getSplit(),
+        (org.apache.hadoop.hive.llap.LlapInputSplit) jdbcSplit2.getSplit());
+    assertEquals(jdbcSplit1.getInputFormat().getClass(), jdbcSplit2.getInputFormat().getClass());
+  }
+
+  static void checkLlapSplits(
+      org.apache.hadoop.hive.llap.LlapInputSplit split1,
+      org.apache.hadoop.hive.llap.LlapInputSplit split2) throws Exception {
+
+    assertEquals(split1.getSplitNum(), split2.getSplitNum());
+    assertArrayEquals(split1.getPlanBytes(), split2.getPlanBytes());
+    assertArrayEquals(split1.getFragmentBytes(), split2.getFragmentBytes());
+    SplitLocationInfo[] locationInfo1 = split1.getLocationInfo();
+    SplitLocationInfo[] locationInfo2 = split2.getLocationInfo();
+    for (int idx = 0; idx < locationInfo1.length; ++idx) {
+      assertEquals(locationInfo1[idx].getLocation(), locationInfo2[idx].getLocation());
+      assertEquals(locationInfo1[idx].isInMemory(), locationInfo2[idx].isInMemory());
+      assertEquals(locationInfo1[idx].isOnDisk(), locationInfo2[idx].isOnDisk());
+    }
+    assertArrayEquals(split1.getLocations(), split2.getLocations());
+    assertEquals(split1.getSchema().toString(), split2.getSchema().toString());
+    assertEquals(split1.getLlapUser(), split2.getLlapUser());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
index deeac2e..5b4ba49 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
@@ -68,8 +68,8 @@ import org.apache.hadoop.io.Text;
 
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
-import org.apache.hive.jdbc.LlapBaseInputFormat;
-import org.apache.hive.jdbc.LlapRowInputFormat;
+import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+import org.apache.hadoop.hive.llap.LlapRowInputFormat;
 
 import org.datanucleus.ClassLoaderResolver;
 import org.datanucleus.NucleusContext;

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java
deleted file mode 100644
index a0ddeaa..0000000
--- a/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hive.jdbc;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import java.sql.SQLException;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.sql.DriverManager;
-
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.DataInputStream;
-import java.io.ByteArrayInputStream;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.Progressable;
-
-import com.google.common.base.Preconditions;
-
-public class LlapBaseInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
-
-  private static String driverName = "org.apache.hive.jdbc.HiveDriver";
-  private String url;  // "jdbc:hive2://localhost:10000/default"
-  private String user; // "hive",
-  private String pwd;  // ""
-  private String query;
-
-  public static final String URL_KEY = "llap.if.hs2.connection";
-  public static final String QUERY_KEY = "llap.if.query";
-  public static final String USER_KEY = "llap.if.user";
-  public static final String PWD_KEY = "llap.if.pwd";
-
-  public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)";
-
-  private Connection con;
-  private Statement stmt;
-
-  public LlapBaseInputFormat(String url, String user, String pwd, String query) {
-    this.url = url;
-    this.user = user;
-    this.pwd = pwd;
-    this.query = query;
-  }
-
-  public LlapBaseInputFormat() {}
-
-
-  @Override
-  public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
-    LlapInputSplit llapSplit = (LlapInputSplit) split;
-    return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter);
-  }
-
-  @Override
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    List<InputSplit> ins = new ArrayList<InputSplit>();
-
-    if (url == null) url = job.get(URL_KEY);
-    if (query == null) query = job.get(QUERY_KEY);
-    if (user == null) user = job.get(USER_KEY);
-    if (pwd == null) pwd = job.get(PWD_KEY);
-
-    if (url == null || query == null) {
-      throw new IllegalStateException();
-    }
-
-    try {
-      Class.forName(driverName);
-    } catch (ClassNotFoundException e) {
-      throw new IOException(e);
-    }
-
-    try {
-      con = DriverManager.getConnection(url,user,pwd);
-      stmt = con.createStatement();
-      String sql = String.format(SPLIT_QUERY, query, numSplits);
-      ResultSet res = stmt.executeQuery(sql);
-      while (res.next()) {
-        // deserialize split
-        DataInput in = new DataInputStream(res.getBinaryStream(3));
-        InputSplitWithLocationInfo is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance();
-        is.readFields(in);
-        ins.add(new LlapInputSplit(is, res.getString(1)));
-      }
-
-      res.close();
-      stmt.close();
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-    return ins.toArray(new InputSplit[ins.size()]);
-  }
-
-  public void close() {
-    try {
-      con.close();
-    } catch (Exception e) {
-      // ignore
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java b/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
deleted file mode 100644
index 4c3c3ab..0000000
--- a/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.jdbc;
-
-import java.io.OutputStream;
-import java.io.InputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.FileInputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.io.RCFile.Reader;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.hive.llap.io.api.LlapProxy;
-import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
-import org.apache.hadoop.hive.llap.Schema;
-
-public class LlapDump {
-
-  private static final Logger LOG = LoggerFactory.getLogger(LlapDump.class);
-
-  private static String url = "jdbc:hive2://localhost:10000/default";
-  private static String user = "hive";
-  private static String pwd = "";
-  private static String query = "select * from test";
-  private static String numSplits = "1";
-
-  public static void main(String[] args) throws Exception {
-    Options opts = createOptions();
-    CommandLine cli = new GnuParser().parse(opts, args);
-
-    if (cli.hasOption('h')) {
-      HelpFormatter formatter = new HelpFormatter();
-      formatter.printHelp("orcfiledump", opts);
-      return;
-    }
-
-    if (cli.hasOption('l')) {
-      url = cli.getOptionValue("l");
-    }
-
-    if (cli.hasOption('u')) {
-      user = cli.getOptionValue("u");
-    }
-
-    if (cli.hasOption('p')) {
-      pwd = cli.getOptionValue("p");
-    }
-
-    if (cli.hasOption('n')) {
-      numSplits = cli.getOptionValue("n");
-    }
-
-    if (cli.getArgs().length > 0) {
-      query = cli.getArgs()[0];
-    }
-
-    System.out.println("url: "+url);
-    System.out.println("user: "+user);
-    System.out.println("query: "+query);
-
-    LlapBaseInputFormat format = new LlapBaseInputFormat(url, user, pwd, query);
-    JobConf job = new JobConf();
-
-    InputSplit[] splits = format.getSplits(job, Integer.parseInt(numSplits));
-
-    if (splits.length == 0) {
-      System.out.println("No splits returned - empty scan");
-      System.out.println("Results: ");
-    } else {
-      boolean first = true;
-
-      for (InputSplit s: splits) {
-        LOG.info("Processing input split s from " + Arrays.toString(s.getLocations()));
-        RecordReader<NullWritable, Text> reader = format.getRecordReader(s, job, null);
-
-        if (reader instanceof LlapBaseRecordReader && first) {
-          Schema schema = ((LlapBaseRecordReader)reader).getSchema();
-          System.out.println(""+schema);
-        }
-
-        if (first) {
-          System.out.println("Results: ");
-          System.out.println("");
-          first = false;
-        }
-
-        Text value = reader.createValue();
-        while (reader.next(NullWritable.get(), value)) {
-          System.out.println(value);
-        }
-      }
-      System.exit(0);
-    }
-  }
-
-  static Options createOptions() {
-    Options result = new Options();
-
-    result.addOption(OptionBuilder
-        .withLongOpt("location")
-        .withDescription("HS2 url")
-        .hasArg()
-        .create('l'));
-
-    result.addOption(OptionBuilder
-        .withLongOpt("user")
-        .withDescription("user name")
-        .hasArg()
-        .create('u'));
-
-    result.addOption(OptionBuilder
-        .withLongOpt("pwd")
-        .withDescription("password")
-        .hasArg()
-        .create('p'));
-
-    result.addOption(OptionBuilder
-        .withLongOpt("num")
-        .withDescription("number of splits")
-        .hasArg()
-        .create('n'));
-
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/jdbc/src/java/org/apache/hive/jdbc/LlapInputSplit.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapInputSplit.java b/jdbc/src/java/org/apache/hive/jdbc/LlapInputSplit.java
deleted file mode 100644
index 0f4fd4e..0000000
--- a/jdbc/src/java/org/apache/hive/jdbc/LlapInputSplit.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package org.apache.hive.jdbc;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-
-
-public class LlapInputSplit<V extends WritableComparable> implements InputSplitWithLocationInfo {
-  InputSplitWithLocationInfo nativeSplit;
-  String inputFormatClassName;
-
-  public LlapInputSplit() {
-  }
-
-  public LlapInputSplit(InputSplitWithLocationInfo nativeSplit, String inputFormatClassName) {
-    this.nativeSplit = nativeSplit;
-    this.inputFormatClassName = inputFormatClassName;
-  }
-
-  @Override
-  public long getLength() throws IOException {
-    return nativeSplit.getLength();
-  }
-
-  @Override
-  public String[] getLocations() throws IOException {
-    return nativeSplit.getLocations();
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeUTF(inputFormatClassName);
-    out.writeUTF(nativeSplit.getClass().getName());
-    nativeSplit.write(out);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    inputFormatClassName = in.readUTF();
-    String splitClass = in.readUTF();
-    try {
-      nativeSplit = (InputSplitWithLocationInfo)Class.forName(splitClass).newInstance();
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-    nativeSplit.readFields(in);
-  }
-
-  @Override
-  public SplitLocationInfo[] getLocationInfo() throws IOException {
-    return nativeSplit.getLocationInfo();
-  }
-
-  public InputSplit getSplit() {
-    return nativeSplit;
-  }
-
-  public InputFormat<NullWritable, V> getInputFormat() throws IOException {
-    try {
-      return (InputFormat<NullWritable, V>) Class.forName(inputFormatClassName)
-          .newInstance();
-    } catch(Exception e) {
-      throw new IOException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java
deleted file mode 100644
index 1cca66a..0000000
--- a/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.hive.jdbc;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
-import org.apache.hadoop.hive.llap.LlapRowRecordReader;
-import org.apache.hadoop.hive.llap.Row;
-import org.apache.hadoop.hive.llap.Schema;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-
-public class LlapRowInputFormat implements InputFormat<NullWritable, Row> {
-  LlapBaseInputFormat<Text> baseInputFormat = new LlapBaseInputFormat<Text>();
-
-  @Override
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    return baseInputFormat.getSplits(job, numSplits);
-  }
-
-  @Override
-  public RecordReader<NullWritable, Row> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
-      throws IOException {
-    LlapInputSplit<Text> llapSplit = (LlapInputSplit<Text>) split;
-    LlapBaseRecordReader<Text> reader = (LlapBaseRecordReader<Text>) baseInputFormat.getRecordReader(llapSplit, job, reporter);
-    return new LlapRowRecordReader(job, reader.getSchema(), reader);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/pom.xml
----------------------------------------------------------------------
diff --git a/llap-client/pom.xml b/llap-client/pom.xml
index 50c06a4..4a75bbb 100644
--- a/llap-client/pom.xml
+++ b/llap-client/pom.xml
@@ -109,6 +109,38 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
   <build>
     <sourceDirectory>${basedir}/src/java</sourceDirectory>

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
new file mode 100644
index 0000000..7073280
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.DataInputStream;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Record reader that populates values of type {@code V} by calling {@code readFields} on them
+ * against an input stream, and that reconciles the state of that stream with
+ * {@link ReaderEvent}s delivered from another thread through {@link #handleEvent(ReaderEvent)}.
+ *
+ * On end of input the reader waits for a {@code DONE} event. An {@code ERROR} event interrupts
+ * the recorded reader thread, which is the constructing thread until {@code next()} is called
+ * and the most recent caller of {@code next()} afterwards.
+ */
+public class LlapBaseRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapBaseRecordReader.class);
+
+  DataInputStream din;
+  Schema schema;
+  Class<V> clazz;
+
+
+  // Thread to interrupt when an error event arrives; access it through the synchronized accessors.
+  protected Thread readerThread = null;
+  // Events handed over by handleEvent() and consumed by next().
+  protected LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>();
+
+  /**
+   * @param in     stream the values are read from; wrapped in a {@link DataInputStream}
+   * @param schema schema exposed through {@link #getSchema()}
+   * @param clazz  value class instantiated by {@link #createValue()}
+   */
+  public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz) {
+    din = new DataInputStream(in);
+    this.schema = schema;
+    this.clazz = clazz;
+    this.readerThread = Thread.currentThread();
+  }
+
+  /** @return the schema supplied at construction time */
+  public Schema getSchema() {
+    return schema;
+  }
+
+  /** Closes the underlying input stream. */
+  @Override
+  public void close() throws IOException {
+    din.close();
+  }
+
+  /** Position is not tracked; always returns 0. */
+  @Override
+  public long getPos() { return 0; }
+
+  /** Progress is not tracked; always returns 0. */
+  @Override
+  public float getProgress() { return 0f; }
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  /**
+   * Instantiates a new value through the no-arg constructor of the value class.
+   *
+   * @return a new value, or {@code null} if the class could not be instantiated; the failure is
+   *         logged so that it is not lost
+   */
+  @Override
+  public V createValue() {
+    try {
+      return clazz.newInstance();
+    } catch (Exception e) {
+      // Keep the historical null return for callers, but do not hide the root cause.
+      LOG.error("Unable to instantiate value class " + clazz, e);
+      return null;
+    }
+  }
+
+  /**
+   * Reads the next value from the stream.
+   *
+   * @return {@code true} if a value was read, {@code false} once the stream ended and a
+   *         {@code DONE} event was received
+   * @throws IOException if reading fails, if the stream ends with an event other than
+   *         {@code DONE}, or if the read was interrupted because of a reader event
+   */
+  @Override
+  public boolean next(NullWritable key, V value) throws IOException {
+    try {
+      // Need a way to know what thread to interrupt, since this is a blocking thread.
+      setReaderThread(Thread.currentThread());
+
+      value.readFields(din);
+      return true;
+    } catch (EOFException eof) {
+      // End of input. There should be a reader event available, or coming soon, so okay to be blocking call.
+      ReaderEvent event = getReaderEvent();
+      switch (event.getEventType()) {
+        case DONE:
+          break;
+        default:
+          throw new IOException("Expected reader event with done status, but got "
+              + event.getEventType() + " with message " + event.getMessage());
+      }
+      return false;
+    } catch (IOException io) {
+      // Thread.interrupted() also clears the interrupt flag of the current thread.
+      if (Thread.interrupted()) {
+        // Either we were interrupted by one of:
+        // 1. handleEvent(), in which case there is a reader event waiting for us in the queue
+        // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
+        // Either way we should not try to block trying to read the reader events queue.
+        if (readerEvents.isEmpty()) {
+          // Case 2.
+          throw io;
+        } else {
+          // Case 1. Fail the reader, sending back the error we received from the reader event.
+          ReaderEvent event = getReaderEvent();
+          switch (event.getEventType()) {
+            case ERROR:
+              throw new IOException("Received reader event error: " + event.getMessage());
+            default:
+              throw new IOException("Got reader event type " + event.getEventType() + ", expected error event");
+          }
+        }
+      } else {
+        // If we weren't interrupted, just propagate the error
+        throw io;
+      }
+    }
+  }
+
+  /**
+   * Define success/error events which are passed to the reader from a different thread.
+   * The reader will check for these events on end of input and interruption of the reader thread.
+   */
+  public static class ReaderEvent {
+    public enum EventType {
+      DONE,
+      ERROR
+    }
+
+    protected final EventType eventType;
+    protected final String message;
+
+    protected ReaderEvent(EventType type, String message) {
+      this.eventType = type;
+      this.message = message;
+    }
+
+    /** @return a {@code DONE} event with an empty message */
+    public static ReaderEvent doneEvent() {
+      return new ReaderEvent(EventType.DONE, "");
+    }
+
+    /** @return an {@code ERROR} event carrying the given message */
+    public static ReaderEvent errorEvent(String message) {
+      return new ReaderEvent(EventType.ERROR, message);
+    }
+
+    public EventType getEventType() {
+      return eventType;
+    }
+
+    public String getMessage() {
+      return message;
+    }
+  }
+
+  /**
+   * Queues the event for the reader. An {@code ERROR} event additionally interrupts the
+   * recorded reader thread.
+   *
+   * @throws RuntimeException if no reader thread is recorded for an {@code ERROR} event, or if
+   *         the event type is not handled
+   */
+  public void handleEvent(ReaderEvent event) {
+    switch (event.getEventType()) {
+      case DONE:
+        // Reader will check for the event queue upon the end of the input stream - no need to interrupt.
+        readerEvents.add(event);
+        break;
+      case ERROR:
+        readerEvents.add(event);
+        // Read the thread once through the synchronized accessor so that the null check and the
+        // interrupt act on the same value.
+        Thread threadToInterrupt = getReaderThread();
+        if (threadToInterrupt == null) {
+          throw new RuntimeException("Reader thread is unexpectedly null, during ReaderEvent error " + event.getMessage());
+        }
+        // Reader is using a blocking socket .. interrupt it.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Interrupting reader thread due to reader event with error " + event.getMessage());
+        }
+        threadToInterrupt.interrupt();
+        break;
+      default:
+        throw new RuntimeException("Unhandled ReaderEvent type " + event.getEventType() + " with message " + event.getMessage());
+    }
+  }
+
+  /**
+   * Blocks until a reader event is available and removes it from the queue.
+   *
+   * @throws RuntimeException if the wait is interrupted; the interrupt status of the thread is
+   *         restored before the exception is thrown
+   */
+  protected ReaderEvent getReaderEvent() {
+    try {
+      return readerEvents.take();
+    } catch (InterruptedException ie) {
+      // Preserve the interrupt for code further up the stack.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while getting readerEvents, not expected", ie);
+    }
+  }
+
+  protected synchronized void setReaderThread(Thread readerThread) {
+    this.readerThread = readerThread;
+  }
+
+  protected synchronized Thread getReaderThread() {
+    return readerThread;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
new file mode 100644
index 0000000..0930d60
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+
+import org.apache.commons.collections4.ListUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
+import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+
+
+/**
+ * Input format whose record reader submits the work described by an {@link LlapInputSplit} to
+ * an LLAP daemon and reads the results back over a socket.
+ */
+public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class);
+
+  public LlapInputFormat() {
+  }
+
+  /*
+   * This proxy record reader has the duty of establishing a connected socket with LLAP, then fire
+   * off the work in the split to LLAP and finally return the connected socket back in an
+   * LlapBaseRecordReader. The LlapBaseRecordReader class reads the results from the socket.
+   */
+  @Override
+  public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job,
+      Reporter reporter) throws IOException {
+
+    LlapInputSplit llapSplit = (LlapInputSplit) split;
+
+    // Set conf to use LLAP user rather than current user for LLAP Zk registry.
+    HiveConf.setVar(job, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, llapSplit.getLlapUser());
+    SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());
+
+    ServiceInstance serviceInstance = getServiceInstance(job, llapSplit);
+    String host = serviceInstance.getHost();
+    int llapSubmitPort = serviceInstance.getRpcPort();
+
+    LOG.info("Found service instance for host " + host + " with rpc port " + llapSubmitPort
+        + " and outputformat port " + serviceInstance.getOutputFormatPort());
+
+    LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder =
+        new LlapRecordReaderTaskUmbilicalExternalResponder();
+    LlapTaskUmbilicalExternalClient llapClient =
+      new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
+          submitWorkInfo.getToken(), umbilicalResponder);
+    llapClient.init(job);
+    llapClient.start();
+
+    SubmitWorkRequestProto submitWorkRequestProto =
+      constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(),
+          llapClient.getAddress(), submitWorkInfo.getToken());
+
+    // The fragment bytes of the split hold a serialized TezEvent that is sent along with the work.
+    TezEvent tezEvent = new TezEvent();
+    DataInputBuffer dib = new DataInputBuffer();
+    dib.reset(llapSplit.getFragmentBytes(), 0, llapSplit.getFragmentBytes().length);
+    tezEvent.readFields(dib);
+    List<TezEvent> tezEventList = Lists.newArrayList();
+    tezEventList.add(tezEvent);
+
+    llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort, tezEventList);
+
+    String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum();
+
+    Socket socket = new Socket(host,
+        serviceInstance.getOutputFormatPort());
+
+    // From here on the socket must not leak if registering the id or building the reader fails.
+    boolean readerCreated = false;
+    try {
+      LOG.debug("Socket connected");
+
+      // Register with the output format service: the id followed by a zero byte.
+      socket.getOutputStream().write(id.getBytes());
+      socket.getOutputStream().write(0);
+      socket.getOutputStream().flush();
+
+      LOG.info("Registered id: " + id);
+
+      LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
+      umbilicalResponder.setRecordReader(recordReader);
+      readerCreated = true;
+      return recordReader;
+    } finally {
+      if (!readerCreated) {
+        closeQuietly(socket);
+      }
+    }
+  }
+
+  /** Closes the socket, logging rather than propagating any failure to do so. */
+  private static void closeQuietly(Socket socket) {
+    try {
+      socket.close();
+    } catch (IOException err) {
+      LOG.warn("Failed to close socket after record reader setup failure", err);
+    }
+  }
+
+  /** Not supported by this input format; always throws. */
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    throw new IOException("These are not the splits you are looking for.");
+  }
+
+  /**
+   * Looks up the service instance registered for the first location of the split.
+   *
+   * @throws IOException if the split has no locations or no live instance is registered for
+   *         the host
+   */
+  private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException {
+    String[] hosts = llapSplit.getLocations();
+    if (hosts == null || hosts.length == 0) {
+      throw new IOException("No locations specified for LLAP input split " + llapSplit.getSplitNum());
+    }
+    String host = hosts[0];
+
+    LlapRegistryService registryService = LlapRegistryService.getClient(job);
+    ServiceInstance serviceInstance = getServiceInstanceForHost(registryService, host);
+    if (serviceInstance == null) {
+      throw new IOException("No service instances found for " + host + " in registry");
+    }
+
+    return serviceInstance;
+  }
+
+  /**
+   * Searches the registry for a live instance on the given host, trying the host name, then the
+   * canonical host name, then the host address.
+   *
+   * @return the first live instance found, or {@code null} if none matched
+   */
+  private ServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException {
+    InetAddress address = InetAddress.getByName(host);
+    ServiceInstanceSet instanceSet = registryService.getInstances();
+
+    // The name used in the service registry may not match the host name we're using.
+    // Try hostname/canonical hostname/host address
+
+    String name = address.getHostName();
+    LOG.info("Searching service instance by hostname " + name);
+    ServiceInstance serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
+    if (serviceInstance != null) {
+      return serviceInstance;
+    }
+
+    name = address.getCanonicalHostName();
+    LOG.info("Searching service instance by canonical hostname " + name);
+    serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
+    if (serviceInstance != null) {
+      return serviceInstance;
+    }
+
+    name = address.getHostAddress();
+    LOG.info("Searching service instance by address " + name);
+    serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
+    if (serviceInstance != null) {
+      return serviceInstance;
+    }
+
+    return null;
+  }
+
+  /** @return the first instance in the set that reports itself alive, or {@code null} */
+  private ServiceInstance selectServiceInstance(Set<ServiceInstance> serviceInstances) {
+    if (serviceInstances == null || serviceInstances.isEmpty()) {
+      return null;
+    }
+
+    // Get the first live service instance
+    for (ServiceInstance serviceInstance : serviceInstances) {
+      if (serviceInstance.isAlive()) {
+        return serviceInstance;
+      }
+    }
+
+    LOG.info("No live service instances were found");
+    return null;
+  }
+
+  /**
+   * Builds the submit-work request for one task of the given work.
+   *
+   * @param submitWorkInfo source of the task spec, application id and creation time
+   * @param taskNum        number used as the container id within the application attempt
+   * @param address        address reported to the daemon as AM host and port
+   * @param token          token stored as the session token in the request credentials
+   */
+  private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
+      int taskNum,
+      InetSocketAddress address,
+      Token<JobTokenIdentifier> token) throws
+        IOException {
+    TaskSpec taskSpec = submitWorkInfo.getTaskSpec();
+    ApplicationId appId = submitWorkInfo.getFakeAppId();
+
+    SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder();
+    // This works, assuming the executor is running within YARN.
+    String user = System.getenv(ApplicationConstants.Environment.USER.name());
+    LOG.info("Setting user in submitWorkRequest to: " +
+        user);
+    builder.setUser(user);
+    builder.setApplicationIdString(appId.toString());
+    builder.setAppAttemptNumber(0);
+    builder.setTokenIdentifier(appId.toString());
+
+    ContainerId containerId =
+      ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum);
+    builder.setContainerIdString(containerId.toString());
+
+    builder.setAmHost(address.getHostName());
+    builder.setAmPort(address.getPort());
+    // Credentials can change across DAGs. Ideally construct only once per DAG.
+    // TODO Figure out where credentials will come from. Normally Hive sets up
+    // URLs on the tez dag, for which Tez acquires credentials.
+
+    //    taskCredentials.addAll(getContext().getCredentials());
+
+    //    Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
+    //        taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
+    //    ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
+    //    if (credentialsBinary == null) {
+    //      credentialsBinary = serializeCredentials(getContext().getCredentials());
+    //      credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
+    //    } else {
+    //      credentialsBinary = credentialsBinary.duplicate();
+    //    }
+    //    builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+    Credentials credentials = new Credentials();
+    TokenCache.setSessionToken(token, credentials);
+    ByteBuffer credentialsBinary = serializeCredentials(credentials);
+    builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+
+
+    builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec));
+
+    FragmentRuntimeInfo.Builder runtimeInfo = FragmentRuntimeInfo.newBuilder();
+    runtimeInfo.setCurrentAttemptStartTime(System.currentTimeMillis());
+    runtimeInfo.setWithinDagPriority(0);
+    runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime());
+    runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime());
+    runtimeInfo.setNumSelfAndUpstreamTasks(taskSpec.getVertexParallelism());
+    runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0);
+
+
+    builder.setUsingTezAm(false);
+    builder.setFragmentRuntimeInfo(runtimeInfo.build());
+    return builder.build();
+  }
+
+  /** Copies the credentials and serializes the copy's token storage into a byte buffer. */
+  private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
+    Credentials containerCredentials = new Credentials();
+    containerCredentials.addAll(credentials);
+    DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+    containerCredentials.writeTokenStorageToStream(containerTokens_dob);
+    return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength());
+  }
+
+  /**
+   * Translates umbilical callbacks into {@link ReaderEvent}s for a record reader. Events that
+   * arrive before a record reader is registered are queued and delivered on registration.
+   */
+  private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder {
+    protected LlapBaseRecordReader recordReader = null;
+    protected LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>();
+
+    public LlapRecordReaderTaskUmbilicalExternalResponder() {
+    }
+
+    @Override
+    public void submissionFailed(String fragmentId, Throwable throwable) {
+      try {
+        sendOrQueueEvent(ReaderEvent.errorEvent(
+            "Received submission failed event for fragment ID " + fragmentId));
+      } catch (Exception err) {
+        LOG.error("Error during heartbeat responder:", err);
+      }
+    }
+
+    /**
+     * Forwards task completion and failure events found in the heartbeat as done and error
+     * reader events; other event types are ignored or logged.
+     */
+    @Override
+    public void heartbeat(TezHeartbeatRequest request) {
+      List<TezEvent> inEvents = request.getEvents();
+      for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+        EventType eventType = tezEvent.getEventType();
+        try {
+          switch (eventType) {
+            case TASK_ATTEMPT_COMPLETED_EVENT:
+              sendOrQueueEvent(ReaderEvent.doneEvent());
+              break;
+            case TASK_ATTEMPT_FAILED_EVENT:
+              TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) tezEvent.getEvent();
+              sendOrQueueEvent(ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics()));
+              break;
+            case TASK_STATUS_UPDATE_EVENT:
+              // If we want to handle counters
+              break;
+            default:
+              LOG.warn("Unhandled event type " + eventType);
+              break;
+          }
+        } catch (Exception err) {
+          LOG.error("Error during heartbeat responder:", err);
+        }
+      }
+    }
+
+    @Override
+    public void taskKilled(TezTaskAttemptID taskAttemptId) {
+      try {
+        sendOrQueueEvent(ReaderEvent.errorEvent(
+            "Received task killed event for task ID " + taskAttemptId));
+      } catch (Exception err) {
+        LOG.error("Error during heartbeat responder:", err);
+      }
+    }
+
+    @Override
+    public void heartbeatTimeout(String taskAttemptId) {
+      try {
+        sendOrQueueEvent(ReaderEvent.errorEvent(
+            "Timed out waiting for heartbeat for task ID " + taskAttemptId));
+      } catch (Exception err) {
+        LOG.error("Error during heartbeat responder:", err);
+      }
+    }
+
+    public synchronized LlapBaseRecordReader getRecordReader() {
+      return recordReader;
+    }
+
+    /**
+     * Registers the record reader and hands it every event queued so far. Passing
+     * {@code null} unregisters the reader and leaves the queue untouched.
+     */
+    public synchronized void setRecordReader(LlapBaseRecordReader recordReader) {
+      this.recordReader = recordReader;
+
+      if (recordReader == null) {
+        return;
+      }
+
+      // If any events were queued by the responder, give them to the record reader now.
+      while (!queuedEvents.isEmpty()) {
+        ReaderEvent readerEvent = queuedEvents.poll();
+        LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType());
+        recordReader.handleEvent(readerEvent);
+      }
+    }
+
+    /**
+     * Send the ReaderEvents to the record reader, if it is registered to this responder.
+     * If there is no registered record reader, add them to a list of pending reader events
+     * since we don't want to drop these events.
+     * @param readerEvent
+     */
+    protected synchronized void sendOrQueueEvent(ReaderEvent readerEvent) {
+      LlapBaseRecordReader registeredReader = getRecordReader();
+      if (registeredReader != null) {
+        registeredReader.handleEvent(readerEvent);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("No registered record reader, queueing event " + readerEvent.getEventType()
+              + " with message " + readerEvent.getMessage());
+        }
+
+        try {
+          queuedEvents.put(readerEvent);
+        } catch (Exception err) {
+          throw new RuntimeException("Unexpected exception while queueing reader event", err);
+        }
+      }
+    }
+
+    /**
+     * Clear the list of queued reader events if we are not interested in sending any pending events to any registering record reader.
+     */
+    public void clearQueuedEvents() {
+      queuedEvents.clear();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
new file mode 100644
index 0000000..02aedfd
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TSerializer;
+
+/**
+ * Input split carrying the serialized plan and fragment for one unit of LLAP work, together
+ * with its locations, result schema and the LLAP user.
+ */
+public class LlapInputSplit implements InputSplitWithLocationInfo {
+
+  int splitNum;
+  byte[] planBytes;
+  byte[] fragmentBytes;
+  SplitLocationInfo[] locations;
+  Schema schema;
+  String llapUser;
+
+  /** No-arg constructor; the fields stay unset until {@link #readFields(DataInput)} is called. */
+  public LlapInputSplit() {
+  }
+
+  public LlapInputSplit(int splitNum, byte[] planBytes, byte[] fragmentBytes, SplitLocationInfo[] locations, Schema schema, String llapUser) {
+    this.planBytes = planBytes;
+    this.fragmentBytes = fragmentBytes;
+    this.locations = locations;
+    this.schema = schema;
+    this.splitNum = splitNum;
+    this.llapUser = llapUser;
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  /** Length is not tracked for this split; always returns 0. */
+  @Override
+  public long getLength() throws IOException {
+    return 0;
+  }
+
+  /**
+   * @return the location string of every location of this split, in order; an empty array if
+   *         no locations have been set
+   */
+  @Override
+  public String[] getLocations() throws IOException {
+    if (locations == null) {
+      // A split that has not been populated yet has no locations.
+      return new String[0];
+    }
+    String[] locs = new String[locations.length];
+    for (int i = 0; i < locations.length; ++i) {
+      locs[i] = locations[i].getLocation();
+    }
+    return locs;
+  }
+
+  public int getSplitNum() {
+    return splitNum;
+  }
+
+  public byte[] getPlanBytes() {
+    return planBytes;
+  }
+
+  public byte[] getFragmentBytes() {
+    return fragmentBytes;
+  }
+
+
+
+  /**
+   * Serializes the split: split number, length-prefixed plan bytes, length-prefixed fragment
+   * bytes, location count followed by each location string, the schema, and the LLAP user.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(splitNum);
+    out.writeInt(planBytes.length);
+    out.write(planBytes);
+
+    out.writeInt(fragmentBytes.length);
+    out.write(fragmentBytes);
+
+    out.writeInt(locations.length);
+    for (int i = 0; i < locations.length; ++i) {
+      out.writeUTF(locations[i].getLocation());
+    }
+
+    schema.write(out);
+    out.writeUTF(llapUser);
+  }
+
+  /**
+   * Reads the fields in the order {@link #write(DataOutput)} emits them.
+   *
+   * @throws IOException if the stream cannot be read or contains a negative length
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    splitNum = in.readInt();
+    planBytes = readByteArray(in, "plan");
+    fragmentBytes = readByteArray(in, "fragment");
+
+    int length = checkLength(in.readInt(), "location");
+    locations = new SplitLocationInfo[length];
+
+    for (int i = 0; i < length; ++i) {
+      locations[i] = new SplitLocationInfo(in.readUTF(), false);
+    }
+
+    schema = new Schema();
+    schema.readFields(in);
+    llapUser = in.readUTF();
+  }
+
+  /** Reads a length-prefixed byte array, rejecting a negative length. */
+  private static byte[] readByteArray(DataInput in, String what) throws IOException {
+    byte[] bytes = new byte[checkLength(in.readInt(), what)];
+    in.readFully(bytes);
+    return bytes;
+  }
+
+  /** Fails with an IOException, rather than a NegativeArraySizeException, on corrupt input. */
+  private static int checkLength(int length, String what) throws IOException {
+    if (length < 0) {
+      throw new IOException("Invalid " + what + " length " + length + " in serialized LlapInputSplit");
+    }
+    return length;
+  }
+
+  @Override
+  public SplitLocationInfo[] getLocationInfo() throws IOException {
+    return locations;
+  }
+
+  public String getLlapUser() {
+    return llapUser;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
new file mode 100644
index 0000000..4e000ff
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
@@ -0,0 +1,155 @@
+package org.apache.hadoop.hive.llap;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.hive.llap.TypeDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapRowRecordReader.class);
+
+  Configuration conf;
+  RecordReader<NullWritable, Text> reader;
+  Schema schema;
+  SerDe serde;
+  final Text textData = new Text();
+
+  public LlapRowRecordReader(Configuration conf, Schema schema, RecordReader<NullWritable, Text> reader) {
+    this.conf = conf;
+    this.schema = schema;
+    this.reader = reader;
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  @Override
+  public Row createValue() {
+    return new Row(schema);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public boolean next(NullWritable key, Row value) throws IOException {
+    Preconditions.checkArgument(value != null);
+
+    if (serde == null) {
+      try {
+        serde = initSerDe(conf);
+      } catch (SerDeException err) {
+        throw new IOException(err);
+      }
+    }
+
+    boolean hasNext = reader.next(key,  textData);
+    if (hasNext) {
+      // Deserialize Text to column values, and populate the row record
+      Object rowObj;
+      try {
+        StructObjectInspector rowOI = (StructObjectInspector) serde.getObjectInspector();
+        rowObj = serde.deserialize(textData);
+        List<? extends StructField> colFields = rowOI.getAllStructFieldRefs();
+        for (int idx = 0; idx < colFields.size(); ++idx) {
+          StructField field = colFields.get(idx);
+          Object colValue = rowOI.getStructFieldData(rowObj, field);
+          Preconditions.checkState(field.getFieldObjectInspector().getCategory() == Category.PRIMITIVE,
+              "Cannot handle non-primitive column type " + field.getFieldObjectInspector().getTypeName());
+
+          PrimitiveObjectInspector poi = (PrimitiveObjectInspector) field.getFieldObjectInspector();
+          // char/varchar special cased here since the row record handles them using Text
+          switch (poi.getPrimitiveCategory()) {
+            case CHAR:
+              value.setValue(idx, ((HiveCharWritable) poi.getPrimitiveWritableObject(colValue)).getPaddedValue());
+              break;
+            case VARCHAR:
+              value.setValue(idx, ((HiveVarcharWritable) poi.getPrimitiveWritableObject(colValue)).getTextValue());
+              break;
+            default:
+              value.setValue(idx, (Writable) poi.getPrimitiveWritableObject(colValue));
+              break;
+          }
+        }
+      } catch (SerDeException err) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Error deserializing row from text: " + textData);
+        }
+        throw new IOException("Error deserializing row data", err);
+      }
+    }
+
+    return hasNext;
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  protected SerDe initSerDe(Configuration conf) throws SerDeException {
+    Properties props = new Properties();
+    StringBuffer columnsBuffer = new StringBuffer();
+    StringBuffer typesBuffer = new StringBuffer();
+    boolean isFirst = true;
+    for (FieldDesc colDesc : schema.getColumns()) {
+      if (!isFirst) {
+        columnsBuffer.append(',');
+        typesBuffer.append(',');
+      }
+      columnsBuffer.append(colDesc.getName());
+      typesBuffer.append(colDesc.getTypeDesc().toString());
+      isFirst = false;
+    }
+    String columns = columnsBuffer.toString();
+    String types = typesBuffer.toString();
+    props.put(serdeConstants.LIST_COLUMNS, columns);
+    props.put(serdeConstants.LIST_COLUMN_TYPES, types);
+    SerDe serde = new LazySimpleSerDe();
+    serde.initialize(conf, props);
+
+    return serde;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
new file mode 100644
index 0000000..83149ab
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
@@ -0,0 +1,103 @@
+package org.apache.hadoop.hive.llap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+
+/**
+ * Writable bundle describing a unit of work to submit: the Tez {@link TaskSpec}, a placeholder
+ * application id, the creation time, and a job token generated from that application id.
+ *
+ * <p>The serialized layout produced by {@link #write(DataOutput)} is: task spec, application id
+ * cluster timestamp (long), application id number (int), token, creation time (long).
+ */
+public class SubmitWorkInfo implements Writable {
+
+  private TaskSpec taskSpec;
+  private ApplicationId fakeAppId;
+  private long creationTime;
+
+  // This is used to communicate over the LlapUmbilicalProtocol. Not related to tokens used to
+  // talk to LLAP daemons itself via the security work.
+  private Token<JobTokenIdentifier> token;
+
+  /**
+   * Creates a fully populated instance and generates the job token for {@code fakeAppId}.
+   *
+   * @param taskSpec the task specification to carry
+   * @param fakeAppId application id; its string form is also used as the token identifier
+   * @param creationTime creation timestamp to carry
+   */
+  public SubmitWorkInfo(TaskSpec taskSpec, ApplicationId fakeAppId, long creationTime) {
+    this.taskSpec = taskSpec;
+    // fakeAppId must be assigned before createJobToken(), which reads it.
+    this.fakeAppId = fakeAppId;
+    this.token = createJobToken();
+    this.creationTime = creationTime;
+  }
+
+  /** Empty constructor for Writable deserialization; fields are filled by {@link #readFields}. */
+  public SubmitWorkInfo() {
+  }
+
+  /** @return the carried task specification */
+  public TaskSpec getTaskSpec() {
+    return taskSpec;
+  }
+
+  /** @return the carried application id */
+  public ApplicationId getFakeAppId() {
+    return fakeAppId;
+  }
+
+  /** @return the string form of the application id, which is what the job token was built from */
+  public String getTokenIdentifier() {
+    return fakeAppId.toString();
+  }
+
+  /** @return the job token created for (or deserialized with) this instance */
+  public Token<JobTokenIdentifier> getToken() {
+    return token;
+  }
+
+  /** @return the carried creation timestamp */
+  public long getCreationTime() {
+    return creationTime;
+  }
+
+  /**
+   * Serializes all fields to {@code out} in the order described in the class comment.
+   *
+   * @throws IOException if writing to {@code out} fails
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskSpec.write(out);
+    out.writeLong(fakeAppId.getClusterTimestamp());
+    out.writeInt(fakeAppId.getId());
+    token.write(out);
+    out.writeLong(creationTime);
+  }
+
+  /**
+   * Replaces all fields with values read from {@code in}; mirrors {@link #write(DataOutput)}.
+   *
+   * @throws IOException if reading from {@code in} fails
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskSpec = new TaskSpec();
+    taskSpec.readFields(in);
+    long appIdTs = in.readLong();
+    int appIdId = in.readInt();
+    fakeAppId = ApplicationId.newInstance(appIdTs, appIdId);
+    token = new Token<>();
+    token.readFields(in);
+    creationTime = in.readLong();
+  }
+
+  /**
+   * Serializes {@code submitWorkInfo} into a byte array containing exactly the written bytes.
+   *
+   * @param submitWorkInfo the instance to serialize
+   * @return a new array holding only the serialized form
+   * @throws IOException if serialization fails
+   */
+  public static byte[] toBytes(SubmitWorkInfo submitWorkInfo) throws IOException {
+    DataOutputBuffer dob = new DataOutputBuffer();
+    submitWorkInfo.write(dob);
+    // getData() exposes the whole backing array; only the first getLength() bytes are valid,
+    // so copy just that prefix instead of returning trailing unused capacity.
+    int length = dob.getLength();
+    byte[] bytes = new byte[length];
+    System.arraycopy(dob.getData(), 0, bytes, 0, length);
+    return bytes;
+  }
+
+  /**
+   * Deserializes an instance from bytes produced by {@link #toBytes(SubmitWorkInfo)}.
+   *
+   * @param submitWorkInfoBytes the serialized form
+   * @return a new instance populated from the bytes
+   * @throws IOException if the bytes cannot be read
+   */
+  public static SubmitWorkInfo fromBytes(byte[] submitWorkInfoBytes) throws IOException {
+    DataInputBuffer dib = new DataInputBuffer();
+    dib.reset(submitWorkInfoBytes, 0, submitWorkInfoBytes.length);
+    SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo();
+    submitWorkInfo.readFields(dib);
+    return submitWorkInfo;
+  }
+
+  /**
+   * Builds a job token whose identifier is the string form of {@code fakeAppId} and whose
+   * service is set to the identifier's job id.
+   */
+  private Token<JobTokenIdentifier> createJobToken() {
+    String tokenIdentifier = fakeAppId.toString();
+    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(tokenIdentifier));
+    Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
+        new JobTokenSecretManager());
+    sessionToken.setService(identifier.getJobId());
+    return sessionToken;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
new file mode 100644
index 0000000..7d06637
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -0,0 +1,415 @@
+package org.apache.hadoop.hive.llap.ext;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections4.ListUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.LlapTaskUmbilicalServer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Service that submits work fragments to an LLAP daemon and hosts the umbilical RPC endpoint
+ * on which heartbeats for those fragments are received.
+ *
+ * <p>Fragments are tracked in two maps keyed by fragment / task attempt id string: "pending"
+ * (submitted, no heartbeat seen yet, initial events still to be delivered) and "registered"
+ * (at least one task heartbeat seen). A periodic check reports entries whose last heartbeat is
+ * older than the configured connection timeout to the responder and drops them.
+ */
+public class LlapTaskUmbilicalExternalClient extends AbstractService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class);
+
+  private final LlapProtocolClientProxy communicator;
+  private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer;
+  private final Configuration conf;
+  private final LlapTaskUmbilicalProtocol umbilical;
+
+  protected final String tokenIdentifier;
+  protected final Token<JobTokenIdentifier> sessionToken;
+
+  private final ConcurrentMap<String, PendingEventData> pendingEvents = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, TaskHeartbeatInfo> registeredTasks = new ConcurrentHashMap<>();
+  // May be null; every use is null-checked.
+  private final LlapTaskUmbilicalExternalResponder responder;
+  private final ScheduledThreadPoolExecutor timer;
+  private final long connectionTimeout;
+  // Ensures the periodic heartbeat check is scheduled once, not once per submitted fragment.
+  private final AtomicBoolean heartbeatCheckScheduled = new AtomicBoolean(false);
+
+  /** Where a fragment was sent and when a heartbeat attributed to it was last seen. */
+  private static class TaskHeartbeatInfo {
+    final String taskAttemptId;
+    final String hostname;
+    final int port;
+    final AtomicLong lastHeartbeat = new AtomicLong();
+
+    public TaskHeartbeatInfo(String taskAttemptId, String hostname, int port) {
+      this.taskAttemptId = taskAttemptId;
+      this.hostname = hostname;
+      this.port = port;
+      this.lastHeartbeat.set(System.currentTimeMillis());
+    }
+  }
+
+  /** Heartbeat bookkeeping plus the events to hand out on a fragment's first heartbeat. */
+  private static class PendingEventData {
+    final TaskHeartbeatInfo heartbeatInfo;
+    final List<TezEvent> tezEvents;
+
+    public PendingEventData(TaskHeartbeatInfo heartbeatInfo, List<TezEvent> tezEvents) {
+      this.heartbeatInfo = heartbeatInfo;
+      this.tezEvents = tezEvents;
+    }
+  }
+
+  // TODO KKK Work out the details of the tokenIdentifier, and the session token.
+  // It may just be possible to create one here - since Shuffle is not involved, and this is only used
+  // for communication from LLAP-Daemons to the server. It will need to be sent in as part
+  // of the job submission request.
+  /**
+   * Creates the client and initializes (but does not start) the daemon communicator.
+   *
+   * @param conf configuration; also the source of the heartbeat connection timeout
+   * @param tokenIdentifier identifier under which {@code sessionToken} is registered with the
+   *                        umbilical server
+   * @param sessionToken token registered with the umbilical server
+   * @param responder callback for submission failures, heartbeats, kills and timeouts; may be null
+   */
+  public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier,
+      Token<JobTokenIdentifier> sessionToken, LlapTaskUmbilicalExternalResponder responder) {
+    super(LlapTaskUmbilicalExternalClient.class.getName());
+    this.conf = conf;
+    this.umbilical = new LlapTaskUmbilicalExternalImpl();
+    this.tokenIdentifier = tokenIdentifier;
+    this.sessionToken = sessionToken;
+    this.responder = responder;
+    this.timer = new ScheduledThreadPoolExecutor(1);
+    this.connectionTimeout = HiveConf.getTimeVar(conf,
+        HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    // TODO. No support for the LLAP token yet. Add support for configurable threads, however 1 should always be enough.
+    this.communicator = new LlapProtocolClientProxy(1, conf, null);
+    this.communicator.init(conf);
+  }
+
+  /** Creates the umbilical server and starts the daemon communicator. */
+  @Override
+  public void serviceStart() throws IOException {
+    int numHandlers = HiveConf.getIntVar(conf,
+        HiveConf.ConfVars.LLAP_TMP_EXT_CLIENT_NUM_SERVER_HANDLERS);
+    llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilical, numHandlers, tokenIdentifier, sessionToken);
+    communicator.start();
+  }
+
+  /** Shuts down the umbilical server (if it was created), the timer and the communicator. */
+  @Override
+  public void serviceStop() {
+    // The server only exists once serviceStart() has run; guard so that stopping a
+    // never-started service does not fail with a NullPointerException.
+    LlapTaskUmbilicalServer umbilicalServer = llapTaskUmbilicalServer;
+    if (umbilicalServer != null) {
+      umbilicalServer.shutdownServer();
+    }
+    timer.shutdown();
+    communicator.stop();
+  }
+
+  /** @return the umbilical server's address; only usable after the service has been started */
+  public InetSocketAddress getAddress() {
+    return llapTaskUmbilicalServer.getAddress();
+  }
+
+
+  /**
+   * Submit the work for actual execution. This should always have the usingTezAm flag disabled
+   *
+   * <p>The fragment is recorded as pending together with {@code tezEvents}, the periodic
+   * heartbeat-timeout check is scheduled if it is not already, and the request is sent to the
+   * daemon. Rejection or failure of the submission is reported to the responder, if one is set.
+   *
+   * @param submitWorkRequestProto the request to send; must not be using the Tez AM
+   * @param llapHost host of the LLAP daemon to submit to
+   * @param llapPort port of the LLAP daemon to submit to
+   * @param tezEvents events returned to the fragment on its first heartbeat
+   */
+  public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort, List<TezEvent> tezEvents) {
+    Preconditions.checkArgument(!submitWorkRequestProto.getUsingTezAm());
+
+    // Register the pending events to be sent for this spec.
+    final String fragmentId = submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString();
+    PendingEventData pendingEventData = new PendingEventData(
+        new TaskHeartbeatInfo(fragmentId, llapHost, llapPort),
+        tezEvents);
+    pendingEvents.putIfAbsent(fragmentId, pendingEventData);
+
+    // Setup timer task to check for heartbeat timeouts
+    scheduleHeartbeatCheck();
+
+    // Send out the actual SubmitWorkRequest
+    communicator.sendSubmitWork(submitWorkRequestProto, llapHost, llapPort,
+        new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto>() {
+
+          @Override
+          public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) {
+            if (response.hasSubmissionState()
+                && response.getSubmissionState().equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) {
+              String msg = "Fragment: " + fragmentId + " rejected. Server Busy.";
+              LOG.info(msg);
+              if (responder != null) {
+                Throwable err = new RuntimeException(msg);
+                responder.submissionFailed(fragmentId, err);
+              }
+            }
+          }
+
+          @Override
+          public void indicateError(Throwable t) {
+            String msg = "Failed to submit: " + fragmentId;
+            LOG.error(msg, t);
+            // The responder is optional, as in setResponse().
+            if (responder != null) {
+              Throwable err = new RuntimeException(msg, t);
+              responder.submissionFailed(fragmentId, err);
+            }
+          }
+        });
+
+    // TODO Also send out information saying that the fragment is finishable - if that is not already included in the main fragment.
+    // This entire call is only required if we're doing more than scans. MRInput has no dependencies and is always finishable.
+    // It would mean sending a SourceStateUpdatedRequestProto (state S_SUCCEEDED) for the fragment's query
+    // through communicator.sendSourceStateUpdate(...); the source name to use is still to be determined.
+  }
+
+  /**
+   * Schedules the periodic {@link HeartbeatCheckTask} the first time it is called. Later calls
+   * are no-ops, so submitting many fragments does not pile up duplicate periodic checks.
+   */
+  private void scheduleHeartbeatCheck() {
+    if (heartbeatCheckScheduled.compareAndSet(false, true)) {
+      try {
+        timer.scheduleAtFixedRate(new HeartbeatCheckTask(),
+            connectionTimeout, connectionTimeout, TimeUnit.MILLISECONDS);
+      } catch (RuntimeException e) {
+        // Scheduling failed (e.g. rejected); allow a later submission to try again.
+        heartbeatCheckScheduled.set(false);
+        throw e;
+      }
+    }
+  }
+
+  /** Refreshes the last-heartbeat time of the pending and/or registered entry for the attempt. */
+  private void updateHeartbeatInfo(String taskAttemptId) {
+    int updateCount = 0;
+    long now = System.currentTimeMillis();
+
+    PendingEventData pendingEventData = pendingEvents.get(taskAttemptId);
+    if (pendingEventData != null) {
+      pendingEventData.heartbeatInfo.lastHeartbeat.set(now);
+      updateCount++;
+    }
+
+    TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(taskAttemptId);
+    if (heartbeatInfo != null) {
+      heartbeatInfo.lastHeartbeat.set(now);
+      updateCount++;
+    }
+
+    if (updateCount == 0) {
+      LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId);
+    }
+  }
+
+  /** Refreshes the last-heartbeat time of every tracked entry that was sent to the given node. */
+  private void updateHeartbeatInfo(String hostname, int port) {
+    int updateCount = 0;
+    long now = System.currentTimeMillis();
+
+    for (PendingEventData pendingEventData : pendingEvents.values()) {
+      if (isOnNode(pendingEventData.heartbeatInfo, hostname, port)) {
+        pendingEventData.heartbeatInfo.lastHeartbeat.set(now);
+        updateCount++;
+      }
+    }
+
+    for (TaskHeartbeatInfo heartbeatInfo : registeredTasks.values()) {
+      if (isOnNode(heartbeatInfo, hostname, port)) {
+        heartbeatInfo.lastHeartbeat.set(now);
+        updateCount++;
+      }
+    }
+
+    if (updateCount == 0) {
+      LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port);
+    }
+  }
+
+  /** Returns whether the entry was submitted to the given host and port. */
+  private static boolean isOnNode(TaskHeartbeatInfo heartbeatInfo, String hostname, int port) {
+    return heartbeatInfo.hostname.equals(hostname) && heartbeatInfo.port == port;
+  }
+
+  /**
+   * Periodic check that reports pending and registered entries whose last heartbeat is at least
+   * {@code connectionTimeout} milliseconds old, then removes them from tracking.
+   */
+  private class HeartbeatCheckTask implements Runnable {
+    @Override
+    public void run() {
+      long currentTime = System.currentTimeMillis();
+      List<String> timedOutTasks = new ArrayList<String>();
+
+      // Check both pending and registered tasks for timeouts
+      for (Map.Entry<String, PendingEventData> entry : pendingEvents.entrySet()) {
+        if (isTimedOut(entry.getValue().heartbeatInfo, currentTime)) {
+          timedOutTasks.add(entry.getKey());
+        }
+      }
+      for (String timedOutTask : timedOutTasks) {
+        LOG.info("Pending taskAttemptId " + timedOutTask + " timed out");
+        notifyHeartbeatTimeout(timedOutTask);
+        pendingEvents.remove(timedOutTask);
+        // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task?
+      }
+
+      timedOutTasks.clear();
+      for (Map.Entry<String, TaskHeartbeatInfo> entry : registeredTasks.entrySet()) {
+        if (isTimedOut(entry.getValue(), currentTime)) {
+          timedOutTasks.add(entry.getKey());
+        }
+      }
+      for (String timedOutTask : timedOutTasks) {
+        LOG.info("Running taskAttemptId " + timedOutTask + " timed out");
+        notifyHeartbeatTimeout(timedOutTask);
+        registeredTasks.remove(timedOutTask);
+        // TODO: Do we need to tell the LLAP daemon we are no longer interested in this task?
+      }
+    }
+
+    private boolean isTimedOut(TaskHeartbeatInfo heartbeatInfo, long currentTime) {
+      return currentTime - heartbeatInfo.lastHeartbeat.get() >= connectionTimeout;
+    }
+
+    /**
+     * Reports a timeout to the responder, if any. Exceptions are logged rather than propagated:
+     * an exception escaping a fixed-rate task suppresses all of its later executions, and the
+     * caller still has to remove the timed-out entry.
+     */
+    private void notifyHeartbeatTimeout(String timedOutTask) {
+      try {
+        if (responder != null) {
+          responder.heartbeatTimeout(timedOutTask);
+        }
+      } catch (Exception err) {
+        LOG.error("Error during responder execution", err);
+      }
+    }
+  }
+
+  /** Callbacks through which this client reports fragment-level outcomes. */
+  public interface LlapTaskUmbilicalExternalResponder {
+    /** Called when a submission was rejected by the daemon or could not be sent. */
+    void submissionFailed(String fragmentId, Throwable throwable);
+    /** Called for each task heartbeat from a known fragment, after it has been processed. */
+    void heartbeat(TezHeartbeatRequest request);
+    /** Called when the daemon reports the task attempt as killed. */
+    void taskKilled(TezTaskAttemptID taskAttemptId);
+    /** Called when no heartbeat was seen for the fragment within the connection timeout. */
+    void heartbeatTimeout(String fragmentId);
+  }
+
+
+
+  // TODO Ideally, the server should be shared across all client sessions running on the same node.
+  /** Umbilical protocol implementation served to LLAP daemons on behalf of this client. */
+  private class LlapTaskUmbilicalExternalImpl implements  LlapTaskUmbilicalProtocol {
+
+    @Override
+    public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+      // Expecting only a single instance of a task to be running.
+      return true;
+    }
+
+    /**
+     * Handles a task heartbeat: refreshes liveness, hands out the pending events on the first
+     * heartbeat (moving the task from pending to registered), drops the task from the registered
+     * set on completed/failed events, and forwards the request to the responder. A heartbeat
+     * from a task that is neither pending nor registered is answered with "should die".
+     */
+    @Override
+    public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
+        TezException {
+      // Keep-alive information. The client should be informed and will have to take care of re-submitting the work.
+      // Some parts of fault tolerance go here.
+
+      // This also provides completion information, and a possible notification when task actually starts running (first heartbeat)
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received heartbeat from container, request=" + request);
+      }
+
+      // Incoming events can be ignored until the point when shuffle needs to be handled, instead of just scans.
+      TezHeartbeatResponse response = new TezHeartbeatResponse();
+
+      response.setLastRequestId(request.getRequestId());
+      // Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this.
+      TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
+      String taskAttemptIdString = taskAttemptId.toString();
+
+      updateHeartbeatInfo(taskAttemptIdString);
+
+      List<TezEvent> tezEvents;
+      PendingEventData pendingEventData = pendingEvents.remove(taskAttemptIdString);
+      if (pendingEventData == null) {
+        tezEvents = Collections.emptyList();
+
+        // If this heartbeat was not from a pending event and it's not in our list of registered tasks,
+        // the sender is unknown to us and is told to stop.
+        if (!registeredTasks.containsKey(taskAttemptIdString)) {
+          LOG.info("Unexpected heartbeat from " + taskAttemptIdString);
+          response.setShouldDie(); // Do any of the other fields need to be set?
+          return response;
+        }
+      } else {
+        tezEvents = pendingEventData.tezEvents;
+        // Tasks removed from the pending list should then be added to the registered list.
+        registeredTasks.put(taskAttemptIdString, pendingEventData.heartbeatInfo);
+      }
+
+      // Irrelevant from eventIds. This can be tracked in the AM itself, instead of polluting the task.
+      // Also since we have all the MRInput events here - they'll all be sent in together.
+      response.setNextFromEventId(0); // Irrelevant. See comment above.
+      response.setNextPreRoutedEventId(0); //Irrelevant. See comment above.
+      response.setEvents(tezEvents);
+
+      List<TezEvent> inEvents = request.getEvents();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Heartbeat from " + taskAttemptIdString +
+            " events: " + (inEvents != null ? inEvents.size() : -1));
+      }
+      for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+        EventType eventType = tezEvent.getEventType();
+        switch (eventType) {
+          case TASK_ATTEMPT_COMPLETED_EVENT:
+            LOG.debug("Task completed event for {}", taskAttemptIdString);
+            registeredTasks.remove(taskAttemptIdString);
+            break;
+          case TASK_ATTEMPT_FAILED_EVENT:
+            LOG.debug("Task failed event for {}", taskAttemptIdString);
+            registeredTasks.remove(taskAttemptIdString);
+            break;
+          case TASK_STATUS_UPDATE_EVENT:
+            // If we want to handle counters
+            LOG.debug("Task update event for {}", taskAttemptIdString);
+            break;
+          default:
+            LOG.warn("Unhandled event type " + eventType);
+            break;
+        }
+      }
+
+      // Pass the request on to the responder
+      try {
+        if (responder != null) {
+          responder.heartbeat(request);
+        }
+      } catch (Exception err) {
+        LOG.error("Error during responder execution", err);
+      }
+
+      return response;
+    }
+
+    @Override
+    public void nodeHeartbeat(Text hostname, int port) throws IOException {
+      updateHeartbeatInfo(hostname.toString(), port);
+      // No need to propagate to this to the responder
+    }
+
+    @Override
+    public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
+      String taskAttemptIdString = taskAttemptId.toString();
+      LOG.error("Task killed - " + taskAttemptIdString);
+      registeredTasks.remove(taskAttemptIdString);
+
+      try {
+        if (responder != null) {
+          responder.taskKilled(taskAttemptId);
+        }
+      } catch (Exception err) {
+        LOG.error("Error during responder execution", err);
+      }
+    }
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+      return 0;
+    }
+
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+                                                  int clientMethodsHash) throws IOException {
+      return ProtocolSignature.getProtocolSignature(this, protocol,
+          clientVersion, clientMethodsHash);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7b9096a9/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
new file mode 100644
index 0000000..dbd591a
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
@@ -0,0 +1,57 @@
+package org.apache.hadoop.hive.llap.tezplugins.helpers;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wraps the Hadoop RPC server that exposes a {@link LlapTaskUmbilicalProtocol} implementation.
+ * The server is built and started in the constructor and stopped via {@link #shutdownServer()}.
+ */
+public class LlapTaskUmbilicalServer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalServer.class);
+
+  protected volatile Server server;
+  private final InetSocketAddress address;
+  // True from construction (the constructor starts the server) until the first shutdown.
+  private final AtomicBoolean started = new AtomicBoolean(true);
+
+  /**
+   * Builds and starts the RPC server on the wildcard address with a system-assigned port.
+   *
+   * @param conf configuration used to build the RPC server
+   * @param umbilical protocol implementation to serve
+   * @param numHandlers number of RPC handlers
+   * @param tokenIdentifier job identifier under which {@code token} is registered
+   * @param token job token added to the server's secret manager
+   * @throws IOException if the server cannot be built
+   */
+  public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umbilical, int numHandlers, String tokenIdentifier, Token<JobTokenIdentifier> token) throws
+      IOException {
+    JobTokenSecretManager jobTokenSecretManager =
+        new JobTokenSecretManager();
+    jobTokenSecretManager.addTokenForJob(tokenIdentifier, token);
+
+    server = new RPC.Builder(conf)
+        .setProtocol(LlapTaskUmbilicalProtocol.class)
+        .setBindAddress("0.0.0.0")
+        .setPort(0)
+        .setInstance(umbilical)
+        .setNumHandlers(numHandlers)
+        .setSecretManager(jobTokenSecretManager).build();
+
+    server.start();
+    this.address = NetUtils.getConnectAddress(server);
+    LOG.info(
+        "Started TaskUmbilicalServer: " + umbilical.getClass().getName() + " at address: " + address +
+            " with numHandlers=" + numHandlers);
+  }
+
+  /** @return the connect address resolved for the server when it was started */
+  public InetSocketAddress getAddress() {
+    return this.address;
+  }
+
+  /** Stops the RPC server; only the first call has any effect. */
+  public void shutdownServer() {
+    // compareAndSet makes the check and the flip one atomic step, so concurrent callers
+    // cannot both observe "started" and stop the server twice.
+    if (started.compareAndSet(true, false)) {
+      server.stop();
+    }
+  }
+}