Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/06 17:25:07 UTC

[17/39] hive git commit: HIVE-13519: Allow LlapRecordReader to parse/output rows

HIVE-13519: Allow LlapRecordReader to parse/output rows


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fc7343dd
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fc7343dd
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fc7343dd

Branch: refs/heads/master
Commit: fc7343dd12ac152267615e6ac67238ee06326452
Parents: 8f6b28a
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu Apr 14 14:30:45 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu Apr 14 14:30:45 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/jdbc/TestLlapInputSplit.java    |  21 +-
 .../apache/hive/jdbc/TestJdbcWithMiniLlap.java  |  38 ++--
 jdbc/pom.xml                                    |   5 +
 .../apache/hive/jdbc/LlapBaseInputFormat.java   | 135 ++++++++++++
 .../src/java/org/apache/hive/jdbc/LlapDump.java |  11 +-
 .../org/apache/hive/jdbc/LlapInputFormat.java   | 135 ------------
 .../apache/hive/jdbc/LlapRowInputFormat.java    |  34 +++
 llap-common/pom.xml                             |  21 ++
 .../org/apache/hadoop/hive/llap/FieldDesc.java  |  63 ++++++
 .../hadoop/hive/llap/LlapRowRecordReader.java   | 155 ++++++++++++++
 .../java/org/apache/hadoop/hive/llap/Row.java   | 166 +++++++++++++++
 .../org/apache/hadoop/hive/llap/Schema.java     |  76 +++++++
 .../org/apache/hadoop/hive/llap/TypeDesc.java   | 108 ++++++++++
 .../org/apache/hadoop/hive/llap/TestRow.java    |  92 +++++++++
 .../hadoop/hive/llap/LlapInputFormat.java       |  27 ++-
 .../hadoop/hive/llap/LlapBaseRecordReader.java  | 205 ++++++++++++++++++
 .../apache/hadoop/hive/llap/LlapInputSplit.java |  27 +--
 .../hadoop/hive/llap/LlapRecordReader.java      | 206 -------------------
 .../ql/udf/generic/GenericUDTFGetSplits.java    |  87 +++++++-
 .../hadoop/hive/llap/TestLlapOutputFormat.java  |   6 +-
 20 files changed, 1205 insertions(+), 413 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java
index 338930e..366e326 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/jdbc/TestLlapInputSplit.java
@@ -10,8 +10,10 @@ import java.util.HashMap;
 
 import org.apache.hadoop.io.Text;
 
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.TypeDesc;
+
 import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.junit.After;
 import org.junit.Before;
@@ -32,14 +34,11 @@ public class TestLlapInputSplit {
         new SplitLocationInfo("location1", false),
         new SplitLocationInfo("location2", false),
     };
-    ArrayList<FieldSchema> fields = new ArrayList<FieldSchema>();
-    fields.add(new FieldSchema("col1", "string", "comment1"));
-    fields.add(new FieldSchema("col2", "int", "comment2"));
-    HashMap<String, String> properties = new HashMap<String, String>();
-    properties.put("key1", "val1");
-    Schema schema = new Schema(
-        fields,
-        properties);
+
+    ArrayList<FieldDesc> colDescs = new ArrayList<FieldDesc>();
+    colDescs.add(new FieldDesc("col1", new TypeDesc(TypeDesc.Type.STRING)));
+    colDescs.add(new FieldDesc("col2", new TypeDesc(TypeDesc.Type.INT)));
+    Schema schema = new Schema(colDescs);
 
     org.apache.hadoop.hive.llap.LlapInputSplit split1 = new org.apache.hadoop.hive.llap.LlapInputSplit(
         splitNum,
@@ -94,7 +93,7 @@ public class TestLlapInputSplit {
       assertEquals(locationInfo1[idx].isOnDisk(), locationInfo2[idx].isOnDisk());
     }
     assertArrayEquals(split1.getLocations(), split2.getLocations());
-    assertEquals(split1.getSchema(), split2.getSchema());
+    assertEquals(split1.getSchema().toString(), split2.getSchema().toString());
     assertEquals(split1.getLlapUser(), split2.getLlapUser());
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
index 98daab4..deeac2e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
@@ -60,16 +60,16 @@ import org.apache.hadoop.mapred.RecordReader;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.LlapRecordReader;
-import org.apache.hadoop.hive.metastore.ObjectStore;
-import org.apache.hadoop.hive.metastore.api.Schema;
-import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.llap.LlapRowRecordReader;
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.hive.llap.Schema;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
-import org.apache.hive.jdbc.LlapInputFormat;
+import org.apache.hive.jdbc.LlapBaseInputFormat;
+import org.apache.hive.jdbc.LlapRowInputFormat;
 
 import org.datanucleus.ClassLoaderResolver;
 import org.datanucleus.NucleusContext;
@@ -109,8 +109,6 @@ public class TestJdbcWithMiniLlap {
 
     conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
         + "/tez-site.xml"));
-    conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
-        + "/llap-daemon-site.xml"));
 
     miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
 
@@ -202,10 +200,14 @@ public class TestJdbcWithMiniLlap {
     String user = System.getProperty("user.name");
     String pwd = user;
 
-    LlapInputFormat inputFormat = new LlapInputFormat(url, user, pwd, query);
+    LlapRowInputFormat inputFormat = new LlapRowInputFormat();
 
     // Get splits
     JobConf job = new JobConf(conf);
+    job.set(LlapBaseInputFormat.URL_KEY, url);
+    job.set(LlapBaseInputFormat.USER_KEY, user);
+    job.set(LlapBaseInputFormat.PWD_KEY, pwd);
+    job.set(LlapBaseInputFormat.QUERY_KEY, query);
 
     InputSplit[] splits = inputFormat.getSplits(job, numSplits);
     assertTrue(splits.length > 0);
@@ -216,10 +218,12 @@ public class TestJdbcWithMiniLlap {
     for (InputSplit split : splits) {
       System.out.println("Processing split " + split.getLocations());
 
-      RecordReader<NullWritable, Text> reader = inputFormat.getRecordReader(split, job, null);
-      if (reader instanceof LlapRecordReader && first) {
-        Schema schema = ((LlapRecordReader)reader).getSchema();
+      int numColumns = 2;
+      RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null);
+      if (reader instanceof LlapRowRecordReader && first) {
+        Schema schema = ((LlapRowRecordReader) reader).getSchema();
         System.out.println(""+schema);
+        assertEquals(numColumns, schema.getColumns().size());
       }
 
       if (first) {
@@ -228,9 +232,15 @@ public class TestJdbcWithMiniLlap {
         first = false;
       }
 
-      Text value = reader.createValue();
-      while (reader.next(NullWritable.get(), value)) {
-        System.out.println(value);
+      Row row = reader.createValue();
+      while (reader.next(NullWritable.get(), row)) {
+        for (int idx = 0; idx < numColumns; idx++) {
+          if (idx > 0) {
+            System.out.print(", ");
+          }
+          System.out.print(row.getValue(idx));
+        }
+        System.out.println("");
         ++rowCount;
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 2be8c30..c99a351 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -70,6 +70,11 @@
       <artifactId>hive-service-rpc</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-llap-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <!-- inter-project -->
     <dependency>
       <groupId>org.apache.httpcomponents</groupId>

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java
new file mode 100644
index 0000000..a0ddeaa
--- /dev/null
+++ b/jdbc/src/java/org/apache/hive/jdbc/LlapBaseInputFormat.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.jdbc;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import java.sql.SQLException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.sql.DriverManager;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Progressable;
+
+import com.google.common.base.Preconditions;
+
+public class LlapBaseInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
+
+  private static String driverName = "org.apache.hive.jdbc.HiveDriver";
+  private String url;  // "jdbc:hive2://localhost:10000/default"
+  private String user; // "hive",
+  private String pwd;  // ""
+  private String query;
+
+  public static final String URL_KEY = "llap.if.hs2.connection";
+  public static final String QUERY_KEY = "llap.if.query";
+  public static final String USER_KEY = "llap.if.user";
+  public static final String PWD_KEY = "llap.if.pwd";
+
+  public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)";
+
+  private Connection con;
+  private Statement stmt;
+
+  public LlapBaseInputFormat(String url, String user, String pwd, String query) {
+    this.url = url;
+    this.user = user;
+    this.pwd = pwd;
+    this.query = query;
+  }
+
+  public LlapBaseInputFormat() {}
+
+
+  @Override
+  public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    LlapInputSplit llapSplit = (LlapInputSplit) split;
+    return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    List<InputSplit> ins = new ArrayList<InputSplit>();
+
+    if (url == null) url = job.get(URL_KEY);
+    if (query == null) query = job.get(QUERY_KEY);
+    if (user == null) user = job.get(USER_KEY);
+    if (pwd == null) pwd = job.get(PWD_KEY);
+
+    if (url == null || query == null) {
+      throw new IllegalStateException();
+    }
+
+    try {
+      Class.forName(driverName);
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+
+    try {
+      con = DriverManager.getConnection(url,user,pwd);
+      stmt = con.createStatement();
+      String sql = String.format(SPLIT_QUERY, query, numSplits);
+      ResultSet res = stmt.executeQuery(sql);
+      while (res.next()) {
+        // deserialize split
+        DataInput in = new DataInputStream(res.getBinaryStream(3));
+        InputSplitWithLocationInfo is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance();
+        is.readFields(in);
+        ins.add(new LlapInputSplit(is, res.getString(1)));
+      }
+
+      res.close();
+      stmt.close();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    return ins.toArray(new InputSplit[ins.size()]);
+  }
+
+  public void close() {
+    try {
+      con.close();
+    } catch (Exception e) {
+      // ignore
+    }
+  }
+}
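
For reference, a minimal sketch of how a client might drive the LlapBaseInputFormat added above, using the JobConf keys it reads in getSplits(). The endpoint, credentials, query string, and the example class name are placeholders, not values taken from this change:

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hive.jdbc.LlapBaseInputFormat;

    public class LlapBaseInputFormatExample {
      public static void main(String[] args) throws Exception {
        // Placeholder connection settings; point these at a real HiveServer2 instance.
        JobConf job = new JobConf();
        job.set(LlapBaseInputFormat.URL_KEY, "jdbc:hive2://localhost:10000/default");
        job.set(LlapBaseInputFormat.USER_KEY, "hive");
        job.set(LlapBaseInputFormat.PWD_KEY, "");
        job.set(LlapBaseInputFormat.QUERY_KEY, "select * from test_table");

        LlapBaseInputFormat<Text> inputFormat = new LlapBaseInputFormat<Text>();

        // getSplits() runs the get_splits() UDTF over JDBC; each split is then read from LLAP.
        InputSplit[] splits = inputFormat.getSplits(job, 4);
        for (InputSplit split : splits) {
          RecordReader<NullWritable, Text> reader = inputFormat.getRecordReader(split, job, null);
          Text value = reader.createValue();
          while (reader.next(NullWritable.get(), value)) {
            System.out.println(value);
          }
          reader.close();
        }
        inputFormat.close();
      }
    }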

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java b/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
index 7ed0a0e..4c3c3ab 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
@@ -49,10 +49,9 @@ import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
-import org.apache.hadoop.hive.llap.LlapRecordReader;
-import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
+import org.apache.hadoop.hive.llap.Schema;
 
 public class LlapDump {
 
@@ -98,7 +97,7 @@ public class LlapDump {
     System.out.println("user: "+user);
     System.out.println("query: "+query);
 
-    LlapInputFormat format = new LlapInputFormat(url, user, pwd, query);
+    LlapBaseInputFormat format = new LlapBaseInputFormat(url, user, pwd, query);
     JobConf job = new JobConf();
 
     InputSplit[] splits = format.getSplits(job, Integer.parseInt(numSplits));
@@ -113,8 +112,8 @@ public class LlapDump {
         LOG.info("Processing input split s from " + Arrays.toString(s.getLocations()));
         RecordReader<NullWritable, Text> reader = format.getRecordReader(s, job, null);
 
-        if (reader instanceof LlapRecordReader && first) {
-          Schema schema = ((LlapRecordReader)reader).getSchema();
+        if (reader instanceof LlapBaseRecordReader && first) {
+          Schema schema = ((LlapBaseRecordReader)reader).getSchema();
           System.out.println(""+schema);
         }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
deleted file mode 100644
index 9a7c16d..0000000
--- a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hive.jdbc;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import java.sql.SQLException;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.sql.DriverManager;
-
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.DataInputStream;
-import java.io.ByteArrayInputStream;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.Progressable;
-
-import com.google.common.base.Preconditions;
-
-public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
-
-  private static String driverName = "org.apache.hive.jdbc.HiveDriver";
-  private String url;  // "jdbc:hive2://localhost:10000/default"
-  private String user; // "hive",
-  private String pwd;  // ""
-  private String query;
-
-  public final String URL_KEY = "llap.if.hs2.connection";
-  public final String QUERY_KEY = "llap.if.query";
-  public final String USER_KEY = "llap.if.user";
-  public final String PWD_KEY = "llap.if.pwd";
-
-  public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)";
-
-  private Connection con;
-  private Statement stmt;
-
-  public LlapInputFormat(String url, String user, String pwd, String query) {
-    this.url = url;
-    this.user = user;
-    this.pwd = pwd;
-    this.query = query;
-  }
-
-  public LlapInputFormat() {}
-
-
-  @Override
-  public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
-    LlapInputSplit llapSplit = (LlapInputSplit) split;
-    return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter);
-  }
-
-  @Override
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    List<InputSplit> ins = new ArrayList<InputSplit>();
-
-    if (url == null) url = job.get(URL_KEY);
-    if (query == null) query = job.get(QUERY_KEY);
-    if (user == null) user = job.get(USER_KEY);
-    if (pwd == null) pwd = job.get(PWD_KEY);
-
-    if (url == null || query == null) {
-      throw new IllegalStateException();
-    }
-
-    try {
-      Class.forName(driverName);
-    } catch (ClassNotFoundException e) {
-      throw new IOException(e);
-    }
-
-    try {
-      con = DriverManager.getConnection(url,user,pwd);
-      stmt = con.createStatement();
-      String sql = String.format(SPLIT_QUERY, query, numSplits);
-      ResultSet res = stmt.executeQuery(sql);
-      while (res.next()) {
-        // deserialize split
-        DataInput in = new DataInputStream(res.getBinaryStream(3));
-        InputSplitWithLocationInfo is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance();
-        is.readFields(in);
-        ins.add(new LlapInputSplit(is, res.getString(1)));
-      }
-
-      res.close();
-      stmt.close();
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-    return ins.toArray(new InputSplit[ins.size()]);
-  }
-
-  public void close() {
-    try {
-      con.close();
-    } catch (Exception e) {
-      // ignore
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java
new file mode 100644
index 0000000..1cca66a
--- /dev/null
+++ b/jdbc/src/java/org/apache/hive/jdbc/LlapRowInputFormat.java
@@ -0,0 +1,34 @@
+package org.apache.hive.jdbc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
+import org.apache.hadoop.hive.llap.LlapRowRecordReader;
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.hive.llap.Schema;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+public class LlapRowInputFormat implements InputFormat<NullWritable, Row> {
+  LlapBaseInputFormat<Text> baseInputFormat = new LlapBaseInputFormat<Text>();
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return baseInputFormat.getSplits(job, numSplits);
+  }
+
+  @Override
+  public RecordReader<NullWritable, Row> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+      throws IOException {
+    LlapInputSplit<Text> llapSplit = (LlapInputSplit<Text>) split;
+    LlapBaseRecordReader<Text> reader = (LlapBaseRecordReader<Text>) baseInputFormat.getRecordReader(llapSplit, job, reporter);
+    return new LlapRowRecordReader(job, reader.getSchema(), reader);
+  }
+}
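
Likewise, a minimal sketch of consuming rows through the new LlapRowInputFormat, mirroring the usage in the TestJdbcWithMiniLlap changes above; the connection settings, query, and example class name are again placeholders:

    import org.apache.hadoop.hive.llap.LlapRowRecordReader;
    import org.apache.hadoop.hive.llap.Row;
    import org.apache.hadoop.hive.llap.Schema;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hive.jdbc.LlapBaseInputFormat;
    import org.apache.hive.jdbc.LlapRowInputFormat;

    public class LlapRowInputFormatExample {
      public static void main(String[] args) throws Exception {
        // Placeholder connection settings; point these at a real HiveServer2 instance.
        JobConf job = new JobConf();
        job.set(LlapBaseInputFormat.URL_KEY, "jdbc:hive2://localhost:10000/default");
        job.set(LlapBaseInputFormat.USER_KEY, "hive");
        job.set(LlapBaseInputFormat.PWD_KEY, "");
        job.set(LlapBaseInputFormat.QUERY_KEY, "select col1, col2 from test_table");

        LlapRowInputFormat inputFormat = new LlapRowInputFormat();
        for (InputSplit split : inputFormat.getSplits(job, 4)) {
          RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null);
          Schema schema = ((LlapRowRecordReader) reader).getSchema();

          // Column values come back as typed Writables, addressable by index or by name.
          Row row = reader.createValue();
          while (reader.next(NullWritable.get(), row)) {
            for (int idx = 0; idx < schema.getColumns().size(); ++idx) {
              if (idx > 0) {
                System.out.print(", ");
              }
              System.out.print(row.getValue(idx));
            }
            System.out.println();
          }
          reader.close();
        }
      }
    }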

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/pom.xml
----------------------------------------------------------------------
diff --git a/llap-common/pom.xml b/llap-common/pom.xml
index 5343479..ceac83b 100644
--- a/llap-common/pom.xml
+++ b/llap-common/pom.xml
@@ -39,6 +39,11 @@
       <artifactId>hive-common</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-serde</artifactId>
+      <version>${project.version}</version>
+    </dependency>
 
     <!-- inter-project -->
     <dependency>
@@ -58,6 +63,22 @@
         </exclusions>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>org.apache.tez</groupId>
       <artifactId>tez-api</artifactId>
       <version>${tez.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/src/java/org/apache/hadoop/hive/llap/FieldDesc.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/FieldDesc.java b/llap-common/src/java/org/apache/hadoop/hive/llap/FieldDesc.java
new file mode 100644
index 0000000..9621978
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/FieldDesc.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+
+public class FieldDesc implements Writable {
+  private String name;
+  private TypeDesc typeDesc;
+
+  public FieldDesc() {
+    typeDesc = new TypeDesc();
+  }
+
+  public FieldDesc(String name, TypeDesc typeDesc) {
+    this.name = name;
+    this.typeDesc = typeDesc;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public TypeDesc getTypeDesc() {
+    return typeDesc;
+  }
+
+  @Override
+  public String toString() {
+    return getName() + ":" + getTypeDesc().toString();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(name);
+    typeDesc.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    name = in.readUTF();
+    typeDesc.readFields(in);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
new file mode 100644
index 0000000..4e000ff
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
@@ -0,0 +1,155 @@
+package org.apache.hadoop.hive.llap;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.hive.llap.TypeDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapRowRecordReader.class);
+
+  Configuration conf;
+  RecordReader<NullWritable, Text> reader;
+  Schema schema;
+  SerDe serde;
+  final Text textData = new Text();
+
+  public LlapRowRecordReader(Configuration conf, Schema schema, RecordReader<NullWritable, Text> reader) {
+    this.conf = conf;
+    this.schema = schema;
+    this.reader = reader;
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  @Override
+  public Row createValue() {
+    return new Row(schema);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public boolean next(NullWritable key, Row value) throws IOException {
+    Preconditions.checkArgument(value != null);
+
+    if (serde == null) {
+      try {
+        serde = initSerDe(conf);
+      } catch (SerDeException err) {
+        throw new IOException(err);
+      }
+    }
+
+    boolean hasNext = reader.next(key,  textData);
+    if (hasNext) {
+      // Deserialize Text to column values, and populate the row record
+      Object rowObj;
+      try {
+        StructObjectInspector rowOI = (StructObjectInspector) serde.getObjectInspector();
+        rowObj = serde.deserialize(textData);
+        List<? extends StructField> colFields = rowOI.getAllStructFieldRefs();
+        for (int idx = 0; idx < colFields.size(); ++idx) {
+          StructField field = colFields.get(idx);
+          Object colValue = rowOI.getStructFieldData(rowObj, field);
+          Preconditions.checkState(field.getFieldObjectInspector().getCategory() == Category.PRIMITIVE,
+              "Cannot handle non-primitive column type " + field.getFieldObjectInspector().getTypeName());
+
+          PrimitiveObjectInspector poi = (PrimitiveObjectInspector) field.getFieldObjectInspector();
+          // char/varchar special cased here since the row record handles them using Text
+          switch (poi.getPrimitiveCategory()) {
+            case CHAR:
+              value.setValue(idx, ((HiveCharWritable) poi.getPrimitiveWritableObject(colValue)).getPaddedValue());
+              break;
+            case VARCHAR:
+              value.setValue(idx, ((HiveVarcharWritable) poi.getPrimitiveWritableObject(colValue)).getTextValue());
+              break;
+            default:
+              value.setValue(idx, (Writable) poi.getPrimitiveWritableObject(colValue));
+              break;
+          }
+        }
+      } catch (SerDeException err) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Error deserializing row from text: " + textData);
+        }
+        throw new IOException("Error deserializing row data", err);
+      }
+    }
+
+    return hasNext;
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  protected SerDe initSerDe(Configuration conf) throws SerDeException {
+    Properties props = new Properties();
+    StringBuffer columnsBuffer = new StringBuffer();
+    StringBuffer typesBuffer = new StringBuffer();
+    boolean isFirst = true;
+    for (FieldDesc colDesc : schema.getColumns()) {
+      if (!isFirst) {
+        columnsBuffer.append(',');
+        typesBuffer.append(',');
+      }
+      columnsBuffer.append(colDesc.getName());
+      typesBuffer.append(colDesc.getTypeDesc().toString());
+      isFirst = false;
+    }
+    String columns = columnsBuffer.toString();
+    String types = typesBuffer.toString();
+    props.put(serdeConstants.LIST_COLUMNS, columns);
+    props.put(serdeConstants.LIST_COLUMN_TYPES, types);
+    SerDe serde = new LazySimpleSerDe();
+    serde.initialize(conf, props);
+
+    return serde;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/src/java/org/apache/hadoop/hive/llap/Row.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/Row.java b/llap-common/src/java/org/apache/hadoop/hive/llap/Row.java
new file mode 100644
index 0000000..a84fadc
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/Row.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable;
+import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+
+public class Row {
+  private final Schema schema;
+  private final Writable[] colValues;
+  private final boolean[] nullIndicators;
+  private Map<String, Integer> nameToIndexMapping;
+
+  public Row(Schema schema) {
+    this.schema = schema;
+    this.colValues = new Writable[schema.getColumns().size()];
+    this.nullIndicators = new boolean[schema.getColumns().size()];
+    this.nameToIndexMapping = new HashMap<String, Integer>(schema.getColumns().size());
+
+    List<FieldDesc> colDescs = schema.getColumns();
+    for (int idx = 0; idx < colDescs.size(); ++idx) {
+      FieldDesc colDesc = colDescs.get(idx);
+      nameToIndexMapping.put(colDesc.getName(), idx);
+      colValues[idx] = createWritableForType(colDesc.getTypeDesc());
+    }
+  }
+
+  public Writable getValue(int colIndex) {
+    if (nullIndicators[colIndex]) {
+      return null;
+    }
+    return colValues[colIndex];
+  }
+
+  public Writable getValue(String colName) {
+    Integer idx = nameToIndexMapping.get(colName);
+    Preconditions.checkArgument(idx != null);
+    return getValue(idx);
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  void setValue(int colIdx, Writable value) {
+    Preconditions.checkArgument(colIdx <= schema.getColumns().size());
+
+    if (value == null) {
+      nullIndicators[colIdx] = true;
+    } else {
+      nullIndicators[colIdx] = false;
+      FieldDesc colDesc = schema.getColumns().get(colIdx);
+      switch (colDesc.getTypeDesc().getType()) {
+        case BOOLEAN:
+          ((BooleanWritable) colValues[colIdx]).set(((BooleanWritable) value).get());
+          break;
+        case TINYINT:
+          ((ByteWritable) colValues[colIdx]).set(((ByteWritable) value).get());
+          break;
+        case SMALLINT:
+          ((ShortWritable) colValues[colIdx]).set(((ShortWritable) value).get());
+          break;
+        case INT:
+          ((IntWritable) colValues[colIdx]).set(((IntWritable) value).get());
+          break;
+        case BIGINT:
+          ((LongWritable) colValues[colIdx]).set(((LongWritable) value).get());
+          break;
+        case FLOAT:
+          ((FloatWritable) colValues[colIdx]).set(((FloatWritable) value).get());
+          break;
+        case DOUBLE:
+          ((DoubleWritable) colValues[colIdx]).set(((DoubleWritable) value).get());
+          break;
+        case STRING:
+        // Just handle char/varchar as Text
+        case CHAR:
+        case VARCHAR:
+          ((Text) colValues[colIdx]).set((Text) value);
+          break;
+        case DATE:
+          ((DateWritable) colValues[colIdx]).set((DateWritable) value);
+          break;
+        case TIMESTAMP:
+          ((TimestampWritable) colValues[colIdx]).set((TimestampWritable) value);
+          break;
+        case BINARY:
+          ((BytesWritable) colValues[colIdx]).set(((BytesWritable) value));
+          break;
+        case DECIMAL:
+          ((HiveDecimalWritable) colValues[colIdx]).set((HiveDecimalWritable) value);
+          break;
+      }
+    }
+  }
+
+  private Writable createWritableForType(TypeDesc typeDesc) {
+    switch (typeDesc.getType()) {
+      case BOOLEAN:
+        return new BooleanWritable();
+      case TINYINT:
+        return new ByteWritable();
+      case SMALLINT:
+        return new ShortWritable();
+      case INT:
+        return new IntWritable();
+      case BIGINT:
+        return new LongWritable();
+      case FLOAT:
+        return new FloatWritable();
+      case DOUBLE:
+        return new DoubleWritable();
+      case STRING:
+      // Just handle char/varchar as Text
+      case CHAR:
+      case VARCHAR:
+        return new Text();
+      case DATE:
+        return new DateWritable();
+      case TIMESTAMP:
+        return new TimestampWritable();
+      case BINARY:
+        return new BytesWritable();
+      case DECIMAL:
+        return new HiveDecimalWritable();
+      default:
+        throw new RuntimeException("Cannot create writable for " + typeDesc.getType());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/src/java/org/apache/hadoop/hive/llap/Schema.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/Schema.java b/llap-common/src/java/org/apache/hadoop/hive/llap/Schema.java
new file mode 100644
index 0000000..c1bf4ea
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/Schema.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.io.Writable;
+
+public class Schema implements Writable {
+
+  private final List<FieldDesc> columns;
+
+  public Schema(List<FieldDesc> columns) {
+    this.columns = columns;
+  }
+
+  public Schema() {
+    columns = new ArrayList<FieldDesc>();
+  }
+
+  public List<FieldDesc> getColumns() {
+    return columns;
+  }
+
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    boolean first = true;
+    for (FieldDesc colDesc : getColumns()) {
+      if (!first) {
+        sb.append(",");
+      }
+      sb.append(colDesc.toString());
+      first = false;
+    }
+    return sb.toString();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(columns.size());
+    for (FieldDesc column : columns) {
+      column.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numColumns = in.readInt();
+    columns.clear();
+    for (int idx = 0; idx < numColumns; ++idx) {
+      FieldDesc colDesc = new FieldDesc();
+      colDesc.readFields(in);
+      columns.add(colDesc);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/src/java/org/apache/hadoop/hive/llap/TypeDesc.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/TypeDesc.java b/llap-common/src/java/org/apache/hadoop/hive/llap/TypeDesc.java
new file mode 100644
index 0000000..dda5928
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/TypeDesc.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+
+public class TypeDesc implements Writable {
+  public static enum Type {
+    BOOLEAN,
+    TINYINT,
+    SMALLINT,
+    INT,
+    BIGINT,
+    FLOAT,
+    DOUBLE,
+    STRING,
+    CHAR,
+    VARCHAR,
+    DATE,
+    TIMESTAMP,
+    BINARY,
+    DECIMAL,
+  }
+
+  private TypeDesc.Type type;
+  private int precision;
+  private int scale;
+
+  // For types with no type qualifiers
+  public TypeDesc(TypeDesc.Type type) {
+    this(type, 0, 0);
+  }
+
+  // For decimal types
+  public TypeDesc(TypeDesc.Type type, int precision, int scale) {
+    this.type = type;
+    this.precision = precision;
+    this.scale = scale;
+  }
+
+  // For char/varchar types
+  public TypeDesc(TypeDesc.Type type, int precision) {
+    this(type, precision, 0);
+  }
+
+  // Should be used for serialization only
+  public TypeDesc() {
+    this(TypeDesc.Type.INT, 0, 0);
+  }
+
+  public TypeDesc.Type getType() {
+    return type;
+  }
+
+  public int getPrecision() {
+    return precision;
+  }
+
+  public int getScale() {
+    return scale;
+  }
+
+  @Override
+  public String toString() {
+    switch (type) {
+      case DECIMAL:
+        return type.name().toLowerCase() + "(" + precision + "," + scale + ")";
+      case CHAR:
+      case VARCHAR:
+        return type.name().toLowerCase() + "(" + precision + ")";
+      default:
+        return type.name().toLowerCase();
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(type.name());
+    out.writeInt(precision);
+    out.writeInt(scale);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    type = TypeDesc.Type.valueOf(in.readUTF());
+    precision = in.readInt();
+    scale = in.readInt();
+  }
+}
\ No newline at end of file
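
Taken together, FieldDesc, TypeDesc, and Schema describe result columns and can be serialized through their Writable implementations. A small illustrative sketch (the column names and example class name are made up):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hive.llap.FieldDesc;
    import org.apache.hadoop.hive.llap.Schema;
    import org.apache.hadoop.hive.llap.TypeDesc;

    public class SchemaRoundTripExample {
      public static void main(String[] args) throws Exception {
        List<FieldDesc> columns = new ArrayList<FieldDesc>();
        columns.add(new FieldDesc("name", new TypeDesc(TypeDesc.Type.STRING)));
        columns.add(new FieldDesc("tag", new TypeDesc(TypeDesc.Type.VARCHAR, 20)));
        columns.add(new FieldDesc("amount", new TypeDesc(TypeDesc.Type.DECIMAL, 10, 2)));

        Schema schema = new Schema(columns);
        System.out.println(schema);  // name:string,tag:varchar(20),amount:decimal(10,2)

        // Round-trip through DataOutput/DataInput, as a split or record reader would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        schema.write(new DataOutputStream(bytes));

        Schema copy = new Schema();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy);   // prints the same column list
      }
    }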

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-common/src/test/org/apache/hadoop/hive/llap/TestRow.java
----------------------------------------------------------------------
diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/TestRow.java b/llap-common/src/test/org/apache/hadoop/hive/llap/TestRow.java
new file mode 100644
index 0000000..d4e68f4
--- /dev/null
+++ b/llap-common/src/test/org/apache/hadoop/hive/llap/TestRow.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.lang.RandomStringUtils;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestRow {
+
+  @Test
+  public void testUsage() {
+    Schema schema = createTestSchema();
+    Row row = new Row(schema);
+
+    Random rand = new Random();
+    int iterations = 100;
+    Text col0 = new Text();
+    IntWritable col1 = new IntWritable();
+    for (int idx = 0; idx < iterations; ++idx) {
+      // Set the row values
+      boolean isNullCol0 = (rand.nextDouble() <= 0.25);
+      col0.set(RandomStringUtils.random(10));
+      row.setValue(0, isNullCol0 ? null : col0);
+
+      boolean isNullCol1 = (rand.nextDouble() <= 0.25);
+      col1.set(rand.nextInt());
+      row.setValue(1, isNullCol1 ? null : col1);
+
+      // Validate the row values
+      if (isNullCol0) {
+        assertTrue(row.getValue(0) == null);
+        assertTrue(row.getValue("col0") == null);
+      } else {
+        assertTrue(row.getValue(0) != null);
+        assertTrue(col0 != row.getValue(0));
+        assertEquals(col0, row.getValue(0));
+        assertEquals(col0, row.getValue("col0"));
+      }
+
+      if (isNullCol1) {
+        assertTrue(row.getValue(1) == null);
+        assertTrue(row.getValue("col1") == null);
+      } else {
+        assertTrue(row.getValue(1) != null);
+        assertTrue(col1 != row.getValue(1));
+        assertEquals(col1, row.getValue(1));
+        assertEquals(col1, row.getValue("col1"));
+      }
+    }
+  }
+
+  private Schema createTestSchema() {
+    List<FieldDesc> colDescs = new ArrayList<FieldDesc>();
+
+    colDescs.add(new FieldDesc("col0",
+        new TypeDesc(TypeDesc.Type.STRING)));
+
+    colDescs.add(new FieldDesc("col1",
+        new TypeDesc(TypeDesc.Type.INT)));
+
+    Schema schema = new Schema(colDescs);
+    return schema;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
index aaca7d6..0930d60 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
@@ -30,7 +30,7 @@ import com.google.protobuf.ByteString;
 
 import org.apache.commons.collections4.ListUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.LlapRecordReader.ReaderEvent;
+import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
@@ -74,7 +74,6 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class);
 
-
   public LlapInputFormat() {
   }
 
@@ -135,7 +134,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
 
     LOG.info("Registered id: " + id);
 
-    LlapRecordReader recordReader = new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
+    LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
     umbilicalResponder.setRecordReader(recordReader);
     return recordReader;
   }
@@ -276,7 +275,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
   }
 
   private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder {
-    protected LlapRecordReader recordReader = null;
+    protected LlapBaseRecordReader recordReader = null;
     protected LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>();
 
     public LlapRecordReaderTaskUmbilicalExternalResponder() {
@@ -285,7 +284,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
     @Override
     public void submissionFailed(String fragmentId, Throwable throwable) {
       try {
-        sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent(
+        sendOrQueueEvent(ReaderEvent.errorEvent(
             "Received submission failed event for fragment ID " + fragmentId));
       } catch (Exception err) {
         LOG.error("Error during heartbeat responder:", err);
@@ -301,11 +300,11 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
         try {
           switch (eventType) {
             case TASK_ATTEMPT_COMPLETED_EVENT:
-              sendOrQueueEvent(LlapRecordReader.ReaderEvent.doneEvent());
+              sendOrQueueEvent(ReaderEvent.doneEvent());
               break;
             case TASK_ATTEMPT_FAILED_EVENT:
               TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) tezEvent.getEvent();
-              sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics()));
+              sendOrQueueEvent(ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics()));
               break;
             case TASK_STATUS_UPDATE_EVENT:
               // If we want to handle counters
@@ -323,7 +322,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
     @Override
     public void taskKilled(TezTaskAttemptID taskAttemptId) {
       try {
-        sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent(
+        sendOrQueueEvent(ReaderEvent.errorEvent(
             "Received task killed event for task ID " + taskAttemptId));
       } catch (Exception err) {
         LOG.error("Error during heartbeat responder:", err);
@@ -333,18 +332,18 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
     @Override
     public void heartbeatTimeout(String taskAttemptId) {
       try {
-        sendOrQueueEvent(LlapRecordReader.ReaderEvent.errorEvent(
+        sendOrQueueEvent(ReaderEvent.errorEvent(
             "Timed out waiting for heartbeat for task ID " + taskAttemptId));
       } catch (Exception err) {
         LOG.error("Error during heartbeat responder:", err);
       }
     }
 
-    public synchronized LlapRecordReader getRecordReader() {
+    public synchronized LlapBaseRecordReader getRecordReader() {
       return recordReader;
     }
 
-    public synchronized void setRecordReader(LlapRecordReader recordReader) {
+    public synchronized void setRecordReader(LlapBaseRecordReader recordReader) {
       this.recordReader = recordReader;
 
       if (recordReader == null) {
@@ -353,7 +352,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
 
       // If any events were queued by the responder, give them to the record reader now.
       while (!queuedEvents.isEmpty()) {
-        LlapRecordReader.ReaderEvent readerEvent = queuedEvents.poll();
+        ReaderEvent readerEvent = queuedEvents.poll();
         LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType());
         recordReader.handleEvent(readerEvent);
       }
@@ -365,8 +364,8 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
      * since we don't want to drop these events.
      * @param readerEvent
      */
-    protected synchronized void sendOrQueueEvent(LlapRecordReader.ReaderEvent readerEvent) {
-      LlapRecordReader recordReader = getRecordReader();
+    protected synchronized void sendOrQueueEvent(ReaderEvent readerEvent) {
+      LlapBaseRecordReader recordReader = getRecordReader();
       if (recordReader != null) {
         recordReader.handleEvent(readerEvent);
       } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
new file mode 100644
index 0000000..7073280
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.DataInputStream;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapBaseRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapBaseRecordReader.class);
+
+  DataInputStream din;
+  Schema schema;
+  Class<V> clazz;
+
+
+  protected Thread readerThread = null;
+  protected LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>();
+
+  public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz) {
+    din = new DataInputStream(in);
+    this.schema = schema;
+    this.clazz = clazz;
+    this.readerThread = Thread.currentThread();
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public void close() throws IOException {
+    din.close();
+  }
+
+  @Override
+  public long getPos() { return 0; }
+
+  @Override
+  public float getProgress() { return 0f; }
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  @Override
+  public V createValue() {
+    try {
+      return clazz.newInstance();
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  @Override
+  public boolean next(NullWritable key, V value) throws IOException {
+    try {
+      // Record the current thread so handleEvent() knows which thread to interrupt during the blocking read below.
+      setReaderThread(Thread.currentThread());
+
+      value.readFields(din);
+      return true;
+    } catch (EOFException eof) {
+      // End of input. A reader event should be available, or coming soon, so it is safe to block on the queue.
+      ReaderEvent event = getReaderEvent();
+      switch (event.getEventType()) {
+        case DONE:
+          break;
+        default:
+          throw new IOException("Expected reader event with done status, but got "
+              + event.getEventType() + " with message " + event.getMessage());
+      }
+      return false;
+    } catch (IOException io) {
+      if (Thread.interrupted()) {
+        // Either we were interrupted by one of:
+        // 1. handleEvent(), in which case there is a reader event waiting for us in the queue
+        // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
+        // Either way we should not try to block trying to read the reader events queue.
+        if (readerEvents.isEmpty()) {
+          // Case 2.
+          throw io;
+        } else {
+          // Case 1. Fail the reader, sending back the error we received from the reader event.
+          ReaderEvent event = getReaderEvent();
+          switch (event.getEventType()) {
+            case ERROR:
+              throw new IOException("Received reader event error: " + event.getMessage());
+            default:
+              throw new IOException("Got reader event type " + event.getEventType() + ", expected error event");
+          }
+        }
+      } else {
+        // If we weren't interrupted, just propagate the error
+        throw io;
+      }
+    }
+  }
+
+  /**
+   * Define success/error events which are passed to the reader from a different thread.
+   * The reader will check for these events on end of input and interruption of the reader thread.
+   */
+  public static class ReaderEvent {
+    public enum EventType {
+      DONE,
+      ERROR
+    }
+
+    protected final EventType eventType;
+    protected final String message;
+
+    protected ReaderEvent(EventType type, String message) {
+      this.eventType = type;
+      this.message = message;
+    }
+
+    public static ReaderEvent doneEvent() {
+      return new ReaderEvent(EventType.DONE, "");
+    }
+
+    public static ReaderEvent errorEvent(String message) {
+      return new ReaderEvent(EventType.ERROR, message);
+    }
+
+    public EventType getEventType() {
+      return eventType;
+    }
+
+    public String getMessage() {
+      return message;
+    }
+  }
+
+  public void handleEvent(ReaderEvent event) {
+    switch (event.getEventType()) {
+      case DONE:
+        // Reader will check for the event queue upon the end of the input stream - no need to interrupt.
+        readerEvents.add(event);
+        break;
+      case ERROR:
+        readerEvents.add(event);
+        if (readerThread == null) {
+          throw new RuntimeException("Reader thread is unexpectedly null, during ReaderEvent error " + event.getMessage());
+        }
+        // The reader is blocked on a socket read; interrupt it.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Interrupting reader thread due to reader event with error " + event.getMessage());
+        }
+        getReaderThread().interrupt();
+        break;
+      default:
+        throw new RuntimeException("Unhandled ReaderEvent type " + event.getEventType() + " with message " + event.getMessage());
+    }
+  }
+
+  protected ReaderEvent getReaderEvent() {
+    try {
+      ReaderEvent event = readerEvents.take();
+      return event;
+    } catch (InterruptedException ie) {
+      throw new RuntimeException("Interrupted while getting readerEvents, not expected", ie);
+    }
+  }
+
+  protected synchronized void setReaderThread(Thread readerThread) {
+    this.readerThread = readerThread;
+  }
+
+  protected synchronized Thread getReaderThread() {
+    return readerThread;
+  }
+}
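
To show how the new class above is meant to be driven, here is a usage sketch (not part of this patch; the class, method, and variable names are illustrative): one thread pulls records off the socket stream through next(), while the responder thread reports completion or failure through handleEvent(). The reader only consults the event queue when it reaches end of stream or is interrupted.

import java.io.InputStream;
import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
import org.apache.hadoop.hive.llap.Schema;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

class ReaderUsageSketch {
  void drain(InputStream in, Schema schema) throws Exception {
    LlapBaseRecordReader<Text> reader = new LlapBaseRecordReader<Text>(in, schema, Text.class);
    Text value = reader.createValue();
    // Elsewhere, the responder calls reader.handleEvent(ReaderEvent.doneEvent())
    // on success, or reader.handleEvent(ReaderEvent.errorEvent("...")) on failure.
    while (reader.next(NullWritable.get(), value)) {
      System.out.println(value);   // consume each row
    }
    reader.close();
  }
}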

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
index 17a0d2d..02aedfd 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
@@ -20,7 +20,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.llap.Schema;
 import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
 import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.thrift.TDeserializer;
@@ -93,17 +93,7 @@ public class LlapInputSplit implements InputSplitWithLocationInfo {
       out.writeUTF(locations[i].getLocation());
     }
 
-    byte[] binarySchema;
-
-    try {
-      TSerializer serializer = new TSerializer();
-      byte[] serialzied = serializer.serialize(schema);
-      out.writeInt(serialzied.length);
-      out.write(serialzied);
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-
+    schema.write(out);
     out.writeUTF(llapUser);
   }
 
@@ -125,17 +115,8 @@ public class LlapInputSplit implements InputSplitWithLocationInfo {
       locations[i] = new SplitLocationInfo(in.readUTF(), false);
     }
 
-    length = in.readInt();
-
-    try {
-      byte[] schemaBytes = new byte[length];
-      in.readFully(schemaBytes);
-      TDeserializer tDeserializer = new TDeserializer();
-      schema = new Schema();
-      tDeserializer.deserialize(schema, schemaBytes);
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
+    schema = new Schema();
+    schema.readFields(in);
     llapUser = in.readUTF();
   }
 

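The change above drops the Thrift (de)serialization of the metastore Schema in favor of the new llap Schema's own Writable-style write()/readFields() methods. A small round-trip sketch of that mechanism follows (the helper class and method name are illustrative only; write() and readFields() are the calls the split uses above):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.hive.llap.Schema;

class SchemaRoundTripSketch {
  static Schema roundTrip(Schema schema) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    schema.write(new DataOutputStream(bos));       // replaces TSerializer + length prefix
    Schema copy = new Schema();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    return copy;                                   // replaces TDeserializer
  }
}
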
http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java
deleted file mode 100644
index 64e5e69..0000000
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordReader.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.DataInputStream;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.io.RCFile.Reader;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.hive.metastore.api.Schema;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LlapRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> {
-  private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
-
-  DataInputStream din;
-  Schema schema;
-  Class<V> clazz;
-
-
-  protected Thread readerThread = null;
-  protected LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>();
-
-  public LlapRecordReader(InputStream in, Schema schema, Class<V> clazz) {
-    din = new DataInputStream(in);
-    this.schema = schema;
-    this.clazz = clazz;
-    this.readerThread = Thread.currentThread();
-  }
-
-  public Schema getSchema() {
-    return schema;
-  }
-
-  @Override
-  public void close() throws IOException {
-    din.close();
-  }
-
-  @Override
-  public long getPos() { return 0; }
-
-  @Override
-  public float getProgress() { return 0f; }
-
-  @Override
-  public NullWritable createKey() {
-    return NullWritable.get();
-  }
-
-  @Override
-  public V createValue() {
-    try {
-      return clazz.newInstance();
-    } catch (Exception e) {
-      return null;
-    }
-  }
-
-  @Override
-  public boolean next(NullWritable key, V value) throws IOException {
-    try {
-      // Need a way to know what thread to interrupt, since this is a blocking thread.
-      setReaderThread(Thread.currentThread());
-
-      value.readFields(din);
-      return true;
-    } catch (EOFException eof) {
-      // End of input. There should be a reader event available, or coming soon, so okay to be blocking call.
-      ReaderEvent event = getReaderEvent();
-      switch (event.getEventType()) {
-        case DONE:
-          break;
-        default:
-          throw new IOException("Expected reader event with done status, but got "
-              + event.getEventType() + " with message " + event.getMessage());
-      }
-      return false;
-    } catch (IOException io) {
-      if (Thread.interrupted()) {
-        // Either we were interrupted by one of:
-        // 1. handleEvent(), in which case there is a reader event waiting for us in the queue
-        // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
-        // Either way we should not try to block trying to read the reader events queue.
-        if (readerEvents.isEmpty()) {
-          // Case 2.
-          throw io;
-        } else {
-          // Case 1. Fail the reader, sending back the error we received from the reader event.
-          ReaderEvent event = getReaderEvent();
-          switch (event.getEventType()) {
-            case ERROR:
-              throw new IOException("Received reader event error: " + event.getMessage());
-            default:
-              throw new IOException("Got reader event type " + event.getEventType() + ", expected error event");
-          }
-        }
-      } else {
-        // If we weren't interrupted, just propagate the error
-        throw io;
-      }
-    }
-  }
-
-  /**
-   * Define success/error events which are passed to the reader from a different thread.
-   * The reader will check for these events on end of input and interruption of the reader thread.
-   */
-  public static class ReaderEvent {
-    public enum EventType {
-      DONE,
-      ERROR
-    }
-
-    protected final EventType eventType;
-    protected final String message;
-
-    protected ReaderEvent(EventType type, String message) {
-      this.eventType = type;
-      this.message = message;
-    }
-
-    public static ReaderEvent doneEvent() {
-      return new ReaderEvent(EventType.DONE, "");
-    }
-
-    public static ReaderEvent errorEvent(String message) {
-      return new ReaderEvent(EventType.ERROR, message);
-    }
-
-    public EventType getEventType() {
-      return eventType;
-    }
-
-    public String getMessage() {
-      return message;
-    }
-  }
-
-  public void handleEvent(ReaderEvent event) {
-    switch (event.getEventType()) {
-      case DONE:
-        // Reader will check for the event queue upon the end of the input stream - no need to interrupt.
-        readerEvents.add(event);
-        break;
-      case ERROR:
-        readerEvents.add(event);
-        if (readerThread == null) {
-          throw new RuntimeException("Reader thread is unexpectedly null, during ReaderEvent error " + event.getMessage());
-        }
-        // Reader is using a blocking socket .. interrupt it.
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Interrupting reader thread due to reader event with error " + event.getMessage());
-        }
-        getReaderThread().interrupt();
-        break;
-      default:
-        throw new RuntimeException("Unhandled ReaderEvent type " + event.getEventType() + " with message " + event.getMessage());
-    }
-  }
-
-  protected ReaderEvent getReaderEvent() {
-    try {
-      ReaderEvent event = readerEvents.take();
-      return event;
-    } catch (InterruptedException ie) {
-      throw new RuntimeException("Interrupted while getting readerEvents, not expected", ie);
-    }
-  }
-
-  protected synchronized void setReaderThread(Thread readerThread) {
-    this.readerThread = readerThread;
-  }
-
-  protected synchronized Thread getReaderThread() {
-    return readerThread;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 6267324..51027a7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -45,7 +45,10 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapInputSplit;
 import org.apache.hadoop.hive.llap.LlapOutputFormat;
 import org.apache.hadoop.hive.llap.SubmitWorkInfo;
-import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.TypeDesc;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.Driver;
@@ -71,6 +74,12 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -220,7 +229,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
 
     QueryPlan plan = driver.getPlan();
     List<Task<?>> roots = plan.getRootTasks();
-    Schema schema = plan.getResultSchema();
+    Schema schema = convertSchema(plan.getResultSchema());
 
     if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
       throw new HiveException("Was expecting a single TezTask.");
@@ -255,7 +264,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
 
       plan = driver.getPlan();
       roots = plan.getRootTasks();
-      schema = plan.getResultSchema();
+      schema = convertSchema(plan.getResultSchema());
 
       if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
         throw new HiveException("Was expecting a single TezTask.");
@@ -416,6 +425,78 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     }
   }
 
+  private TypeDesc convertTypeString(String typeString) throws HiveException {
+    TypeDesc typeDesc;
+    TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeString);
+    Preconditions.checkState(typeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE,
+        "Unsupported non-primitive type " + typeString);
+
+    switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+      case BOOLEAN:
+        typeDesc = new TypeDesc(TypeDesc.Type.BOOLEAN);
+        break;
+      case BYTE:
+        typeDesc = new TypeDesc(TypeDesc.Type.TINYINT);
+        break;
+      case SHORT:
+        typeDesc = new TypeDesc(TypeDesc.Type.SMALLINT);
+        break;
+      case INT:
+        typeDesc = new TypeDesc(TypeDesc.Type.INT);
+        break;
+      case LONG:
+        typeDesc = new TypeDesc(TypeDesc.Type.BIGINT);
+        break;
+      case FLOAT:
+        typeDesc = new TypeDesc(TypeDesc.Type.FLOAT);
+        break;
+      case DOUBLE:
+        typeDesc = new TypeDesc(TypeDesc.Type.DOUBLE);
+        break;
+      case STRING:
+        typeDesc = new TypeDesc(TypeDesc.Type.STRING);
+        break;
+      case CHAR:
+        CharTypeInfo charTypeInfo = (CharTypeInfo) typeInfo;
+        typeDesc = new TypeDesc(TypeDesc.Type.CHAR, charTypeInfo.getLength());
+        break;
+      case VARCHAR:
+        VarcharTypeInfo varcharTypeInfo = (VarcharTypeInfo) typeInfo;
+        typeDesc = new TypeDesc(TypeDesc.Type.VARCHAR, varcharTypeInfo.getLength());
+        break;
+      case DATE:
+        typeDesc = new TypeDesc(TypeDesc.Type.DATE);
+        break;
+      case TIMESTAMP:
+        typeDesc = new TypeDesc(TypeDesc.Type.TIMESTAMP);
+        break;
+      case BINARY:
+        typeDesc = new TypeDesc(TypeDesc.Type.BINARY);
+        break;
+      case DECIMAL:
+        DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
+        typeDesc = new TypeDesc(TypeDesc.Type.DECIMAL, decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
+        break;
+      default:
+        throw new HiveException("Unsupported type " + typeString);
+    }
+
+    return typeDesc;
+  }
+
+  private Schema convertSchema(Object obj) throws HiveException {
+    org.apache.hadoop.hive.metastore.api.Schema schema = (org.apache.hadoop.hive.metastore.api.Schema) obj;
+    List<FieldDesc> colDescs = new ArrayList<FieldDesc>();
+    for (FieldSchema fs : schema.getFieldSchemas()) {
+      String colName = fs.getName();
+      String typeString = fs.getType();
+      TypeDesc typeDesc = convertTypeString(typeString);
+      colDescs.add(new FieldDesc(colName, typeDesc));
+    }
+    Schema resultSchema = new Schema(colDescs);
+    return resultSchema;
+  }
+
   @Override
   public void close() throws HiveException {
   }
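
To make the mapping in convertTypeString() above concrete, here is a small sketch (not part of the patch; the example type string and class name are chosen for illustration) showing how a parameterized Hive type string is parsed with TypeInfoUtils and turned into a TypeDesc that carries its precision and scale:

import org.apache.hadoop.hive.llap.TypeDesc;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

class TypeConversionSketch {
  // Mirrors the DECIMAL branch of convertTypeString() for one type string.
  static TypeDesc decimalExample() {
    TypeInfo ti = TypeInfoUtils.getTypeInfoFromTypeString("decimal(10,2)");
    DecimalTypeInfo dti = (DecimalTypeInfo) ti;
    return new TypeDesc(TypeDesc.Type.DECIMAL, dti.getPrecision(), dti.getScale());
  }
}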

http://git-wip-us.apache.org/repos/asf/hive/blob/fc7343dd/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
index 7b516fe..37e21b8 100644
--- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 
 
@@ -112,10 +113,13 @@ public class TestLlapOutputFormat {
       writer.close(null);
 
       InputStream in = socket.getInputStream();
-      RecordReader reader = new LlapRecordReader(in, null, Text.class);
+      LlapBaseRecordReader reader = new LlapBaseRecordReader(in, null, Text.class);
 
       LOG.debug("Have record reader");
 
+      // Send the done event, which LlapBaseRecordReader expects upon end of input
+      reader.handleEvent(ReaderEvent.doneEvent());
+
       int count = 0;
       while(reader.next(NullWritable.get(), text)) {
         LOG.debug(text.toString());