You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2018/05/21 20:48:13 UTC

[1/2] hive git commit: HIVE-19308: Provide an Arrow stream reader for external LLAP clients (Eric Wohlstadter, reviewed by Jason Dere)

Repository: hive
Updated Branches:
  refs/heads/master 610748287 -> 2c78ceb6a


http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
index df7b53f..dd490b1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
@@ -15,26 +15,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hive.ql.io.arrow;
 
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-public class ArrowWrapperWritable implements Writable {
+public class ArrowWrapperWritable implements WritableComparable {
   private VectorSchemaRoot vectorSchemaRoot;
 
   public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot) {
     this.vectorSchemaRoot = vectorSchemaRoot;
   }
+  public ArrowWrapperWritable() {}
 
   public VectorSchemaRoot getVectorSchemaRoot() {
     return vectorSchemaRoot;
   }
 
+  public void setVectorSchemaRoot(VectorSchemaRoot vectorSchemaRoot) {
+    this.vectorSchemaRoot = vectorSchemaRoot;
+  }
+
   @Override
   public void write(DataOutput dataOutput) throws IOException {
     throw new UnsupportedOperationException();
@@ -44,4 +50,12 @@ public class ArrowWrapperWritable implements Writable {
   public void readFields(DataInput dataInput) throws IOException {
     throw new UnsupportedOperationException();
   }
+
+  @Override public int compareTo(Object o) {
+    return 0;
+  }
+
+  @Override public boolean equals(Object o) {
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
index 78cc188..7aa732b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hive.ql.io.arrow;
 
 import org.apache.arrow.memory.RootAllocator;
@@ -41,4 +42,12 @@ public enum RootAllocatorFactory {
     }
     return rootAllocator;
   }
+
+  // arrowAllocatorLimit is ignored if a root allocator has already been created
+  public synchronized RootAllocator getOrCreateRootAllocator(long arrowAllocatorLimit) {
+    if (rootAllocator == null) {
+      rootAllocator = new RootAllocator(arrowAllocatorLimit);
+    }
+    return rootAllocator;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
index 13a3070..f27cdf4 100644
--- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
@@ -54,6 +54,7 @@ public class TestLlapOutputFormat {
     Configuration conf = new Configuration();
     // Pick random avail port
     HiveConf.setIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT, 0);
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
     LlapOutputFormatService.initializeAndStart(conf, null);
     service = LlapOutputFormatService.get();
     LlapProxy.setDaemon(true);


[2/2] hive git commit: HIVE-19308: Provide an Arrow stream reader for external LLAP clients (Eric Wohlstadter, reviewed by Jason Dere)

Posted by jd...@apache.org.
HIVE-19308: Provide an Arrow stream reader for external LLAP clients (Eric Wohlstadter, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2c78ceb6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2c78ceb6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2c78ceb6

Branch: refs/heads/master
Commit: 2c78ceb6abdc15585bf911633c2c01271b06da9b
Parents: 6107482
Author: Jason Dere <jd...@hortonworks.com>
Authored: Mon May 21 13:47:43 2018 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Mon May 21 13:47:43 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +-
 .../hive/jdbc/AbstractJdbcTriggersTest.java     |   4 +-
 .../apache/hive/jdbc/BaseJdbcWithMiniLlap.java  | 628 +++++++++++++++++++
 .../apache/hive/jdbc/TestJdbcWithMiniLlap.java  | 616 ------------------
 .../hive/jdbc/TestJdbcWithMiniLlapArrow.java    | 230 +++++++
 .../hive/jdbc/TestJdbcWithMiniLlapRow.java      |  45 ++
 .../hadoop/hive/llap/LlapBaseRecordReader.java  | 101 ++-
 .../hadoop/hive/llap/LlapRowRecordReader.java   |  26 +-
 llap-ext-client/pom.xml                         |   5 +
 .../hive/llap/LlapArrowBatchRecordReader.java   |  82 +++
 .../hive/llap/LlapArrowRowInputFormat.java      |  53 ++
 .../hive/llap/LlapArrowRowRecordReader.java     | 107 ++++
 .../hadoop/hive/llap/LlapBaseInputFormat.java   |  27 +-
 pom.xml                                         |   1 +
 .../hive/ql/io/arrow/ArrowWrapperWritable.java  |  18 +-
 .../hive/ql/io/arrow/RootAllocatorFactory.java  |   9 +
 .../hadoop/hive/llap/TestLlapOutputFormat.java  |   1 +
 17 files changed, 1267 insertions(+), 688 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b12a7a4..c14caf6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4166,7 +4166,7 @@ public class HiveConf extends Configuration {
             Constants.LLAP_LOGGER_NAME_RFA,
             Constants.LLAP_LOGGER_NAME_CONSOLE),
         "logger used for llap-daemons."),
-    LLAP_OUTPUT_FORMAT_ARROW("hive.llap.output.format.arrow", false,
+    LLAP_OUTPUT_FORMAT_ARROW("hive.llap.output.format.arrow", true,
       "Whether LLapOutputFormatService should output arrow batches"),
 
     HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", "500ms",

http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
index 17e44bb..7d5172b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
@@ -90,7 +90,7 @@ public abstract class AbstractJdbcTriggersTest {
 
   @Before
   public void setUp() throws Exception {
-    hs2Conn = TestJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+    hs2Conn = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
   }
 
   @After
@@ -124,7 +124,7 @@ public abstract class AbstractJdbcTriggersTest {
     throws Exception {
 
     Connection con = hs2Conn;
-    TestJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString());
+    BaseJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString());
     createSleepUDF();
 
     final ByteArrayOutputStream baos = new ByteArrayOutputStream();

http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
new file mode 100644
index 0000000..7a891ef
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -0,0 +1,628 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.LlapRowRecordReader;
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
+import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import org.datanucleus.ClassLoaderResolver;
+import org.datanucleus.NucleusContext;
+import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
+import org.datanucleus.AbstractNucleusContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.hadoop.mapred.InputFormat;
+
+/**
+ * Specialize this base class for different SerDes/formats.
+ * {@link #beforeTest(boolean) beforeTest} should be called
+ * by subclasses in a {@link org.junit.BeforeClass} initializer.
+ */
+public abstract class BaseJdbcWithMiniLlap {
+  private static MiniHS2 miniHS2 = null;
+  private static String dataFileDir;
+  private static Path kvDataFilePath;
+  private static Path dataTypesFilePath;
+
+  private static HiveConf conf = null;
+  private static Connection hs2Conn = null;
+
+  // This method should be called by sub-classes in a @BeforeClass initializer
+  public static void beforeTest(boolean useArrow) throws Exception {
+    Class.forName(MiniHS2.getJdbcDriverName());
+
+    String confDir = "../../data/conf/llap/";
+    if (confDir != null && !confDir.isEmpty()) {
+      HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml"));
+      System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation());
+    }
+
+    conf = new HiveConf();
+    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    if(useArrow) {
+      conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
+    } else {
+      conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
+    }
+
+    conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
+        + "/tez-site.xml"));
+
+    miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
+
+    dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+    kvDataFilePath = new Path(dataFileDir, "kv1.txt");
+    dataTypesFilePath = new Path(dataFileDir, "datatypes.txt");
+    Map<String, String> confOverlay = new HashMap<String, String>();
+    miniHS2.start(confOverlay);
+    miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+  }
+
+  public static Connection getConnection(String jdbcURL, String user, String pwd) throws SQLException {
+    Connection conn = DriverManager.getConnection(jdbcURL, user, pwd);
+    conn.createStatement().execute("set hive.support.concurrency = false");
+    return conn;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LlapBaseInputFormat.closeAll();
+    hs2Conn.close();
+  }
+
+  @AfterClass
+  public static void afterTest() throws Exception {
+    if (miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
+  }
+
+  private void createTestTable(String tableName) throws Exception {
+    createTestTable(hs2Conn, null, tableName, kvDataFilePath.toString());
+  }
+
+  public static void createTestTable(Connection connection, String database, String tableName, String srcFile) throws
+    Exception {
+    Statement stmt = connection.createStatement();
+
+    if (database != null) {
+      stmt.execute("CREATE DATABASE IF NOT EXISTS " + database);
+      stmt.execute("USE " + database);
+    }
+
+    // create table
+    stmt.execute("DROP TABLE IF EXISTS " + tableName);
+    stmt.execute("CREATE TABLE " + tableName
+        + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'");
+
+    // load data
+    stmt.execute("load data local inpath '" + srcFile + "' into table " + tableName);
+
+    ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName);
+    assertTrue(res.next());
+    assertEquals("val_238", res.getString(2));
+    res.close();
+    stmt.close();
+  }
+
+  protected void createDataTypesTable(String tableName) throws Exception {
+    Statement stmt = hs2Conn.createStatement();
+
+    // create table
+    stmt.execute("DROP TABLE IF EXISTS " + tableName);
+    // tables with various types
+    stmt.execute("create table " + tableName
+        + " (c1 int, c2 boolean, c3 double, c4 string,"
+        + " c5 array<int>, c6 map<int,string>, c7 map<string,string>,"
+        + " c8 struct<r:string,s:int,t:double>,"
+        + " c9 tinyint, c10 smallint, c11 float, c12 bigint,"
+        + " c13 array<array<string>>,"
+        + " c14 map<int, map<int,int>>,"
+        + " c15 struct<r:int,s:struct<a:int,b:string>>,"
+        + " c16 array<struct<m:map<string,string>,n:int>>,"
+        + " c17 timestamp, "
+        + " c18 decimal(16,7), "
+        + " c19 binary, "
+        + " c20 date,"
+        + " c21 varchar(20),"
+        + " c22 char(15),"
+        + " c23 binary"
+        + ")");
+    stmt.execute("load data local inpath '"
+        + dataTypesFilePath.toString() + "' into table " + tableName);
+    stmt.close();
+  }
+
+  @Test(timeout = 60000)
+  public void testLlapInputFormatEndToEnd() throws Exception {
+    createTestTable("testtab1");
+
+    int rowCount;
+
+    RowCollector rowCollector = new RowCollector();
+    String query = "select * from testtab1 where under_col = 0";
+    rowCount = processQuery(query, 1, rowCollector);
+    assertEquals(3, rowCount);
+    assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(0));
+    assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(1));
+    assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(2));
+
+    // Try empty rows query
+    rowCollector.rows.clear();
+    query = "select * from testtab1 where true = false";
+    rowCount = processQuery(query, 1, rowCollector);
+    assertEquals(0, rowCount);
+  }
+
+  @Test(timeout = 60000)
+  public void testNonAsciiStrings() throws Exception {
+    createTestTable("testtab_nonascii");
+
+    RowCollector rowCollector = new RowCollector();
+    String nonAscii = "À côté du garçon";
+    String query = "select value, '" + nonAscii + "' from testtab_nonascii where under_col=0";
+    int rowCount = processQuery(query, 1, rowCollector);
+    assertEquals(3, rowCount);
+
+    assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(0));
+    assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(1));
+    assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(2));
+  }
+
+  @Test(timeout = 60000)
+  public void testEscapedStrings() throws Exception {
+    createTestTable("testtab1");
+
+    RowCollector rowCollector = new RowCollector();
+    String expectedVal1 = "'a',\"b\",\\c\\";
+    String expectedVal2 = "multi\nline";
+    String query = "select value, '\\'a\\',\"b\",\\\\c\\\\', 'multi\\nline' from testtab1 where under_col=0";
+    int rowCount = processQuery(query, 1, rowCollector);
+    assertEquals(3, rowCount);
+
+    assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(0));
+    assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(1));
+    assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(2));
+  }
+
+  @Test(timeout = 60000)
+  public void testDataTypes() throws Exception {
+    createDataTypesTable("datatypes");
+    RowCollector2 rowCollector = new RowCollector2();
+    String query = "select * from datatypes";
+    int rowCount = processQuery(query, 1, rowCollector);
+    assertEquals(3, rowCount);
+
+    // Verify schema
+    String[][] colNameTypes = new String[][] {
+        {"datatypes.c1", "int"},
+        {"datatypes.c2", "boolean"},
+        {"datatypes.c3", "double"},
+        {"datatypes.c4", "string"},
+        {"datatypes.c5", "array<int>"},
+        {"datatypes.c6", "map<int,string>"},
+        {"datatypes.c7", "map<string,string>"},
+        {"datatypes.c8", "struct<r:string,s:int,t:double>"},
+        {"datatypes.c9", "tinyint"},
+        {"datatypes.c10", "smallint"},
+        {"datatypes.c11", "float"},
+        {"datatypes.c12", "bigint"},
+        {"datatypes.c13", "array<array<string>>"},
+        {"datatypes.c14", "map<int,map<int,int>>"},
+        {"datatypes.c15", "struct<r:int,s:struct<a:int,b:string>>"},
+        {"datatypes.c16", "array<struct<m:map<string,string>,n:int>>"},
+        {"datatypes.c17", "timestamp"},
+        {"datatypes.c18", "decimal(16,7)"},
+        {"datatypes.c19", "binary"},
+        {"datatypes.c20", "date"},
+        {"datatypes.c21", "varchar(20)"},
+        {"datatypes.c22", "char(15)"},
+        {"datatypes.c23", "binary"},
+    };
+    FieldDesc fieldDesc;
+    assertEquals(23, rowCollector.numColumns);
+    for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
+      fieldDesc = rowCollector.schema.getColumns().get(idx);
+      assertEquals("ColName idx=" + idx, colNameTypes[idx][0], fieldDesc.getName());
+      assertEquals("ColType idx=" + idx, colNameTypes[idx][1], fieldDesc.getTypeInfo().getTypeName());
+    }
+
+    // First row is all nulls
+    Object[] rowValues = rowCollector.rows.get(0);
+    for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
+      assertEquals("idx=" + idx, null, rowValues[idx]);
+    }
+
+    // Second Row
+    rowValues = rowCollector.rows.get(1);
+    assertEquals(Integer.valueOf(-1), rowValues[0]);
+    assertEquals(Boolean.FALSE, rowValues[1]);
+    assertEquals(Double.valueOf(-1.1d), rowValues[2]);
+    assertEquals("", rowValues[3]);
+
+    List<?> c5Value = (List<?>) rowValues[4];
+    assertEquals(0, c5Value.size());
+
+    Map<?,?> c6Value = (Map<?,?>) rowValues[5];
+    assertEquals(0, c6Value.size());
+
+    Map<?,?> c7Value = (Map<?,?>) rowValues[6];
+    assertEquals(0, c7Value.size());
+
+    List<?> c8Value = (List<?>) rowValues[7];
+    assertEquals(null, c8Value.get(0));
+    assertEquals(null, c8Value.get(1));
+    assertEquals(null, c8Value.get(2));
+
+    assertEquals(Byte.valueOf((byte) -1), rowValues[8]);
+    assertEquals(Short.valueOf((short) -1), rowValues[9]);
+    assertEquals(Float.valueOf(-1.0f), rowValues[10]);
+    assertEquals(Long.valueOf(-1l), rowValues[11]);
+
+    List<?> c13Value = (List<?>) rowValues[12];
+    assertEquals(0, c13Value.size());
+
+    Map<?,?> c14Value = (Map<?,?>) rowValues[13];
+    assertEquals(0, c14Value.size());
+
+    List<?> c15Value = (List<?>) rowValues[14];
+    assertEquals(null, c15Value.get(0));
+    assertEquals(null, c15Value.get(1));
+
+    List<?> c16Value = (List<?>) rowValues[15];
+    assertEquals(0, c16Value.size());
+
+    assertEquals(null, rowValues[16]);
+    assertEquals(null, rowValues[17]);
+    assertEquals(null, rowValues[18]);
+    assertEquals(null, rowValues[19]);
+    assertEquals(null, rowValues[20]);
+    assertEquals(null, rowValues[21]);
+    assertEquals(null, rowValues[22]);
+
+    // Third row
+    rowValues = rowCollector.rows.get(2);
+    assertEquals(Integer.valueOf(1), rowValues[0]);
+    assertEquals(Boolean.TRUE, rowValues[1]);
+    assertEquals(Double.valueOf(1.1d), rowValues[2]);
+    assertEquals("1", rowValues[3]);
+
+    c5Value = (List<?>) rowValues[4];
+    assertEquals(2, c5Value.size());
+    assertEquals(Integer.valueOf(1), c5Value.get(0));
+    assertEquals(Integer.valueOf(2), c5Value.get(1));
+
+    c6Value = (Map<?,?>) rowValues[5];
+    assertEquals(2, c6Value.size());
+    assertEquals("x", c6Value.get(Integer.valueOf(1)));
+    assertEquals("y", c6Value.get(Integer.valueOf(2)));
+
+    c7Value = (Map<?,?>) rowValues[6];
+    assertEquals(1, c7Value.size());
+    assertEquals("v", c7Value.get("k"));
+
+    c8Value = (List<?>) rowValues[7];
+    assertEquals("a", c8Value.get(0));
+    assertEquals(Integer.valueOf(9), c8Value.get(1));
+    assertEquals(Double.valueOf(2.2d), c8Value.get(2));
+
+    assertEquals(Byte.valueOf((byte) 1), rowValues[8]);
+    assertEquals(Short.valueOf((short) 1), rowValues[9]);
+    assertEquals(Float.valueOf(1.0f), rowValues[10]);
+    assertEquals(Long.valueOf(1l), rowValues[11]);
+
+    c13Value = (List<?>) rowValues[12];
+    assertEquals(2, c13Value.size());
+    List<?> listVal = (List<?>) c13Value.get(0);
+    assertEquals("a", listVal.get(0));
+    assertEquals("b", listVal.get(1));
+    listVal = (List<?>) c13Value.get(1);
+    assertEquals("c", listVal.get(0));
+    assertEquals("d", listVal.get(1));
+
+    c14Value = (Map<?,?>) rowValues[13];
+    assertEquals(2, c14Value.size());
+    Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
+    assertEquals(2, mapVal.size());
+    assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11)));
+    assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13)));
+    mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(2));
+    assertEquals(1, mapVal.size());
+    assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21)));
+
+    c15Value = (List<?>) rowValues[14];
+    assertEquals(Integer.valueOf(1), c15Value.get(0));
+    listVal = (List<?>) c15Value.get(1);
+    assertEquals(2, listVal.size());
+    assertEquals(Integer.valueOf(2), listVal.get(0));
+    assertEquals("x", listVal.get(1));
+
+    c16Value = (List<?>) rowValues[15];
+    assertEquals(2, c16Value.size());
+    listVal = (List<?>) c16Value.get(0);
+    assertEquals(2, listVal.size());
+    mapVal = (Map<?,?>) listVal.get(0);
+    assertEquals(0, mapVal.size());
+    assertEquals(Integer.valueOf(1), listVal.get(1));
+    listVal = (List<?>) c16Value.get(1);
+    mapVal = (Map<?,?>) listVal.get(0);
+    assertEquals(2, mapVal.size());
+    assertEquals("b", mapVal.get("a"));
+    assertEquals("d", mapVal.get("c"));
+    assertEquals(Integer.valueOf(2), listVal.get(1));
+
+    assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456789"), rowValues[16]);
+    assertEquals(new BigDecimal("123456789.123456"), rowValues[17]);
+    assertArrayEquals("abcd".getBytes("UTF-8"), (byte[]) rowValues[18]);
+    assertEquals(Date.valueOf("2013-01-01"), rowValues[19]);
+    assertEquals("abc123", rowValues[20]);
+    assertEquals("abc123         ", rowValues[21]);
+    assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
+  }
+
+
+  @Test(timeout = 60000)
+  public void testComplexQuery() throws Exception {
+    createTestTable("testtab1");
+
+    RowCollector rowCollector = new RowCollector();
+    String query = "select value, count(*) from testtab1 where under_col=0 group by value";
+    int rowCount = processQuery(query, 1, rowCollector);
+    assertEquals(1, rowCount);
+
+    assertArrayEquals(new String[] {"val_0", "3"}, rowCollector.rows.get(0));
+  }
+
+  private interface RowProcessor {
+    void process(Row row);
+  }
+
+  protected static class RowCollector implements RowProcessor {
+    ArrayList<String[]> rows = new ArrayList<String[]>();
+    Schema schema = null;
+    int numColumns = 0;
+
+    public void process(Row row) {
+      if (schema == null) {
+        schema = row.getSchema();
+        numColumns = schema.getColumns().size();
+      }
+
+      String[] arr = new String[numColumns];
+      for (int idx = 0; idx < numColumns; ++idx) {
+        Object val = row.getValue(idx);
+        arr[idx] = (val == null ? null : val.toString());
+      }
+      rows.add(arr);
+    }
+  }
+
+  // Save the actual values from each row as opposed to the String representation.
+  protected static class RowCollector2 implements RowProcessor {
+    ArrayList<Object[]> rows = new ArrayList<Object[]>();
+    Schema schema = null;
+    int numColumns = 0;
+
+    public void process(Row row) {
+      if (schema == null) {
+        schema = row.getSchema();
+        numColumns = schema.getColumns().size();
+      }
+
+      Object[] arr = new Object[numColumns];
+      for (int idx = 0; idx < numColumns; ++idx) {
+        arr[idx] = row.getValue(idx);
+      }
+      rows.add(arr);
+    }
+  }
+
+  protected int processQuery(String query, int numSplits, RowProcessor rowProcessor) throws Exception {
+    return processQuery(null, query, numSplits, rowProcessor);
+  }
+
+  protected abstract InputFormat<NullWritable, Row> getInputFormat();
+
+  private int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor) throws Exception {
+    String url = miniHS2.getJdbcURL();
+    String user = System.getProperty("user.name");
+    String pwd = user;
+    String handleId = UUID.randomUUID().toString();
+
+    InputFormat<NullWritable, Row> inputFormat = getInputFormat();
+
+    // Get splits
+    JobConf job = new JobConf(conf);
+    job.set(LlapBaseInputFormat.URL_KEY, url);
+    job.set(LlapBaseInputFormat.USER_KEY, user);
+    job.set(LlapBaseInputFormat.PWD_KEY, pwd);
+    job.set(LlapBaseInputFormat.QUERY_KEY, query);
+    job.set(LlapBaseInputFormat.HANDLE_ID, handleId);
+    if (currentDatabase != null) {
+      job.set(LlapBaseInputFormat.DB_KEY, currentDatabase);
+    }
+
+    InputSplit[] splits = inputFormat.getSplits(job, numSplits);
+    assertTrue(splits.length > 0);
+
+    // Fetch rows from splits
+    boolean first = true;
+    int rowCount = 0;
+    for (InputSplit split : splits) {
+      System.out.println("Processing split " + split.getLocations());
+
+      int numColumns = 2;
+      RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null);
+      Row row = reader.createValue();
+      while (reader.next(NullWritable.get(), row)) {
+        rowProcessor.process(row);
+        ++rowCount;
+      }
+      reader.close();
+    }
+    LlapBaseInputFormat.close(handleId);
+
+    return rowCount;
+  }
+
+  /**
+   * Test CLI kill command of a query that is running.
+   * We spawn 2 threads - one running the query and
+   * the other attempting to cancel.
+   * We're using a dummy udf to simulate a query,
+   * that runs for a sufficiently long time.
+   * @throws Exception
+   */
+  @Test
+  public void testKillQuery() throws Exception {
+    String tableName = "testtab1";
+    createTestTable(tableName);
+    Connection con = hs2Conn;
+    Connection con2 = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+
+    String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName();
+    Statement stmt1 = con.createStatement();
+    Statement stmt2 = con2.createStatement();
+    stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
+    stmt1.close();
+    final Statement stmt = con.createStatement();
+
+    ExceptionHolder tExecuteHolder = new ExceptionHolder();
+    ExceptionHolder tKillHolder = new ExceptionHolder();
+
+    // Thread executing the query
+    Thread tExecute = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          System.out.println("Executing query: ");
+          // The test table has 500 rows, so total query time should be at least ~ 500*100ms (assuming sleepMsUDF's 2nd argument is the per-row sleep in ms)
+          stmt.executeQuery("select sleepMsUDF(t1.under_col, 100), t1.under_col, t2.under_col " +
+              "from " + tableName + " t1 join " + tableName + " t2 on t1.under_col = t2.under_col");
+          fail("Expecting SQLException");
+        } catch (SQLException e) {
+          tExecuteHolder.throwable = e;
+        }
+      }
+    });
+    // Thread killing the query
+    Thread tKill = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(2000);
+          String queryId = ((HiveStatement) stmt).getQueryId();
+          System.out.println("Killing query: " + queryId);
+
+          stmt2.execute("kill query '" + queryId + "'");
+          stmt2.close();
+        } catch (Exception e) {
+          tKillHolder.throwable = e;
+        }
+      }
+    });
+
+    tExecute.start();
+    tKill.start();
+    tExecute.join();
+    tKill.join();
+    stmt.close();
+    con2.close();
+
+    assertNotNull("tExecute", tExecuteHolder.throwable);
+    assertNull("tCancel", tKillHolder.throwable);
+  }
+
+  private static class ExceptionHolder {
+    Throwable throwable;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
deleted file mode 100644
index 7e35fef..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
+++ /dev/null
@@ -1,616 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.jdbc;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.lang.reflect.Field;
-import java.math.BigDecimal;
-import java.net.URL;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.FieldDesc;
-import org.apache.hadoop.hive.llap.LlapRowRecordReader;
-import org.apache.hadoop.hive.llap.Row;
-import org.apache.hadoop.hive.llap.Schema;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-
-import org.apache.hive.jdbc.miniHS2.MiniHS2;
-import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
-import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
-import org.apache.hadoop.hive.llap.LlapRowInputFormat;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-
-import org.datanucleus.ClassLoaderResolver;
-import org.datanucleus.NucleusContext;
-import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
-import org.datanucleus.AbstractNucleusContext;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestJdbcWithMiniLlap {
-  private static MiniHS2 miniHS2 = null;
-  private static String dataFileDir;
-  private static Path kvDataFilePath;
-  private static Path dataTypesFilePath;
-
-  private static HiveConf conf = null;
-  private Connection hs2Conn = null;
-
-  @BeforeClass
-  public static void beforeTest() throws Exception {
-    Class.forName(MiniHS2.getJdbcDriverName());
-
-    String confDir = "../../data/conf/llap/";
-    if (confDir != null && !confDir.isEmpty()) {
-      HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml"));
-      System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation());
-    }
-
-    conf = new HiveConf();
-    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
-    conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
-
-    conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
-        + "/tez-site.xml"));
-
-    miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
-
-    dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
-    kvDataFilePath = new Path(dataFileDir, "kv1.txt");
-    dataTypesFilePath = new Path(dataFileDir, "datatypes.txt");
-    Map<String, String> confOverlay = new HashMap<String, String>();
-    miniHS2.start(confOverlay);
-    miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
-  }
-
-  public static Connection getConnection(String jdbcURL, String user, String pwd) throws SQLException {
-    Connection conn = DriverManager.getConnection(jdbcURL, user, pwd);
-    conn.createStatement().execute("set hive.support.concurrency = false");
-    return conn;
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    LlapBaseInputFormat.closeAll();
-    hs2Conn.close();
-  }
-
-  @AfterClass
-  public static void afterTest() throws Exception {
-    if (miniHS2.isStarted()) {
-      miniHS2.stop();
-    }
-  }
-
-  private void createTestTable(String tableName) throws Exception {
-    createTestTable(hs2Conn, null, tableName, kvDataFilePath.toString());
-  }
-
-  public static void createTestTable(Connection connection, String database, String tableName, String srcFile) throws
-    Exception {
-    Statement stmt = connection.createStatement();
-
-    if (database != null) {
-      stmt.execute("CREATE DATABASE IF NOT EXISTS " + database);
-      stmt.execute("USE " + database);
-    }
-
-    // create table
-    stmt.execute("DROP TABLE IF EXISTS " + tableName);
-    stmt.execute("CREATE TABLE " + tableName
-        + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'");
-
-    // load data
-    stmt.execute("load data local inpath '" + srcFile + "' into table " + tableName);
-
-    ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName);
-    assertTrue(res.next());
-    assertEquals("val_238", res.getString(2));
-    res.close();
-    stmt.close();
-  }
-
-  private void createDataTypesTable(String tableName) throws Exception {
-    Statement stmt = hs2Conn.createStatement();
-
-    // create table
-    stmt.execute("DROP TABLE IF EXISTS " + tableName);
-    // tables with various types
-    stmt.execute("create table " + tableName
-        + " (c1 int, c2 boolean, c3 double, c4 string,"
-        + " c5 array<int>, c6 map<int,string>, c7 map<string,string>,"
-        + " c8 struct<r:string,s:int,t:double>,"
-        + " c9 tinyint, c10 smallint, c11 float, c12 bigint,"
-        + " c13 array<array<string>>,"
-        + " c14 map<int, map<int,int>>,"
-        + " c15 struct<r:int,s:struct<a:int,b:string>>,"
-        + " c16 array<struct<m:map<string,string>,n:int>>,"
-        + " c17 timestamp, "
-        + " c18 decimal(16,7), "
-        + " c19 binary, "
-        + " c20 date,"
-        + " c21 varchar(20),"
-        + " c22 char(15),"
-        + " c23 binary"
-        + ")");
-    stmt.execute("load data local inpath '"
-        + dataTypesFilePath.toString() + "' into table " + tableName);
-    stmt.close();
-  }
-
-  @Test(timeout = 60000)
-  public void testLlapInputFormatEndToEnd() throws Exception {
-    createTestTable("testtab1");
-
-    int rowCount;
-
-    RowCollector rowCollector = new RowCollector();
-    String query = "select * from testtab1 where under_col = 0";
-    rowCount = processQuery(query, 1, rowCollector);
-    assertEquals(3, rowCount);
-    assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(0));
-    assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(1));
-    assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(2));
-
-    // Try empty rows query
-    rowCollector.rows.clear();
-    query = "select * from testtab1 where true = false";
-    rowCount = processQuery(query, 1, rowCollector);
-    assertEquals(0, rowCount);
-  }
-
-  @Test(timeout = 60000)
-  public void testNonAsciiStrings() throws Exception {
-    createTestTable(hs2Conn, "nonascii", "testtab_nonascii", kvDataFilePath.toString());
-
-    RowCollector rowCollector = new RowCollector();
-    String nonAscii = "À côté du garçon";
-    String query = "select value, '" + nonAscii + "' from testtab_nonascii where under_col=0";
-    int rowCount = processQuery("nonascii", query, 1, rowCollector);
-    assertEquals(3, rowCount);
-
-    assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(0));
-    assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(1));
-    assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(2));
-  }
-
-  @Test(timeout = 60000)
-  public void testEscapedStrings() throws Exception {
-    createTestTable("testtab1");
-
-    RowCollector rowCollector = new RowCollector();
-    String expectedVal1 = "'a',\"b\",\\c\\";
-    String expectedVal2 = "multi\nline";
-    String query = "select value, '\\'a\\',\"b\",\\\\c\\\\', 'multi\\nline' from testtab1 where under_col=0";
-    int rowCount = processQuery(query, 1, rowCollector);
-    assertEquals(3, rowCount);
-
-    assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(0));
-    assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(1));
-    assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(2));
-  }
-
-  @Test(timeout = 60000)
-  public void testDataTypes() throws Exception {
-    createDataTypesTable("datatypes");
-    RowCollector2 rowCollector = new RowCollector2();
-    String query = "select * from datatypes";
-    int rowCount = processQuery(query, 1, rowCollector);
-    assertEquals(3, rowCount);
-
-    // Verify schema
-    String[][] colNameTypes = new String[][] {
-        {"datatypes.c1", "int"},
-        {"datatypes.c2", "boolean"},
-        {"datatypes.c3", "double"},
-        {"datatypes.c4", "string"},
-        {"datatypes.c5", "array<int>"},
-        {"datatypes.c6", "map<int,string>"},
-        {"datatypes.c7", "map<string,string>"},
-        {"datatypes.c8", "struct<r:string,s:int,t:double>"},
-        {"datatypes.c9", "tinyint"},
-        {"datatypes.c10", "smallint"},
-        {"datatypes.c11", "float"},
-        {"datatypes.c12", "bigint"},
-        {"datatypes.c13", "array<array<string>>"},
-        {"datatypes.c14", "map<int,map<int,int>>"},
-        {"datatypes.c15", "struct<r:int,s:struct<a:int,b:string>>"},
-        {"datatypes.c16", "array<struct<m:map<string,string>,n:int>>"},
-        {"datatypes.c17", "timestamp"},
-        {"datatypes.c18", "decimal(16,7)"},
-        {"datatypes.c19", "binary"},
-        {"datatypes.c20", "date"},
-        {"datatypes.c21", "varchar(20)"},
-        {"datatypes.c22", "char(15)"},
-        {"datatypes.c23", "binary"},
-    };
-    FieldDesc fieldDesc;
-    assertEquals(23, rowCollector.numColumns);
-    for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
-      fieldDesc = rowCollector.schema.getColumns().get(idx);
-      assertEquals("ColName idx=" + idx, colNameTypes[idx][0], fieldDesc.getName());
-      assertEquals("ColType idx=" + idx, colNameTypes[idx][1], fieldDesc.getTypeInfo().getTypeName());
-    }
-
-    // First row is all nulls
-    Object[] rowValues = rowCollector.rows.get(0);
-    for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
-      assertEquals("idx=" + idx, null, rowValues[idx]);
-    }
-
-    // Second Row
-    rowValues = rowCollector.rows.get(1);
-    assertEquals(Integer.valueOf(-1), rowValues[0]);
-    assertEquals(Boolean.FALSE, rowValues[1]);
-    assertEquals(Double.valueOf(-1.1d), rowValues[2]);
-    assertEquals("", rowValues[3]);
-
-    List<?> c5Value = (List<?>) rowValues[4];
-    assertEquals(0, c5Value.size());
-
-    Map<?,?> c6Value = (Map<?,?>) rowValues[5];
-    assertEquals(0, c6Value.size());
-
-    Map<?,?> c7Value = (Map<?,?>) rowValues[6];
-    assertEquals(0, c7Value.size());
-
-    List<?> c8Value = (List<?>) rowValues[7];
-    assertEquals(null, c8Value.get(0));
-    assertEquals(null, c8Value.get(1));
-    assertEquals(null, c8Value.get(2));
-
-    assertEquals(Byte.valueOf((byte) -1), rowValues[8]);
-    assertEquals(Short.valueOf((short) -1), rowValues[9]);
-    assertEquals(Float.valueOf(-1.0f), rowValues[10]);
-    assertEquals(Long.valueOf(-1l), rowValues[11]);
-
-    List<?> c13Value = (List<?>) rowValues[12];
-    assertEquals(0, c13Value.size());
-
-    Map<?,?> c14Value = (Map<?,?>) rowValues[13];
-    assertEquals(0, c14Value.size());
-
-    List<?> c15Value = (List<?>) rowValues[14];
-    assertEquals(null, c15Value.get(0));
-    assertEquals(null, c15Value.get(1));
-
-    List<?> c16Value = (List<?>) rowValues[15];
-    assertEquals(0, c16Value.size());
-
-    assertEquals(null, rowValues[16]);
-    assertEquals(null, rowValues[17]);
-    assertEquals(null, rowValues[18]);
-    assertEquals(null, rowValues[19]);
-    assertEquals(null, rowValues[20]);
-    assertEquals(null, rowValues[21]);
-    assertEquals(null, rowValues[22]);
-
-    // Third row
-    rowValues = rowCollector.rows.get(2);
-    assertEquals(Integer.valueOf(1), rowValues[0]);
-    assertEquals(Boolean.TRUE, rowValues[1]);
-    assertEquals(Double.valueOf(1.1d), rowValues[2]);
-    assertEquals("1", rowValues[3]);
-
-    c5Value = (List<?>) rowValues[4];
-    assertEquals(2, c5Value.size());
-    assertEquals(Integer.valueOf(1), c5Value.get(0));
-    assertEquals(Integer.valueOf(2), c5Value.get(1));
-
-    c6Value = (Map<?,?>) rowValues[5];
-    assertEquals(2, c6Value.size());
-    assertEquals("x", c6Value.get(Integer.valueOf(1)));
-    assertEquals("y", c6Value.get(Integer.valueOf(2)));
-
-    c7Value = (Map<?,?>) rowValues[6];
-    assertEquals(1, c7Value.size());
-    assertEquals("v", c7Value.get("k"));
-
-    c8Value = (List<?>) rowValues[7];
-    assertEquals("a", c8Value.get(0));
-    assertEquals(Integer.valueOf(9), c8Value.get(1));
-    assertEquals(Double.valueOf(2.2d), c8Value.get(2));
-
-    assertEquals(Byte.valueOf((byte) 1), rowValues[8]);
-    assertEquals(Short.valueOf((short) 1), rowValues[9]);
-    assertEquals(Float.valueOf(1.0f), rowValues[10]);
-    assertEquals(Long.valueOf(1l), rowValues[11]);
-
-    c13Value = (List<?>) rowValues[12];
-    assertEquals(2, c13Value.size());
-    List<?> listVal = (List<?>) c13Value.get(0);
-    assertEquals("a", listVal.get(0));
-    assertEquals("b", listVal.get(1));
-    listVal = (List<?>) c13Value.get(1);
-    assertEquals("c", listVal.get(0));
-    assertEquals("d", listVal.get(1));
-
-    c14Value = (Map<?,?>) rowValues[13];
-    assertEquals(2, c14Value.size());
-    Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
-    assertEquals(2, mapVal.size());
-    assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11)));
-    assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13)));
-    mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(2));
-    assertEquals(1, mapVal.size());
-    assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21)));
-
-    c15Value = (List<?>) rowValues[14];
-    assertEquals(Integer.valueOf(1), c15Value.get(0));
-    listVal = (List<?>) c15Value.get(1);
-    assertEquals(2, listVal.size());
-    assertEquals(Integer.valueOf(2), listVal.get(0));
-    assertEquals("x", listVal.get(1));
-
-    c16Value = (List<?>) rowValues[15];
-    assertEquals(2, c16Value.size());
-    listVal = (List<?>) c16Value.get(0);
-    assertEquals(2, listVal.size());
-    mapVal = (Map<?,?>) listVal.get(0);
-    assertEquals(0, mapVal.size());
-    assertEquals(Integer.valueOf(1), listVal.get(1));
-    listVal = (List<?>) c16Value.get(1);
-    mapVal = (Map<?,?>) listVal.get(0);
-    assertEquals(2, mapVal.size());
-    assertEquals("b", mapVal.get("a"));
-    assertEquals("d", mapVal.get("c"));
-    assertEquals(Integer.valueOf(2), listVal.get(1));
-
-    assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456789"), rowValues[16]);
-    assertEquals(new BigDecimal("123456789.123456"), rowValues[17]);
-    assertArrayEquals("abcd".getBytes("UTF-8"), (byte[]) rowValues[18]);
-    assertEquals(Date.valueOf("2013-01-01"), rowValues[19]);
-    assertEquals("abc123", rowValues[20]);
-    assertEquals("abc123         ", rowValues[21]);
-    assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
-  }
-
-
-  @Test(timeout = 60000)
-  public void testComplexQuery() throws Exception {
-    createTestTable("testtab1");
-
-    RowCollector rowCollector = new RowCollector();
-    String query = "select value, count(*) from testtab1 where under_col=0 group by value";
-    int rowCount = processQuery(query, 1, rowCollector);
-    assertEquals(1, rowCount);
-
-    assertArrayEquals(new String[] {"val_0", "3"}, rowCollector.rows.get(0));
-  }
-
-  private interface RowProcessor {
-    void process(Row row);
-  }
-
-  private static class RowCollector implements RowProcessor {
-    ArrayList<String[]> rows = new ArrayList<String[]>();
-    Schema schema = null;
-    int numColumns = 0;
-
-    public void process(Row row) {
-      if (schema == null) {
-        schema = row.getSchema();
-        numColumns = schema.getColumns().size();
-      }
-
-      String[] arr = new String[numColumns];
-      for (int idx = 0; idx < numColumns; ++idx) {
-        Object val = row.getValue(idx);
-        arr[idx] = (val == null ? null : val.toString());
-      }
-      rows.add(arr);
-    }
-  }
-
-  // Save the actual values from each row as opposed to the String representation.
-  private static class RowCollector2 implements RowProcessor {
-    ArrayList<Object[]> rows = new ArrayList<Object[]>();
-    Schema schema = null;
-    int numColumns = 0;
-
-    public void process(Row row) {
-      if (schema == null) {
-        schema = row.getSchema();
-        numColumns = schema.getColumns().size();
-      }
-
-      Object[] arr = new Object[numColumns];
-      for (int idx = 0; idx < numColumns; ++idx) {
-        arr[idx] = row.getValue(idx);
-      }
-      rows.add(arr);
-    }
-  }
-
-  private int processQuery(String query, int numSplits, RowProcessor rowProcessor) throws Exception {
-    return processQuery(null, query, numSplits, rowProcessor);
-  }
-
-  private int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor) throws Exception {
-    String url = miniHS2.getJdbcURL();
-    String user = System.getProperty("user.name");
-    String pwd = user;
-    String handleId = UUID.randomUUID().toString();
-
-    LlapRowInputFormat inputFormat = new LlapRowInputFormat();
-
-    // Get splits
-    JobConf job = new JobConf(conf);
-    job.set(LlapBaseInputFormat.URL_KEY, url);
-    job.set(LlapBaseInputFormat.USER_KEY, user);
-    job.set(LlapBaseInputFormat.PWD_KEY, pwd);
-    job.set(LlapBaseInputFormat.QUERY_KEY, query);
-    job.set(LlapBaseInputFormat.HANDLE_ID, handleId);
-    if (currentDatabase != null) {
-      job.set(LlapBaseInputFormat.DB_KEY, currentDatabase);
-    }
-
-    InputSplit[] splits = inputFormat.getSplits(job, numSplits);
-    assertTrue(splits.length > 0);
-
-    // Fetch rows from splits
-    boolean first = true;
-    int rowCount = 0;
-    for (InputSplit split : splits) {
-      System.out.println("Processing split " + split.getLocations());
-
-      int numColumns = 2;
-      RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null);
-      Row row = reader.createValue();
-      while (reader.next(NullWritable.get(), row)) {
-        rowProcessor.process(row);
-        ++rowCount;
-      }
-      reader.close();
-    }
-    LlapBaseInputFormat.close(handleId);
-
-    return rowCount;
-  }
-
-  /**
-   * Test CLI kill command of a query that is running.
-   * We spawn 2 threads - one running the query and
-   * the other attempting to cancel.
-   * We're using a dummy udf to simulate a query,
-   * that runs for a sufficiently long time.
-   * @throws Exception
-   */
-  @Test
-  public void testKillQuery() throws Exception {
-    String tableName = "testtab1";
-    createTestTable(tableName);
-    Connection con = hs2Conn;
-    Connection con2 = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
-
-    String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName();
-    Statement stmt1 = con.createStatement();
-    Statement stmt2 = con2.createStatement();
-    stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
-    stmt1.close();
-    final Statement stmt = con.createStatement();
-
-    ExceptionHolder tExecuteHolder = new ExceptionHolder();
-    ExceptionHolder tKillHolder = new ExceptionHolder();
-
-    // Thread executing the query
-    Thread tExecute = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          System.out.println("Executing query: ");
-          // The test table has 500 rows, so total query time should be ~ 500*500ms
-          stmt.executeQuery("select sleepMsUDF(t1.under_col, 100), t1.under_col, t2.under_col " +
-              "from " + tableName + " t1 join " + tableName + " t2 on t1.under_col = t2.under_col");
-          fail("Expecting SQLException");
-        } catch (SQLException e) {
-          tExecuteHolder.throwable = e;
-        }
-      }
-    });
-    // Thread killing the query
-    Thread tKill = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          Thread.sleep(2000);
-          String queryId = ((HiveStatement) stmt).getQueryId();
-          System.out.println("Killing query: " + queryId);
-
-          stmt2.execute("kill query '" + queryId + "'");
-          stmt2.close();
-        } catch (Exception e) {
-          tKillHolder.throwable = e;
-        }
-      }
-    });
-
-    tExecute.start();
-    tKill.start();
-    tExecute.join();
-    tKill.join();
-    stmt.close();
-    con2.close();
-
-    assertNotNull("tExecute", tExecuteHolder.throwable);
-    assertNull("tCancel", tKillHolder.throwable);
-  }
-
-  private static class ExceptionHolder {
-    Throwable throwable;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
new file mode 100644
index 0000000..afb9837
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.List;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.BeforeClass;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
+
+/**
+ * Variant of the BaseJdbcWithMiniLlap suite that reads rows through LlapArrowRowInputFormat (Arrow format).
+ */
+public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
+
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    BaseJdbcWithMiniLlap.beforeTest(true); // NOTE(review): 'true' presumably selects Arrow output in the shared setup - confirm
+  }
+
+  @Override
+  protected InputFormat<NullWritable, Row> getInputFormat() {
+    // For unit testing, no harm in hard-coding the allocator ceiling to Long.MAX_VALUE
+    return new LlapArrowRowInputFormat(Long.MAX_VALUE);
+  }
+
+  // MAP type is not supported yet, so the value checks on map-bearing columns (c6, c7, c14, c16) are commented out.
+  // Add them back when Arrow 1.0 is released. See: SPARK-21187
+  @Override
+  public void testDataTypes() throws Exception {
+    createDataTypesTable("datatypes");
+    RowCollector2 rowCollector = new RowCollector2();
+    String query = "select * from datatypes";
+    int rowCount = processQuery(query, 1, rowCollector);
+    assertEquals(3, rowCount);
+
+    // Verify schema
+    String[][] colNameTypes = new String[][] {
+        {"datatypes.c1", "int"},
+        {"datatypes.c2", "boolean"},
+        {"datatypes.c3", "double"},
+        {"datatypes.c4", "string"},
+        {"datatypes.c5", "array<int>"},
+        {"datatypes.c6", "map<int,string>"},
+        {"datatypes.c7", "map<string,string>"},
+        {"datatypes.c8", "struct<r:string,s:int,t:double>"},
+        {"datatypes.c9", "tinyint"},
+        {"datatypes.c10", "smallint"},
+        {"datatypes.c11", "float"},
+        {"datatypes.c12", "bigint"},
+        {"datatypes.c13", "array<array<string>>"},
+        {"datatypes.c14", "map<int,map<int,int>>"},
+        {"datatypes.c15", "struct<r:int,s:struct<a:int,b:string>>"},
+        {"datatypes.c16", "array<struct<m:map<string,string>,n:int>>"},
+        {"datatypes.c17", "timestamp"},
+        {"datatypes.c18", "decimal(16,7)"},
+        {"datatypes.c19", "binary"},
+        {"datatypes.c20", "date"},
+        {"datatypes.c21", "varchar(20)"},
+        {"datatypes.c22", "char(15)"},
+        {"datatypes.c23", "binary"},
+    };
+    FieldDesc fieldDesc;
+    assertEquals(23, rowCollector.numColumns);
+    for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
+      fieldDesc = rowCollector.schema.getColumns().get(idx);
+      assertEquals("ColName idx=" + idx, colNameTypes[idx][0], fieldDesc.getName());
+      assertEquals("ColType idx=" + idx, colNameTypes[idx][1], fieldDesc.getTypeInfo().getTypeName());
+    }
+
+    // First row is all nulls
+    Object[] rowValues = rowCollector.rows.get(0);
+    for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
+      assertEquals("idx=" + idx, null, rowValues[idx]);
+    }
+
+    // Second Row
+    rowValues = rowCollector.rows.get(1);
+    assertEquals(Integer.valueOf(-1), rowValues[0]);
+    assertEquals(Boolean.FALSE, rowValues[1]);
+    assertEquals(Double.valueOf(-1.1d), rowValues[2]);
+    assertEquals("", rowValues[3]);
+
+    List<?> c5Value = (List<?>) rowValues[4];
+    assertEquals(0, c5Value.size());
+
+    //Map<?,?> c6Value = (Map<?,?>) rowValues[5];
+    //assertEquals(0, c6Value.size());
+
+    //Map<?,?> c7Value = (Map<?,?>) rowValues[6];
+    //assertEquals(0, c7Value.size());
+
+    List<?> c8Value = (List<?>) rowValues[7];
+    assertEquals(null, c8Value.get(0));
+    assertEquals(null, c8Value.get(1));
+    assertEquals(null, c8Value.get(2));
+
+    assertEquals(Byte.valueOf((byte) -1), rowValues[8]);
+    assertEquals(Short.valueOf((short) -1), rowValues[9]);
+    assertEquals(Float.valueOf(-1.0f), rowValues[10]);
+    assertEquals(Long.valueOf(-1l), rowValues[11]);
+
+    List<?> c13Value = (List<?>) rowValues[12];
+    assertEquals(0, c13Value.size());
+
+    //Map<?,?> c14Value = (Map<?,?>) rowValues[13];
+    //assertEquals(0, c14Value.size());
+
+    List<?> c15Value = (List<?>) rowValues[14];
+    assertEquals(null, c15Value.get(0));
+    assertEquals(null, c15Value.get(1));
+
+    //List<?> c16Value = (List<?>) rowValues[15];
+    //assertEquals(0, c16Value.size());
+
+    assertEquals(null, rowValues[16]);
+    assertEquals(null, rowValues[17]);
+    assertEquals(null, rowValues[18]);
+    assertEquals(null, rowValues[19]);
+    assertEquals(null, rowValues[20]);
+    assertEquals(null, rowValues[21]);
+    assertEquals(null, rowValues[22]);
+
+    // Third row
+    rowValues = rowCollector.rows.get(2);
+    assertEquals(Integer.valueOf(1), rowValues[0]);
+    assertEquals(Boolean.TRUE, rowValues[1]);
+    assertEquals(Double.valueOf(1.1d), rowValues[2]);
+    assertEquals("1", rowValues[3]);
+
+    c5Value = (List<?>) rowValues[4];
+    assertEquals(2, c5Value.size());
+    assertEquals(Integer.valueOf(1), c5Value.get(0));
+    assertEquals(Integer.valueOf(2), c5Value.get(1));
+
+    //c6Value = (Map<?,?>) rowValues[5];
+    //assertEquals(2, c6Value.size());
+    //assertEquals("x", c6Value.get(Integer.valueOf(1)));
+    //assertEquals("y", c6Value.get(Integer.valueOf(2)));
+
+    //c7Value = (Map<?,?>) rowValues[6];
+    //assertEquals(1, c7Value.size());
+    //assertEquals("v", c7Value.get("k"));
+
+    c8Value = (List<?>) rowValues[7];
+    assertEquals("a", c8Value.get(0));
+    assertEquals(Integer.valueOf(9), c8Value.get(1));
+    assertEquals(Double.valueOf(2.2d), c8Value.get(2));
+
+    assertEquals(Byte.valueOf((byte) 1), rowValues[8]);
+    assertEquals(Short.valueOf((short) 1), rowValues[9]);
+    assertEquals(Float.valueOf(1.0f), rowValues[10]);
+    assertEquals(Long.valueOf(1l), rowValues[11]);
+
+    c13Value = (List<?>) rowValues[12];
+    assertEquals(2, c13Value.size());
+    List<?> listVal = (List<?>) c13Value.get(0);
+    assertEquals("a", listVal.get(0));
+    assertEquals("b", listVal.get(1));
+    listVal = (List<?>) c13Value.get(1);
+    assertEquals("c", listVal.get(0));
+    assertEquals("d", listVal.get(1));
+
+    //c14Value = (Map<?,?>) rowValues[13];
+    //assertEquals(2, c14Value.size());
+    //Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
+    //assertEquals(2, mapVal.size());
+    //assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11)));
+    //assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13)));
+    //mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(2));
+    //assertEquals(1, mapVal.size());
+    //assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21)));
+
+    c15Value = (List<?>) rowValues[14];
+    assertEquals(Integer.valueOf(1), c15Value.get(0));
+    listVal = (List<?>) c15Value.get(1);
+    assertEquals(2, listVal.size());
+    assertEquals(Integer.valueOf(2), listVal.get(0));
+    assertEquals("x", listVal.get(1));
+
+    //c16Value = (List<?>) rowValues[15];
+    //assertEquals(2, c16Value.size());
+    //listVal = (List<?>) c16Value.get(0);
+    //assertEquals(2, listVal.size());
+    //mapVal = (Map<?,?>) listVal.get(0);
+    //assertEquals(0, mapVal.size());
+    //assertEquals(Integer.valueOf(1), listVal.get(1));
+    //listVal = (List<?>) c16Value.get(1);
+    //mapVal = (Map<?,?>) listVal.get(0);
+    //assertEquals(2, mapVal.size());
+    //assertEquals("b", mapVal.get("a"));
+    //assertEquals("d", mapVal.get("c"));
+    //assertEquals(Integer.valueOf(2), listVal.get(1));
+
+    assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456789"), rowValues[16]);
+    assertEquals(new BigDecimal("123456789.123456"), rowValues[17]);
+    assertArrayEquals("abcd".getBytes("UTF-8"), (byte[]) rowValues[18]);
+    assertEquals(Date.valueOf("2013-01-01"), rowValues[19]);
+    assertEquals("abc123", rowValues[20]);
+    assertEquals("abc123         ", rowValues[21]);
+    assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
new file mode 100644
index 0000000..809068f
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.hive.llap.LlapRowInputFormat;
+import org.junit.BeforeClass;
+import org.junit.Before;
+import org.junit.After;
+import org.apache.hadoop.mapred.InputFormat;
+
+/**
+ * Concrete BaseJdbcWithMiniLlap subclass that supplies LlapRowInputFormat (the LLAP Row format).
+ */
+public class TestJdbcWithMiniLlapRow extends BaseJdbcWithMiniLlap {
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    BaseJdbcWithMiniLlap.beforeTest(false);
+  }
+
+  @Override
+  protected InputFormat<NullWritable, Row> getInputFormat() {
+    return new LlapRowInputFormat();
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
index a9ed3d2..5316aa7 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -22,25 +22,15 @@ import com.google.common.base.Preconditions;
 
 import java.io.BufferedInputStream;
 import java.io.Closeable;
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.DataInputStream;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.Schema;
 import org.apache.hadoop.hive.llap.io.ChunkedInputStream;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.JobConf;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -149,52 +139,61 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
           throw new IOException("Hit end of input, but did not find expected end of data indicator");
         }
 
-        // There should be a reader event available, or coming soon, so okay to be blocking call.
-        ReaderEvent event = getReaderEvent();
-        switch (event.getEventType()) {
-          case DONE:
-            break;
-          default:
-            throw new IOException("Expected reader event with done status, but got "
-                + event.getEventType() + " with message " + event.getMessage());
-        }
+        processReaderEvent();
         return false;
       }
     } catch (IOException io) {
-      try {
-        if (Thread.interrupted()) {
-          // Either we were interrupted by one of:
-          // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue
-          // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
-          // Either way we should not try to block trying to read the reader events queue.
-          if (readerEvents.isEmpty()) {
-            // Case 2.
-            throw io;
-          } else {
-            // Case 1. Fail the reader, sending back the error we received from the reader event.
-            ReaderEvent event = getReaderEvent();
-            switch (event.getEventType()) {
-              case ERROR:
-                throw new IOException("Received reader event error: " + event.getMessage(), io);
-              default:
-                throw new IOException("Got reader event type " + event.getEventType()
-                    + ", expected error event", io);
-            }
-          }
-        } else {
-          // If we weren't interrupted, just propagate the error
+      failOnInterruption(io);
+      return false;
+    }
+  }
+
+  protected void processReaderEvent() throws IOException {
+    // A reader event is queued or about to arrive, so blocking is fine; anything but DONE is an error.
+    ReaderEvent event = getReaderEvent();
+    switch (event.getEventType()) {
+      case DONE:
+        break;
+      default:
+        throw new IOException("Expected reader event with done status, but got "
+            + event.getEventType() + " with message " + event.getMessage());
+    }
+  }
+
+  protected void failOnInterruption(IOException io) throws IOException {
+    try {
+      if (Thread.interrupted()) {
+        // Either we were interrupted by one of:
+        // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue
+        // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
+        // Either way we should not try to block trying to read the reader events queue.
+        if (readerEvents.isEmpty()) {
+          // Case 2.
           throw io;
+        } else {
+          // Case 1. Fail the reader, sending back the error we received from the reader event.
+          ReaderEvent event = getReaderEvent();
+          switch (event.getEventType()) {
+            case ERROR:
+              throw new IOException("Received reader event error: " + event.getMessage(), io);
+            default:
+              throw new IOException("Got reader event type " + event.getEventType()
+                  + ", expected error event", io);
+          }
         }
-      } finally {
-        // The external client handling umbilical responses and the connection to read the incoming
-        // data are not coupled. Calling close() here to make sure an error in one will cause the
-        // other to be closed as well.
-        try {
-          close();
-        } catch (Exception err) {
-          // Don't propagate errors from close() since this will lose the original error above.
-          LOG.error("Closing RecordReader due to error and hit another error during close()", err);
-        }
+      } else {
+        // If we weren't interrupted, just propagate the error
+        throw io;
+      }
+    } finally {
+      // The external client handling umbilical responses and the connection to read the incoming
+      // data are not coupled. Calling close() here to make sure an error in one will cause the
+      // other to be closed as well.
+      try {
+        close();
+      } catch (Exception err) {
+        // Don't propagate errors from close() since this will lose the original error above.
+        LOG.error("Closing RecordReader due to error and hit another error during close()", err);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
index 1cfbf3a..6cc1d17 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -70,20 +69,20 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
   private static final Logger LOG = LoggerFactory.getLogger(LlapRowRecordReader.class);
 
   protected final Configuration conf;
-  protected final RecordReader<NullWritable, BytesWritable> reader;
+  protected final RecordReader reader;
   protected final Schema schema;
   protected final AbstractSerDe serde;
-  protected final BytesWritable data;
+  protected final Writable data;
 
   public LlapRowRecordReader(Configuration conf, Schema schema,
-      RecordReader<NullWritable, BytesWritable> reader) throws IOException {
+      RecordReader<NullWritable, ? extends Writable> reader) throws IOException {
     this.conf = conf;
     this.schema = schema;
     this.reader = reader;
-    this.data = new BytesWritable();
+    this.data = reader.createValue();
 
     try {
-      serde = initSerDe(conf);
+      this.serde = initSerDe(conf);
     } catch (SerDeException err) {
       throw new IOException(err);
     }
@@ -118,7 +117,7 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
   public boolean next(NullWritable key, Row value) throws IOException {
     Preconditions.checkArgument(value != null);
 
-    boolean hasNext = reader.next(key,  data);
+    boolean hasNext = reader.next(key, data);
     if (hasNext) {
       // Deserialize data to column values, and populate the row record
       Object rowObj;
@@ -216,7 +215,7 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
     return convertedVal;
   }
 
-  static void setRowFromStruct(Row row, Object structVal, StructObjectInspector soi) {
+  protected static void setRowFromStruct(Row row, Object structVal, StructObjectInspector soi) {
     Schema structSchema = row.getSchema();
     // Add struct field data to the Row
     List<? extends StructField> structFields = soi.getAllStructFieldRefs();
@@ -230,6 +229,11 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
     }
   }
 
+  //Factory method for the SerDe; protected so subclasses can substitute another implementation
+  protected AbstractSerDe createSerDe() throws SerDeException {
+    return new LazyBinarySerDe();
+  }
+
   protected AbstractSerDe initSerDe(Configuration conf) throws SerDeException {
     Properties props = new Properties();
     StringBuilder columnsBuffer = new StringBuilder();
@@ -249,9 +253,9 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
     props.put(serdeConstants.LIST_COLUMNS, columns);
     props.put(serdeConstants.LIST_COLUMN_TYPES, types);
     props.put(serdeConstants.ESCAPE_CHAR, "\\");
-    AbstractSerDe serde = new LazyBinarySerDe();
-    serde.initialize(conf, props);
+    AbstractSerDe createdSerDe = createSerDe();
+    createdSerDe.initialize(conf, props);
 
-    return serde;
+    return createdSerDe;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/llap-ext-client/pom.xml
----------------------------------------------------------------------
diff --git a/llap-ext-client/pom.xml b/llap-ext-client/pom.xml
index ed4704b..295d3e6 100644
--- a/llap-ext-client/pom.xml
+++ b/llap-ext-client/pom.xml
@@ -41,6 +41,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
       <artifactId>hive-llap-client</artifactId>
       <version>${project.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
new file mode 100644
index 0000000..d9c5666
--- /dev/null
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import com.google.common.base.Preconditions;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+
+/**
+ * Reads an Arrow stream from LLAP one batch at a time; every successful call to
+ * {@link #next} exposes the freshly loaded batch through the supplied ArrowWrapperWritable.
+ */
+public class LlapArrowBatchRecordReader extends LlapBaseRecordReader<ArrowWrapperWritable> {
+
+  private final BufferAllocator allocator;
+  private final ArrowStreamReader arrowStreamReader;
+
+  public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class<ArrowWrapperWritable> clazz,
+      JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException {
+    super(in, schema, clazz, job, client, socket);
+    allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit);
+    this.arrowStreamReader = new ArrowStreamReader(socket.getInputStream(), allocator);
+  }
+
+  /**
+   * Loads the next Arrow batch into {@code value}.
+   *
+   * @return true if a non-empty batch was loaded; false on the empty end-of-stream batch or
+   *         once the stream is exhausted and the DONE reader event has been consumed
+   * @throws IOException if reading fails; the reader is closed before the error propagates
+   */
+  @Override
+  public boolean next(NullWritable key, ArrowWrapperWritable value) throws IOException {
+    try {
+      // Need a way to know what thread to interrupt, since this is a blocking thread.
+      setReaderThread(Thread.currentThread());
+
+      boolean hasInput = arrowStreamReader.loadNextBatch();
+      if (hasInput) {
+        VectorSchemaRoot vectorSchemaRoot = arrowStreamReader.getVectorSchemaRoot();
+        //There must be at least one column vector
+        Preconditions.checkState(vectorSchemaRoot.getFieldVectors().size() > 0);
+        if(vectorSchemaRoot.getFieldVectors().get(0).getValueCount() == 0) {
+          //An empty batch will appear at the end of the stream
+          return false;
+        }
+        value.setVectorSchemaRoot(vectorSchemaRoot);
+        return true;
+      } else {
+        processReaderEvent();
+        return false;
+      }
+    } catch (IOException io) {
+      failOnInterruption(io);
+      return false;
+    }
+  }
+
+  /**
+   * Closes the Arrow stream reader and then the resources owned by the base reader.
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      arrowStreamReader.close();
+    } finally {
+      // The base class was handed the client and socket in the constructor; overriding close()
+      // without delegating would leave whatever it owns open.
+      super.close();
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
new file mode 100644
index 0000000..fafbdee
--- /dev/null
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import java.io.IOException;
+
+/**
+ * Adapts the Arrow batch reader obtained from LlapBaseInputFormat to a row-at-a-time reader.
+ */
+public class LlapArrowRowInputFormat implements InputFormat<NullWritable, Row> {
+
+  private LlapBaseInputFormat baseInputFormat;
+
+  public LlapArrowRowInputFormat(long arrowAllocatorLimit) {
+    baseInputFormat = new LlapBaseInputFormat(true, arrowAllocatorLimit);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return baseInputFormat.getSplits(job, numSplits);
+  }
+
+  @Override
+  public RecordReader<NullWritable, Row> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+      throws IOException {
+    LlapInputSplit llapSplit = (LlapInputSplit) split;
+    LlapArrowBatchRecordReader reader =
+        (LlapArrowBatchRecordReader) baseInputFormat.getRecordReader(llapSplit, job, reporter);
+    return new LlapArrowRowRecordReader(job, reader.getSchema(), reader);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java
new file mode 100644
index 0000000..d4179d5
--- /dev/null
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import com.google.common.base.Preconditions;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Buffers a batch for reading one row at a time.
+ *
+ * <p>Each Arrow batch read from the wrapped reader is deserialized once into an in-memory
+ * buffer; successive calls to {@link #next} hand out one buffered row each and fetch the
+ * following batch only after the buffer has been used up.
+ */
+public class LlapArrowRowRecordReader extends LlapRowRecordReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LlapArrowRowRecordReader.class);
+  private int rowIndex = 0;
+  private int batchSize = 0;
+
+  //Buffer one batch at a time, for row retrieval
+  private Object[][] currentBatch;
+
+  public LlapArrowRowRecordReader(Configuration conf, Schema schema,
+      RecordReader<NullWritable, ? extends Writable> reader) throws IOException {
+    super(conf, schema, reader);
+  }
+
+  /**
+   * Copies the next row into {@code value}, buffering a new batch first when necessary.
+   *
+   * @return true if {@code value} was populated; false once the wrapped reader has no more batches
+   * @throws IOException if the wrapped reader fails or the batch cannot be deserialized
+   */
+  @Override
+  public boolean next(NullWritable key, Row value) throws IOException {
+    Preconditions.checkArgument(value != null);
+    if ((batchSize == 0) || (rowIndex == batchSize)) {
+      //This is either the first batch or we've used up the current batch buffer
+      batchSize = 0;
+      rowIndex = 0;
+      if (!reader.next(key, data)) {
+        //There were no more batches
+        return false;
+      }
+      bufferBatch((ArrowWrapperWritable) data);
+    }
+    try {
+      StructObjectInspector rowOI = (StructObjectInspector) serde.getObjectInspector();
+      setRowFromStruct(value, currentBatch[rowIndex], rowOI);
+    } catch (SerDeException e) {
+      throw new IOException("Failed to read row from Arrow batch", e);
+    }
+    rowIndex++;
+    return true;
+  }
+
+  /**
+   * Deserializes the batch held by {@code batchData} into the row buffer.
+   */
+  private void bufferBatch(ArrowWrapperWritable batchData) throws IOException {
+    List<FieldVector> vectors = batchData.getVectorSchemaRoot().getFieldVectors();
+    //A successful read implies there is some column in the batch
+    Preconditions.checkState(vectors.size() > 0);
+    try {
+      //The writable filled in by the reader already wraps the root, so hand it over as is
+      currentBatch = (Object[][]) serde.deserialize(batchData);
+    } catch (SerDeException e) {
+      LOG.error("Failed to fetch Arrow batch", e);
+      throw new IOException("Failed to fetch Arrow batch", e);
+    }
+    //All the vectors have the same length,
+    //we can get the number of rows from the first vector.
+    //Recorded only after a successful deserialize so a failure never exposes a stale buffer.
+    batchSize = vectors.get(0).getValueCount();
+  }
+
+  @Override
+  protected AbstractSerDe createSerDe() throws SerDeException {
+    return new ArrowColumnarBatchSerDe();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index f4c7fa4..ef03be6 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -49,15 +49,15 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrB
 import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
 import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
-import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
 import org.apache.hadoop.hive.registry.ServiceInstanceSet;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
@@ -104,6 +104,8 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
   private String user; // "hive",
   private String pwd;  // ""
   private String query;
+  private boolean useArrow;
+  private long arrowAllocatorLimit;
   private final Random rand = new Random();
 
   public static final String URL_KEY = "llap.if.hs2.connection";
@@ -123,7 +125,14 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
     this.query = query;
   }
 
-  public LlapBaseInputFormat() {}
+  public LlapBaseInputFormat(boolean useArrow, long arrowAllocatorLimit) {
+    this.useArrow = useArrow;
+    this.arrowAllocatorLimit = arrowAllocatorLimit;
+  }
+
+  public LlapBaseInputFormat() {
+    this.useArrow = false;
+  }
 
 
   @SuppressWarnings("unchecked")
@@ -195,8 +204,16 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
     LOG.info("Registered id: " + fragmentId);
 
     @SuppressWarnings("rawtypes")
-    LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(),
-        llapSplit.getSchema(), Text.class, job, llapClient, (java.io.Closeable)socket);
+    LlapBaseRecordReader recordReader;
+    if(useArrow) {
+      recordReader = new LlapArrowBatchRecordReader(
+          socket.getInputStream(), llapSplit.getSchema(),
+          ArrowWrapperWritable.class, job, llapClient, socket,
+          arrowAllocatorLimit);
+    } else {
+      recordReader = new LlapBaseRecordReader(socket.getInputStream(),
+          llapSplit.getSchema(), BytesWritable.class, job, llapClient, (java.io.Closeable)socket);
+    }
     umbilicalResponder.setRecordReader(recordReader);
     return recordReader;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index df10c74..9f64cbf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,6 +119,7 @@
     <antlr.version>3.5.2</antlr.version>
     <apache-directory-server.version>1.5.6</apache-directory-server.version>
     <apache-directory-clientapi.version>0.1</apache-directory-clientapi.version>
+    <!-- Include arrow for LlapOutputFormatService -->
     <arrow.version>0.8.0</arrow.version>
     <avatica.version>1.11.0</avatica.version>
     <avro.version>1.7.7</avro.version>