Posted to commits@hive.apache.org by jd...@apache.org on 2018/05/21 20:48:13 UTC
[1/2] hive git commit: HIVE-19308: Provide an Arrow stream reader for external LLAP clients (Eric Wohlstadter, reviewed by Jason Dere)
Repository: hive
Updated Branches:
refs/heads/master 610748287 -> 2c78ceb6a
http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
index df7b53f..dd490b1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
@@ -15,26 +15,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.hive.ql.io.arrow;
import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-public class ArrowWrapperWritable implements Writable {
+public class ArrowWrapperWritable implements WritableComparable {
private VectorSchemaRoot vectorSchemaRoot;
public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot) {
this.vectorSchemaRoot = vectorSchemaRoot;
}
+ public ArrowWrapperWritable() {}
public VectorSchemaRoot getVectorSchemaRoot() {
return vectorSchemaRoot;
}
+ public void setVectorSchemaRoot(VectorSchemaRoot vectorSchemaRoot) {
+ this.vectorSchemaRoot = vectorSchemaRoot;
+ }
+
@Override
public void write(DataOutput dataOutput) throws IOException {
throw new UnsupportedOperationException();
@@ -44,4 +50,12 @@ public class ArrowWrapperWritable implements Writable {
public void readFields(DataInput dataInput) throws IOException {
throw new UnsupportedOperationException();
}
+
+ @Override public int compareTo(Object o) {
+ return 0;
+ }
+
+ @Override public boolean equals(Object o) {
+ return true;
+ }
}
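For context, the wrapper is a pure in-memory holder: it now satisfies the WritableComparable contract (with deliberately degenerate compareTo/equals) so it can flow through Hadoop key/value plumbing, while write()/readFields() still throw. A minimal sketch of that usage, assuming an already-populated VectorSchemaRoot named root (root is not part of this diff):

// Sketch only: `root` is a hypothetical, already-populated VectorSchemaRoot.
ArrowWrapperWritable writable = new ArrowWrapperWritable();
writable.setVectorSchemaRoot(root);                      // hand the Arrow batch to the writable
VectorSchemaRoot sameRoot = writable.getVectorSchemaRoot();
// Serialization is intentionally unsupported: the batch travels as an
// Arrow stream, never as Writable bytes.
// writable.write(out);  // -> UnsupportedOperationException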
http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
index 78cc188..7aa732b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.hive.ql.io.arrow;
import org.apache.arrow.memory.RootAllocator;
@@ -41,4 +42,12 @@ public enum RootAllocatorFactory {
}
return rootAllocator;
}
+
+ //arrowAllocatorLimit is ignored if an allocator was previously created
+ public synchronized RootAllocator getOrCreateRootAllocator(long arrowAllocatorLimit) {
+ if (rootAllocator == null) {
+ rootAllocator = new RootAllocator(arrowAllocatorLimit);
+ }
+ return rootAllocator;
+ }
}
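The new getOrCreateRootAllocator gives callers without a HiveConf (such as external clients) a lazily-created singleton; as the inline comment notes, only the first caller's limit takes effect. A minimal sketch, assuming the enum's single constant is named INSTANCE:

// First call fixes the allocator ceiling...
RootAllocator a1 = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(1024L * 1024L * 1024L);
// ...later limits are ignored; the same instance comes back.
RootAllocator a2 = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(1L);
assert a1 == a2;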
http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
index 13a3070..f27cdf4 100644
--- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
@@ -54,6 +54,7 @@ public class TestLlapOutputFormat {
Configuration conf = new Configuration();
// Pick random avail port
HiveConf.setIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT, 0);
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
LlapOutputFormatService.initializeAndStart(conf, null);
service = LlapOutputFormatService.get();
LlapProxy.setDaemon(true);
[2/2] hive git commit: HIVE-19308: Provide an Arrow stream reader for external LLAP clients (Eric Wohlstadter, reviewed by Jason Dere)
Posted by jd...@apache.org.
HIVE-19308: Provide an Arrow stream reader for external LLAP clients (Eric Wohlstadter, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2c78ceb6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2c78ceb6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2c78ceb6
Branch: refs/heads/master
Commit: 2c78ceb6abdc15585bf911633c2c01271b06da9b
Parents: 6107482
Author: Jason Dere <jd...@hortonworks.com>
Authored: Mon May 21 13:47:43 2018 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Mon May 21 13:47:43 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +-
.../hive/jdbc/AbstractJdbcTriggersTest.java | 4 +-
.../apache/hive/jdbc/BaseJdbcWithMiniLlap.java | 628 +++++++++++++++++++
.../apache/hive/jdbc/TestJdbcWithMiniLlap.java | 616 ------------------
.../hive/jdbc/TestJdbcWithMiniLlapArrow.java | 230 +++++++
.../hive/jdbc/TestJdbcWithMiniLlapRow.java | 45 ++
.../hadoop/hive/llap/LlapBaseRecordReader.java | 101 ++-
.../hadoop/hive/llap/LlapRowRecordReader.java | 26 +-
llap-ext-client/pom.xml | 5 +
.../hive/llap/LlapArrowBatchRecordReader.java | 82 +++
.../hive/llap/LlapArrowRowInputFormat.java | 53 ++
.../hive/llap/LlapArrowRowRecordReader.java | 107 ++++
.../hadoop/hive/llap/LlapBaseInputFormat.java | 27 +-
pom.xml | 1 +
.../hive/ql/io/arrow/ArrowWrapperWritable.java | 18 +-
.../hive/ql/io/arrow/RootAllocatorFactory.java | 9 +
.../hadoop/hive/llap/TestLlapOutputFormat.java | 1 +
17 files changed, 1267 insertions(+), 688 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b12a7a4..c14caf6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4166,7 +4166,7 @@ public class HiveConf extends Configuration {
Constants.LLAP_LOGGER_NAME_RFA,
Constants.LLAP_LOGGER_NAME_CONSOLE),
"logger used for llap-daemons."),
- LLAP_OUTPUT_FORMAT_ARROW("hive.llap.output.format.arrow", false,
+ LLAP_OUTPUT_FORMAT_ARROW("hive.llap.output.format.arrow", true,
"Whether LLapOutputFormatService should output arrow batches"),
HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", "500ms",
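Note that the default for hive.llap.output.format.arrow flips from false to true here, which is why TestLlapOutputFormat above now pins it to false. After this change, a client that wants the legacy row-serialized output must opt out explicitly; a minimal sketch (property name taken from the hunk above):

// Sketch: opting back into the non-Arrow output path.
JobConf job = new JobConf();
job.setBoolean("hive.llap.output.format.arrow", false);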
http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
index 17e44bb..7d5172b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
@@ -90,7 +90,7 @@ public abstract class AbstractJdbcTriggersTest {
@Before
public void setUp() throws Exception {
- hs2Conn = TestJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+ hs2Conn = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
}
@After
@@ -124,7 +124,7 @@ public abstract class AbstractJdbcTriggersTest {
throws Exception {
Connection con = hs2Conn;
- TestJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString());
+ BaseJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString());
createSleepUDF();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
new file mode 100644
index 0000000..7a891ef
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -0,0 +1,628 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.LlapRowRecordReader;
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
+import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import org.datanucleus.ClassLoaderResolver;
+import org.datanucleus.NucleusContext;
+import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
+import org.datanucleus.AbstractNucleusContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.hadoop.mapred.InputFormat;
+
+/**
+ * Specialize this base class for different serdes/formats.
+ * {@link #beforeTest(boolean) beforeTest} should be called
+ * by subclasses in a {@link org.junit.BeforeClass} initializer.
+ */
+public abstract class BaseJdbcWithMiniLlap {
+ private static MiniHS2 miniHS2 = null;
+ private static String dataFileDir;
+ private static Path kvDataFilePath;
+ private static Path dataTypesFilePath;
+
+ private static HiveConf conf = null;
+ private static Connection hs2Conn = null;
+
+ // This method should be called by sub-classes in a @BeforeClass initializer
+ public static void beforeTest(boolean useArrow) throws Exception {
+ Class.forName(MiniHS2.getJdbcDriverName());
+
+ String confDir = "../../data/conf/llap/";
+ if (confDir != null && !confDir.isEmpty()) {
+ HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml"));
+ System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation());
+ }
+
+ conf = new HiveConf();
+ conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+ if(useArrow) {
+ conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
+ } else {
+ conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
+ }
+
+ conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
+ + "/tez-site.xml"));
+
+ miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
+
+ dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+ kvDataFilePath = new Path(dataFileDir, "kv1.txt");
+ dataTypesFilePath = new Path(dataFileDir, "datatypes.txt");
+ Map<String, String> confOverlay = new HashMap<String, String>();
+ miniHS2.start(confOverlay);
+ miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+ }
+
+ public static Connection getConnection(String jdbcURL, String user, String pwd) throws SQLException {
+ Connection conn = DriverManager.getConnection(jdbcURL, user, pwd);
+ conn.createStatement().execute("set hive.support.concurrency = false");
+ return conn;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ LlapBaseInputFormat.closeAll();
+ hs2Conn.close();
+ }
+
+ @AfterClass
+ public static void afterTest() throws Exception {
+ if (miniHS2.isStarted()) {
+ miniHS2.stop();
+ }
+ }
+
+ private void createTestTable(String tableName) throws Exception {
+ createTestTable(hs2Conn, null, tableName, kvDataFilePath.toString());
+ }
+
+ public static void createTestTable(Connection connection, String database, String tableName, String srcFile) throws
+ Exception {
+ Statement stmt = connection.createStatement();
+
+ if (database != null) {
+ stmt.execute("CREATE DATABASE IF NOT EXISTS " + database);
+ stmt.execute("USE " + database);
+ }
+
+ // create table
+ stmt.execute("DROP TABLE IF EXISTS " + tableName);
+ stmt.execute("CREATE TABLE " + tableName
+ + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'");
+
+ // load data
+ stmt.execute("load data local inpath '" + srcFile + "' into table " + tableName);
+
+ ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName);
+ assertTrue(res.next());
+ assertEquals("val_238", res.getString(2));
+ res.close();
+ stmt.close();
+ }
+
+ protected void createDataTypesTable(String tableName) throws Exception {
+ Statement stmt = hs2Conn.createStatement();
+
+ // create table
+ stmt.execute("DROP TABLE IF EXISTS " + tableName);
+ // tables with various types
+ stmt.execute("create table " + tableName
+ + " (c1 int, c2 boolean, c3 double, c4 string,"
+ + " c5 array<int>, c6 map<int,string>, c7 map<string,string>,"
+ + " c8 struct<r:string,s:int,t:double>,"
+ + " c9 tinyint, c10 smallint, c11 float, c12 bigint,"
+ + " c13 array<array<string>>,"
+ + " c14 map<int, map<int,int>>,"
+ + " c15 struct<r:int,s:struct<a:int,b:string>>,"
+ + " c16 array<struct<m:map<string,string>,n:int>>,"
+ + " c17 timestamp, "
+ + " c18 decimal(16,7), "
+ + " c19 binary, "
+ + " c20 date,"
+ + " c21 varchar(20),"
+ + " c22 char(15),"
+ + " c23 binary"
+ + ")");
+ stmt.execute("load data local inpath '"
+ + dataTypesFilePath.toString() + "' into table " + tableName);
+ stmt.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testLlapInputFormatEndToEnd() throws Exception {
+ createTestTable("testtab1");
+
+ int rowCount;
+
+ RowCollector rowCollector = new RowCollector();
+ String query = "select * from testtab1 where under_col = 0";
+ rowCount = processQuery(query, 1, rowCollector);
+ assertEquals(3, rowCount);
+ assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(0));
+ assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(1));
+ assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(2));
+
+ // Try empty rows query
+ rowCollector.rows.clear();
+ query = "select * from testtab1 where true = false";
+ rowCount = processQuery(query, 1, rowCollector);
+ assertEquals(0, rowCount);
+ }
+
+ @Test(timeout = 60000)
+ public void testNonAsciiStrings() throws Exception {
+ createTestTable("testtab_nonascii");
+
+ RowCollector rowCollector = new RowCollector();
+ String nonAscii = "À côté du garçon";
+ String query = "select value, '" + nonAscii + "' from testtab_nonascii where under_col=0";
+ int rowCount = processQuery(query, 1, rowCollector);
+ assertEquals(3, rowCount);
+
+ assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(0));
+ assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(1));
+ assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(2));
+ }
+
+ @Test(timeout = 60000)
+ public void testEscapedStrings() throws Exception {
+ createTestTable("testtab1");
+
+ RowCollector rowCollector = new RowCollector();
+ String expectedVal1 = "'a',\"b\",\\c\\";
+ String expectedVal2 = "multi\nline";
+ String query = "select value, '\\'a\\',\"b\",\\\\c\\\\', 'multi\\nline' from testtab1 where under_col=0";
+ int rowCount = processQuery(query, 1, rowCollector);
+ assertEquals(3, rowCount);
+
+ assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(0));
+ assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(1));
+ assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(2));
+ }
+
+ @Test(timeout = 60000)
+ public void testDataTypes() throws Exception {
+ createDataTypesTable("datatypes");
+ RowCollector2 rowCollector = new RowCollector2();
+ String query = "select * from datatypes";
+ int rowCount = processQuery(query, 1, rowCollector);
+ assertEquals(3, rowCount);
+
+ // Verify schema
+ String[][] colNameTypes = new String[][] {
+ {"datatypes.c1", "int"},
+ {"datatypes.c2", "boolean"},
+ {"datatypes.c3", "double"},
+ {"datatypes.c4", "string"},
+ {"datatypes.c5", "array<int>"},
+ {"datatypes.c6", "map<int,string>"},
+ {"datatypes.c7", "map<string,string>"},
+ {"datatypes.c8", "struct<r:string,s:int,t:double>"},
+ {"datatypes.c9", "tinyint"},
+ {"datatypes.c10", "smallint"},
+ {"datatypes.c11", "float"},
+ {"datatypes.c12", "bigint"},
+ {"datatypes.c13", "array<array<string>>"},
+ {"datatypes.c14", "map<int,map<int,int>>"},
+ {"datatypes.c15", "struct<r:int,s:struct<a:int,b:string>>"},
+ {"datatypes.c16", "array<struct<m:map<string,string>,n:int>>"},
+ {"datatypes.c17", "timestamp"},
+ {"datatypes.c18", "decimal(16,7)"},
+ {"datatypes.c19", "binary"},
+ {"datatypes.c20", "date"},
+ {"datatypes.c21", "varchar(20)"},
+ {"datatypes.c22", "char(15)"},
+ {"datatypes.c23", "binary"},
+ };
+ FieldDesc fieldDesc;
+ assertEquals(23, rowCollector.numColumns);
+ for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
+ fieldDesc = rowCollector.schema.getColumns().get(idx);
+ assertEquals("ColName idx=" + idx, colNameTypes[idx][0], fieldDesc.getName());
+ assertEquals("ColType idx=" + idx, colNameTypes[idx][1], fieldDesc.getTypeInfo().getTypeName());
+ }
+
+ // First row is all nulls
+ Object[] rowValues = rowCollector.rows.get(0);
+ for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
+ assertEquals("idx=" + idx, null, rowValues[idx]);
+ }
+
+ // Second Row
+ rowValues = rowCollector.rows.get(1);
+ assertEquals(Integer.valueOf(-1), rowValues[0]);
+ assertEquals(Boolean.FALSE, rowValues[1]);
+ assertEquals(Double.valueOf(-1.1d), rowValues[2]);
+ assertEquals("", rowValues[3]);
+
+ List<?> c5Value = (List<?>) rowValues[4];
+ assertEquals(0, c5Value.size());
+
+ Map<?,?> c6Value = (Map<?,?>) rowValues[5];
+ assertEquals(0, c6Value.size());
+
+ Map<?,?> c7Value = (Map<?,?>) rowValues[6];
+ assertEquals(0, c7Value.size());
+
+ List<?> c8Value = (List<?>) rowValues[7];
+ assertEquals(null, c8Value.get(0));
+ assertEquals(null, c8Value.get(1));
+ assertEquals(null, c8Value.get(2));
+
+ assertEquals(Byte.valueOf((byte) -1), rowValues[8]);
+ assertEquals(Short.valueOf((short) -1), rowValues[9]);
+ assertEquals(Float.valueOf(-1.0f), rowValues[10]);
+ assertEquals(Long.valueOf(-1l), rowValues[11]);
+
+ List<?> c13Value = (List<?>) rowValues[12];
+ assertEquals(0, c13Value.size());
+
+ Map<?,?> c14Value = (Map<?,?>) rowValues[13];
+ assertEquals(0, c14Value.size());
+
+ List<?> c15Value = (List<?>) rowValues[14];
+ assertEquals(null, c15Value.get(0));
+ assertEquals(null, c15Value.get(1));
+
+ List<?> c16Value = (List<?>) rowValues[15];
+ assertEquals(0, c16Value.size());
+
+ assertEquals(null, rowValues[16]);
+ assertEquals(null, rowValues[17]);
+ assertEquals(null, rowValues[18]);
+ assertEquals(null, rowValues[19]);
+ assertEquals(null, rowValues[20]);
+ assertEquals(null, rowValues[21]);
+ assertEquals(null, rowValues[22]);
+
+ // Third row
+ rowValues = rowCollector.rows.get(2);
+ assertEquals(Integer.valueOf(1), rowValues[0]);
+ assertEquals(Boolean.TRUE, rowValues[1]);
+ assertEquals(Double.valueOf(1.1d), rowValues[2]);
+ assertEquals("1", rowValues[3]);
+
+ c5Value = (List<?>) rowValues[4];
+ assertEquals(2, c5Value.size());
+ assertEquals(Integer.valueOf(1), c5Value.get(0));
+ assertEquals(Integer.valueOf(2), c5Value.get(1));
+
+ c6Value = (Map<?,?>) rowValues[5];
+ assertEquals(2, c6Value.size());
+ assertEquals("x", c6Value.get(Integer.valueOf(1)));
+ assertEquals("y", c6Value.get(Integer.valueOf(2)));
+
+ c7Value = (Map<?,?>) rowValues[6];
+ assertEquals(1, c7Value.size());
+ assertEquals("v", c7Value.get("k"));
+
+ c8Value = (List<?>) rowValues[7];
+ assertEquals("a", c8Value.get(0));
+ assertEquals(Integer.valueOf(9), c8Value.get(1));
+ assertEquals(Double.valueOf(2.2d), c8Value.get(2));
+
+ assertEquals(Byte.valueOf((byte) 1), rowValues[8]);
+ assertEquals(Short.valueOf((short) 1), rowValues[9]);
+ assertEquals(Float.valueOf(1.0f), rowValues[10]);
+ assertEquals(Long.valueOf(1l), rowValues[11]);
+
+ c13Value = (List<?>) rowValues[12];
+ assertEquals(2, c13Value.size());
+ List<?> listVal = (List<?>) c13Value.get(0);
+ assertEquals("a", listVal.get(0));
+ assertEquals("b", listVal.get(1));
+ listVal = (List<?>) c13Value.get(1);
+ assertEquals("c", listVal.get(0));
+ assertEquals("d", listVal.get(1));
+
+ c14Value = (Map<?,?>) rowValues[13];
+ assertEquals(2, c14Value.size());
+ Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
+ assertEquals(2, mapVal.size());
+ assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11)));
+ assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13)));
+ mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(2));
+ assertEquals(1, mapVal.size());
+ assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21)));
+
+ c15Value = (List<?>) rowValues[14];
+ assertEquals(Integer.valueOf(1), c15Value.get(0));
+ listVal = (List<?>) c15Value.get(1);
+ assertEquals(2, listVal.size());
+ assertEquals(Integer.valueOf(2), listVal.get(0));
+ assertEquals("x", listVal.get(1));
+
+ c16Value = (List<?>) rowValues[15];
+ assertEquals(2, c16Value.size());
+ listVal = (List<?>) c16Value.get(0);
+ assertEquals(2, listVal.size());
+ mapVal = (Map<?,?>) listVal.get(0);
+ assertEquals(0, mapVal.size());
+ assertEquals(Integer.valueOf(1), listVal.get(1));
+ listVal = (List<?>) c16Value.get(1);
+ mapVal = (Map<?,?>) listVal.get(0);
+ assertEquals(2, mapVal.size());
+ assertEquals("b", mapVal.get("a"));
+ assertEquals("d", mapVal.get("c"));
+ assertEquals(Integer.valueOf(2), listVal.get(1));
+
+ assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456789"), rowValues[16]);
+ assertEquals(new BigDecimal("123456789.123456"), rowValues[17]);
+ assertArrayEquals("abcd".getBytes("UTF-8"), (byte[]) rowValues[18]);
+ assertEquals(Date.valueOf("2013-01-01"), rowValues[19]);
+ assertEquals("abc123", rowValues[20]);
+ assertEquals("abc123 ", rowValues[21]);
+ assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
+ }
+
+
+ @Test(timeout = 60000)
+ public void testComplexQuery() throws Exception {
+ createTestTable("testtab1");
+
+ RowCollector rowCollector = new RowCollector();
+ String query = "select value, count(*) from testtab1 where under_col=0 group by value";
+ int rowCount = processQuery(query, 1, rowCollector);
+ assertEquals(1, rowCount);
+
+ assertArrayEquals(new String[] {"val_0", "3"}, rowCollector.rows.get(0));
+ }
+
+ private interface RowProcessor {
+ void process(Row row);
+ }
+
+ protected static class RowCollector implements RowProcessor {
+ ArrayList<String[]> rows = new ArrayList<String[]>();
+ Schema schema = null;
+ int numColumns = 0;
+
+ public void process(Row row) {
+ if (schema == null) {
+ schema = row.getSchema();
+ numColumns = schema.getColumns().size();
+ }
+
+ String[] arr = new String[numColumns];
+ for (int idx = 0; idx < numColumns; ++idx) {
+ Object val = row.getValue(idx);
+ arr[idx] = (val == null ? null : val.toString());
+ }
+ rows.add(arr);
+ }
+ }
+
+ // Save the actual values from each row as opposed to the String representation.
+ protected static class RowCollector2 implements RowProcessor {
+ ArrayList<Object[]> rows = new ArrayList<Object[]>();
+ Schema schema = null;
+ int numColumns = 0;
+
+ public void process(Row row) {
+ if (schema == null) {
+ schema = row.getSchema();
+ numColumns = schema.getColumns().size();
+ }
+
+ Object[] arr = new Object[numColumns];
+ for (int idx = 0; idx < numColumns; ++idx) {
+ arr[idx] = row.getValue(idx);
+ }
+ rows.add(arr);
+ }
+ }
+
+ protected int processQuery(String query, int numSplits, RowProcessor rowProcessor) throws Exception {
+ return processQuery(null, query, numSplits, rowProcessor);
+ }
+
+ protected abstract InputFormat<NullWritable, Row> getInputFormat();
+
+ private int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor) throws Exception {
+ String url = miniHS2.getJdbcURL();
+ String user = System.getProperty("user.name");
+ String pwd = user;
+ String handleId = UUID.randomUUID().toString();
+
+ InputFormat<NullWritable, Row> inputFormat = getInputFormat();
+
+ // Get splits
+ JobConf job = new JobConf(conf);
+ job.set(LlapBaseInputFormat.URL_KEY, url);
+ job.set(LlapBaseInputFormat.USER_KEY, user);
+ job.set(LlapBaseInputFormat.PWD_KEY, pwd);
+ job.set(LlapBaseInputFormat.QUERY_KEY, query);
+ job.set(LlapBaseInputFormat.HANDLE_ID, handleId);
+ if (currentDatabase != null) {
+ job.set(LlapBaseInputFormat.DB_KEY, currentDatabase);
+ }
+
+ InputSplit[] splits = inputFormat.getSplits(job, numSplits);
+ assertTrue(splits.length > 0);
+
+ // Fetch rows from splits
+ boolean first = true;
+ int rowCount = 0;
+ for (InputSplit split : splits) {
+ System.out.println("Processing split " + split.getLocations());
+
+ int numColumns = 2;
+ RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null);
+ Row row = reader.createValue();
+ while (reader.next(NullWritable.get(), row)) {
+ rowProcessor.process(row);
+ ++rowCount;
+ }
+ reader.close();
+ }
+ LlapBaseInputFormat.close(handleId);
+
+ return rowCount;
+ }
+
+ /**
+ * Test CLI kill command of a query that is running.
+ * We spawn 2 threads - one running the query and
+ * the other attempting to cancel.
+ * We're using a dummy udf to simulate a query,
+ * that runs for a sufficiently long time.
+ * @throws Exception
+ */
+ @Test
+ public void testKillQuery() throws Exception {
+ String tableName = "testtab1";
+ createTestTable(tableName);
+ Connection con = hs2Conn;
+ Connection con2 = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+
+ String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName();
+ Statement stmt1 = con.createStatement();
+ Statement stmt2 = con2.createStatement();
+ stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
+ stmt1.close();
+ final Statement stmt = con.createStatement();
+
+ ExceptionHolder tExecuteHolder = new ExceptionHolder();
+ ExceptionHolder tKillHolder = new ExceptionHolder();
+
+ // Thread executing the query
+ Thread tExecute = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ System.out.println("Executing query: ");
+ // The test table has 500 rows, so total query time should be ~ 500*500ms
+ stmt.executeQuery("select sleepMsUDF(t1.under_col, 100), t1.under_col, t2.under_col " +
+ "from " + tableName + " t1 join " + tableName + " t2 on t1.under_col = t2.under_col");
+ fail("Expecting SQLException");
+ } catch (SQLException e) {
+ tExecuteHolder.throwable = e;
+ }
+ }
+ });
+ // Thread killing the query
+ Thread tKill = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(2000);
+ String queryId = ((HiveStatement) stmt).getQueryId();
+ System.out.println("Killing query: " + queryId);
+
+ stmt2.execute("kill query '" + queryId + "'");
+ stmt2.close();
+ } catch (Exception e) {
+ tKillHolder.throwable = e;
+ }
+ }
+ });
+
+ tExecute.start();
+ tKill.start();
+ tExecute.join();
+ tKill.join();
+ stmt.close();
+ con2.close();
+
+ assertNotNull("tExecute", tExecuteHolder.throwable);
+ assertNull("tCancel", tKillHolder.throwable);
+ }
+
+ private static class ExceptionHolder {
+ Throwable throwable;
+ }
+}
+
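BaseJdbcWithMiniLlap leaves getInputFormat() abstract so each subclass picks the wire format. The actual TestJdbcWithMiniLlapRow from the file list is not reproduced in this message; the sketch below only illustrates the expected contract, using only names that appear in the diffs here:

import org.apache.hadoop.hive.llap.LlapRowInputFormat;
import org.apache.hadoop.hive.llap.Row;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.junit.BeforeClass;

public class TestJdbcWithMiniLlapRow extends BaseJdbcWithMiniLlap {

  @BeforeClass
  public static void beforeTest() throws Exception {
    BaseJdbcWithMiniLlap.beforeTest(false);  // row serde; Arrow output disabled
  }

  @Override
  protected InputFormat<NullWritable, Row> getInputFormat() {
    return new LlapRowInputFormat();
  }
}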
http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
deleted file mode 100644
index 7e35fef..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
+++ /dev/null
@@ -1,616 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.jdbc;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.lang.reflect.Field;
-import java.math.BigDecimal;
-import java.net.URL;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.FieldDesc;
-import org.apache.hadoop.hive.llap.LlapRowRecordReader;
-import org.apache.hadoop.hive.llap.Row;
-import org.apache.hadoop.hive.llap.Schema;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-
-import org.apache.hive.jdbc.miniHS2.MiniHS2;
-import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
-import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
-import org.apache.hadoop.hive.llap.LlapRowInputFormat;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-
-import org.datanucleus.ClassLoaderResolver;
-import org.datanucleus.NucleusContext;
-import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
-import org.datanucleus.AbstractNucleusContext;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestJdbcWithMiniLlap {
- private static MiniHS2 miniHS2 = null;
- private static String dataFileDir;
- private static Path kvDataFilePath;
- private static Path dataTypesFilePath;
-
- private static HiveConf conf = null;
- private Connection hs2Conn = null;
-
- @BeforeClass
- public static void beforeTest() throws Exception {
- Class.forName(MiniHS2.getJdbcDriverName());
-
- String confDir = "../../data/conf/llap/";
- if (confDir != null && !confDir.isEmpty()) {
- HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml"));
- System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation());
- }
-
- conf = new HiveConf();
- conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
- conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
-
- conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
- + "/tez-site.xml"));
-
- miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
-
- dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
- kvDataFilePath = new Path(dataFileDir, "kv1.txt");
- dataTypesFilePath = new Path(dataFileDir, "datatypes.txt");
- Map<String, String> confOverlay = new HashMap<String, String>();
- miniHS2.start(confOverlay);
- miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
- }
-
- @Before
- public void setUp() throws Exception {
- hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
- }
-
- public static Connection getConnection(String jdbcURL, String user, String pwd) throws SQLException {
- Connection conn = DriverManager.getConnection(jdbcURL, user, pwd);
- conn.createStatement().execute("set hive.support.concurrency = false");
- return conn;
- }
-
- @After
- public void tearDown() throws Exception {
- LlapBaseInputFormat.closeAll();
- hs2Conn.close();
- }
-
- @AfterClass
- public static void afterTest() throws Exception {
- if (miniHS2.isStarted()) {
- miniHS2.stop();
- }
- }
-
- private void createTestTable(String tableName) throws Exception {
- createTestTable(hs2Conn, null, tableName, kvDataFilePath.toString());
- }
-
- public static void createTestTable(Connection connection, String database, String tableName, String srcFile) throws
- Exception {
- Statement stmt = connection.createStatement();
-
- if (database != null) {
- stmt.execute("CREATE DATABASE IF NOT EXISTS " + database);
- stmt.execute("USE " + database);
- }
-
- // create table
- stmt.execute("DROP TABLE IF EXISTS " + tableName);
- stmt.execute("CREATE TABLE " + tableName
- + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'");
-
- // load data
- stmt.execute("load data local inpath '" + srcFile + "' into table " + tableName);
-
- ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName);
- assertTrue(res.next());
- assertEquals("val_238", res.getString(2));
- res.close();
- stmt.close();
- }
-
- private void createDataTypesTable(String tableName) throws Exception {
- Statement stmt = hs2Conn.createStatement();
-
- // create table
- stmt.execute("DROP TABLE IF EXISTS " + tableName);
- // tables with various types
- stmt.execute("create table " + tableName
- + " (c1 int, c2 boolean, c3 double, c4 string,"
- + " c5 array<int>, c6 map<int,string>, c7 map<string,string>,"
- + " c8 struct<r:string,s:int,t:double>,"
- + " c9 tinyint, c10 smallint, c11 float, c12 bigint,"
- + " c13 array<array<string>>,"
- + " c14 map<int, map<int,int>>,"
- + " c15 struct<r:int,s:struct<a:int,b:string>>,"
- + " c16 array<struct<m:map<string,string>,n:int>>,"
- + " c17 timestamp, "
- + " c18 decimal(16,7), "
- + " c19 binary, "
- + " c20 date,"
- + " c21 varchar(20),"
- + " c22 char(15),"
- + " c23 binary"
- + ")");
- stmt.execute("load data local inpath '"
- + dataTypesFilePath.toString() + "' into table " + tableName);
- stmt.close();
- }
-
- @Test(timeout = 60000)
- public void testLlapInputFormatEndToEnd() throws Exception {
- createTestTable("testtab1");
-
- int rowCount;
-
- RowCollector rowCollector = new RowCollector();
- String query = "select * from testtab1 where under_col = 0";
- rowCount = processQuery(query, 1, rowCollector);
- assertEquals(3, rowCount);
- assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(0));
- assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(1));
- assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(2));
-
- // Try empty rows query
- rowCollector.rows.clear();
- query = "select * from testtab1 where true = false";
- rowCount = processQuery(query, 1, rowCollector);
- assertEquals(0, rowCount);
- }
-
- @Test(timeout = 60000)
- public void testNonAsciiStrings() throws Exception {
- createTestTable(hs2Conn, "nonascii", "testtab_nonascii", kvDataFilePath.toString());
-
- RowCollector rowCollector = new RowCollector();
- String nonAscii = "À côté du garçon";
- String query = "select value, '" + nonAscii + "' from testtab_nonascii where under_col=0";
- int rowCount = processQuery("nonascii", query, 1, rowCollector);
- assertEquals(3, rowCount);
-
- assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(0));
- assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(1));
- assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(2));
- }
-
- @Test(timeout = 60000)
- public void testEscapedStrings() throws Exception {
- createTestTable("testtab1");
-
- RowCollector rowCollector = new RowCollector();
- String expectedVal1 = "'a',\"b\",\\c\\";
- String expectedVal2 = "multi\nline";
- String query = "select value, '\\'a\\',\"b\",\\\\c\\\\', 'multi\\nline' from testtab1 where under_col=0";
- int rowCount = processQuery(query, 1, rowCollector);
- assertEquals(3, rowCount);
-
- assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(0));
- assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(1));
- assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(2));
- }
-
- @Test(timeout = 60000)
- public void testDataTypes() throws Exception {
- createDataTypesTable("datatypes");
- RowCollector2 rowCollector = new RowCollector2();
- String query = "select * from datatypes";
- int rowCount = processQuery(query, 1, rowCollector);
- assertEquals(3, rowCount);
-
- // Verify schema
- String[][] colNameTypes = new String[][] {
- {"datatypes.c1", "int"},
- {"datatypes.c2", "boolean"},
- {"datatypes.c3", "double"},
- {"datatypes.c4", "string"},
- {"datatypes.c5", "array<int>"},
- {"datatypes.c6", "map<int,string>"},
- {"datatypes.c7", "map<string,string>"},
- {"datatypes.c8", "struct<r:string,s:int,t:double>"},
- {"datatypes.c9", "tinyint"},
- {"datatypes.c10", "smallint"},
- {"datatypes.c11", "float"},
- {"datatypes.c12", "bigint"},
- {"datatypes.c13", "array<array<string>>"},
- {"datatypes.c14", "map<int,map<int,int>>"},
- {"datatypes.c15", "struct<r:int,s:struct<a:int,b:string>>"},
- {"datatypes.c16", "array<struct<m:map<string,string>,n:int>>"},
- {"datatypes.c17", "timestamp"},
- {"datatypes.c18", "decimal(16,7)"},
- {"datatypes.c19", "binary"},
- {"datatypes.c20", "date"},
- {"datatypes.c21", "varchar(20)"},
- {"datatypes.c22", "char(15)"},
- {"datatypes.c23", "binary"},
- };
- FieldDesc fieldDesc;
- assertEquals(23, rowCollector.numColumns);
- for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
- fieldDesc = rowCollector.schema.getColumns().get(idx);
- assertEquals("ColName idx=" + idx, colNameTypes[idx][0], fieldDesc.getName());
- assertEquals("ColType idx=" + idx, colNameTypes[idx][1], fieldDesc.getTypeInfo().getTypeName());
- }
-
- // First row is all nulls
- Object[] rowValues = rowCollector.rows.get(0);
- for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
- assertEquals("idx=" + idx, null, rowValues[idx]);
- }
-
- // Second Row
- rowValues = rowCollector.rows.get(1);
- assertEquals(Integer.valueOf(-1), rowValues[0]);
- assertEquals(Boolean.FALSE, rowValues[1]);
- assertEquals(Double.valueOf(-1.1d), rowValues[2]);
- assertEquals("", rowValues[3]);
-
- List<?> c5Value = (List<?>) rowValues[4];
- assertEquals(0, c5Value.size());
-
- Map<?,?> c6Value = (Map<?,?>) rowValues[5];
- assertEquals(0, c6Value.size());
-
- Map<?,?> c7Value = (Map<?,?>) rowValues[6];
- assertEquals(0, c7Value.size());
-
- List<?> c8Value = (List<?>) rowValues[7];
- assertEquals(null, c8Value.get(0));
- assertEquals(null, c8Value.get(1));
- assertEquals(null, c8Value.get(2));
-
- assertEquals(Byte.valueOf((byte) -1), rowValues[8]);
- assertEquals(Short.valueOf((short) -1), rowValues[9]);
- assertEquals(Float.valueOf(-1.0f), rowValues[10]);
- assertEquals(Long.valueOf(-1l), rowValues[11]);
-
- List<?> c13Value = (List<?>) rowValues[12];
- assertEquals(0, c13Value.size());
-
- Map<?,?> c14Value = (Map<?,?>) rowValues[13];
- assertEquals(0, c14Value.size());
-
- List<?> c15Value = (List<?>) rowValues[14];
- assertEquals(null, c15Value.get(0));
- assertEquals(null, c15Value.get(1));
-
- List<?> c16Value = (List<?>) rowValues[15];
- assertEquals(0, c16Value.size());
-
- assertEquals(null, rowValues[16]);
- assertEquals(null, rowValues[17]);
- assertEquals(null, rowValues[18]);
- assertEquals(null, rowValues[19]);
- assertEquals(null, rowValues[20]);
- assertEquals(null, rowValues[21]);
- assertEquals(null, rowValues[22]);
-
- // Third row
- rowValues = rowCollector.rows.get(2);
- assertEquals(Integer.valueOf(1), rowValues[0]);
- assertEquals(Boolean.TRUE, rowValues[1]);
- assertEquals(Double.valueOf(1.1d), rowValues[2]);
- assertEquals("1", rowValues[3]);
-
- c5Value = (List<?>) rowValues[4];
- assertEquals(2, c5Value.size());
- assertEquals(Integer.valueOf(1), c5Value.get(0));
- assertEquals(Integer.valueOf(2), c5Value.get(1));
-
- c6Value = (Map<?,?>) rowValues[5];
- assertEquals(2, c6Value.size());
- assertEquals("x", c6Value.get(Integer.valueOf(1)));
- assertEquals("y", c6Value.get(Integer.valueOf(2)));
-
- c7Value = (Map<?,?>) rowValues[6];
- assertEquals(1, c7Value.size());
- assertEquals("v", c7Value.get("k"));
-
- c8Value = (List<?>) rowValues[7];
- assertEquals("a", c8Value.get(0));
- assertEquals(Integer.valueOf(9), c8Value.get(1));
- assertEquals(Double.valueOf(2.2d), c8Value.get(2));
-
- assertEquals(Byte.valueOf((byte) 1), rowValues[8]);
- assertEquals(Short.valueOf((short) 1), rowValues[9]);
- assertEquals(Float.valueOf(1.0f), rowValues[10]);
- assertEquals(Long.valueOf(1l), rowValues[11]);
-
- c13Value = (List<?>) rowValues[12];
- assertEquals(2, c13Value.size());
- List<?> listVal = (List<?>) c13Value.get(0);
- assertEquals("a", listVal.get(0));
- assertEquals("b", listVal.get(1));
- listVal = (List<?>) c13Value.get(1);
- assertEquals("c", listVal.get(0));
- assertEquals("d", listVal.get(1));
-
- c14Value = (Map<?,?>) rowValues[13];
- assertEquals(2, c14Value.size());
- Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
- assertEquals(2, mapVal.size());
- assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11)));
- assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13)));
- mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(2));
- assertEquals(1, mapVal.size());
- assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21)));
-
- c15Value = (List<?>) rowValues[14];
- assertEquals(Integer.valueOf(1), c15Value.get(0));
- listVal = (List<?>) c15Value.get(1);
- assertEquals(2, listVal.size());
- assertEquals(Integer.valueOf(2), listVal.get(0));
- assertEquals("x", listVal.get(1));
-
- c16Value = (List<?>) rowValues[15];
- assertEquals(2, c16Value.size());
- listVal = (List<?>) c16Value.get(0);
- assertEquals(2, listVal.size());
- mapVal = (Map<?,?>) listVal.get(0);
- assertEquals(0, mapVal.size());
- assertEquals(Integer.valueOf(1), listVal.get(1));
- listVal = (List<?>) c16Value.get(1);
- mapVal = (Map<?,?>) listVal.get(0);
- assertEquals(2, mapVal.size());
- assertEquals("b", mapVal.get("a"));
- assertEquals("d", mapVal.get("c"));
- assertEquals(Integer.valueOf(2), listVal.get(1));
-
- assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456789"), rowValues[16]);
- assertEquals(new BigDecimal("123456789.123456"), rowValues[17]);
- assertArrayEquals("abcd".getBytes("UTF-8"), (byte[]) rowValues[18]);
- assertEquals(Date.valueOf("2013-01-01"), rowValues[19]);
- assertEquals("abc123", rowValues[20]);
- assertEquals("abc123 ", rowValues[21]);
- assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
- }
-
-
- @Test(timeout = 60000)
- public void testComplexQuery() throws Exception {
- createTestTable("testtab1");
-
- RowCollector rowCollector = new RowCollector();
- String query = "select value, count(*) from testtab1 where under_col=0 group by value";
- int rowCount = processQuery(query, 1, rowCollector);
- assertEquals(1, rowCount);
-
- assertArrayEquals(new String[] {"val_0", "3"}, rowCollector.rows.get(0));
- }
-
- private interface RowProcessor {
- void process(Row row);
- }
-
- private static class RowCollector implements RowProcessor {
- ArrayList<String[]> rows = new ArrayList<String[]>();
- Schema schema = null;
- int numColumns = 0;
-
- public void process(Row row) {
- if (schema == null) {
- schema = row.getSchema();
- numColumns = schema.getColumns().size();
- }
-
- String[] arr = new String[numColumns];
- for (int idx = 0; idx < numColumns; ++idx) {
- Object val = row.getValue(idx);
- arr[idx] = (val == null ? null : val.toString());
- }
- rows.add(arr);
- }
- }
-
- // Save the actual values from each row as opposed to the String representation.
- private static class RowCollector2 implements RowProcessor {
- ArrayList<Object[]> rows = new ArrayList<Object[]>();
- Schema schema = null;
- int numColumns = 0;
-
- public void process(Row row) {
- if (schema == null) {
- schema = row.getSchema();
- numColumns = schema.getColumns().size();
- }
-
- Object[] arr = new Object[numColumns];
- for (int idx = 0; idx < numColumns; ++idx) {
- arr[idx] = row.getValue(idx);
- }
- rows.add(arr);
- }
- }
-
- private int processQuery(String query, int numSplits, RowProcessor rowProcessor) throws Exception {
- return processQuery(null, query, numSplits, rowProcessor);
- }
-
- private int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor) throws Exception {
- String url = miniHS2.getJdbcURL();
- String user = System.getProperty("user.name");
- String pwd = user;
- String handleId = UUID.randomUUID().toString();
-
- LlapRowInputFormat inputFormat = new LlapRowInputFormat();
-
- // Get splits
- JobConf job = new JobConf(conf);
- job.set(LlapBaseInputFormat.URL_KEY, url);
- job.set(LlapBaseInputFormat.USER_KEY, user);
- job.set(LlapBaseInputFormat.PWD_KEY, pwd);
- job.set(LlapBaseInputFormat.QUERY_KEY, query);
- job.set(LlapBaseInputFormat.HANDLE_ID, handleId);
- if (currentDatabase != null) {
- job.set(LlapBaseInputFormat.DB_KEY, currentDatabase);
- }
-
- InputSplit[] splits = inputFormat.getSplits(job, numSplits);
- assertTrue(splits.length > 0);
-
- // Fetch rows from splits
- boolean first = true;
- int rowCount = 0;
- for (InputSplit split : splits) {
- System.out.println("Processing split " + split.getLocations());
-
- int numColumns = 2;
- RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null);
- Row row = reader.createValue();
- while (reader.next(NullWritable.get(), row)) {
- rowProcessor.process(row);
- ++rowCount;
- }
- reader.close();
- }
- LlapBaseInputFormat.close(handleId);
-
- return rowCount;
- }
-
- /**
- * Test CLI kill command of a query that is running.
- * We spawn 2 threads - one running the query and
- * the other attempting to cancel.
- * We're using a dummy udf to simulate a query,
- * that runs for a sufficiently long time.
- * @throws Exception
- */
- @Test
- public void testKillQuery() throws Exception {
- String tableName = "testtab1";
- createTestTable(tableName);
- Connection con = hs2Conn;
- Connection con2 = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
-
- String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName();
- Statement stmt1 = con.createStatement();
- Statement stmt2 = con2.createStatement();
- stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
- stmt1.close();
- final Statement stmt = con.createStatement();
-
- ExceptionHolder tExecuteHolder = new ExceptionHolder();
- ExceptionHolder tKillHolder = new ExceptionHolder();
-
- // Thread executing the query
- Thread tExecute = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- System.out.println("Executing query: ");
- // The test table has 500 rows, so total query time should be ~ 500*500ms
- stmt.executeQuery("select sleepMsUDF(t1.under_col, 100), t1.under_col, t2.under_col " +
- "from " + tableName + " t1 join " + tableName + " t2 on t1.under_col = t2.under_col");
- fail("Expecting SQLException");
- } catch (SQLException e) {
- tExecuteHolder.throwable = e;
- }
- }
- });
- // Thread killing the query
- Thread tKill = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(2000);
- String queryId = ((HiveStatement) stmt).getQueryId();
- System.out.println("Killing query: " + queryId);
-
- stmt2.execute("kill query '" + queryId + "'");
- stmt2.close();
- } catch (Exception e) {
- tKillHolder.throwable = e;
- }
- }
- });
-
- tExecute.start();
- tKill.start();
- tExecute.join();
- tKill.join();
- stmt.close();
- con2.close();
-
- assertNotNull("tExecute", tExecuteHolder.throwable);
- assertNull("tCancel", tKillHolder.throwable);
- }
-
- private static class ExceptionHolder {
- Throwable throwable;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
new file mode 100644
index 0000000..afb9837
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.List;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.BeforeClass;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
+
+/**
+ * TestJdbcWithMiniLlap for Arrow format
+ */
+public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
+
+
+ @BeforeClass
+ public static void beforeTest() throws Exception {
+ BaseJdbcWithMiniLlap.beforeTest(true);
+ }
+
+ @Override
+ protected InputFormat<NullWritable, Row> getInputFormat() {
+ // For unit testing, no harm in hard-coding the allocator ceiling to Long.MAX_VALUE.
+ return new LlapArrowRowInputFormat(Long.MAX_VALUE);
+ }
+
+ // Currently MAP type is not supported. Add it back when Arrow 1.0 is released.
+ // See: SPARK-21187
+ @Override
+ public void testDataTypes() throws Exception {
+ createDataTypesTable("datatypes");
+ RowCollector2 rowCollector = new RowCollector2();
+ String query = "select * from datatypes";
+ int rowCount = processQuery(query, 1, rowCollector);
+ assertEquals(3, rowCount);
+
+ // Verify schema
+ String[][] colNameTypes = new String[][] {
+ {"datatypes.c1", "int"},
+ {"datatypes.c2", "boolean"},
+ {"datatypes.c3", "double"},
+ {"datatypes.c4", "string"},
+ {"datatypes.c5", "array<int>"},
+ {"datatypes.c6", "map<int,string>"},
+ {"datatypes.c7", "map<string,string>"},
+ {"datatypes.c8", "struct<r:string,s:int,t:double>"},
+ {"datatypes.c9", "tinyint"},
+ {"datatypes.c10", "smallint"},
+ {"datatypes.c11", "float"},
+ {"datatypes.c12", "bigint"},
+ {"datatypes.c13", "array<array<string>>"},
+ {"datatypes.c14", "map<int,map<int,int>>"},
+ {"datatypes.c15", "struct<r:int,s:struct<a:int,b:string>>"},
+ {"datatypes.c16", "array<struct<m:map<string,string>,n:int>>"},
+ {"datatypes.c17", "timestamp"},
+ {"datatypes.c18", "decimal(16,7)"},
+ {"datatypes.c19", "binary"},
+ {"datatypes.c20", "date"},
+ {"datatypes.c21", "varchar(20)"},
+ {"datatypes.c22", "char(15)"},
+ {"datatypes.c23", "binary"},
+ };
+ FieldDesc fieldDesc;
+ assertEquals(23, rowCollector.numColumns);
+ for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
+ fieldDesc = rowCollector.schema.getColumns().get(idx);
+ assertEquals("ColName idx=" + idx, colNameTypes[idx][0], fieldDesc.getName());
+ assertEquals("ColType idx=" + idx, colNameTypes[idx][1], fieldDesc.getTypeInfo().getTypeName());
+ }
+
+ // First row is all nulls
+ Object[] rowValues = rowCollector.rows.get(0);
+ for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
+ assertEquals("idx=" + idx, null, rowValues[idx]);
+ }
+
+ // Second Row
+ rowValues = rowCollector.rows.get(1);
+ assertEquals(Integer.valueOf(-1), rowValues[0]);
+ assertEquals(Boolean.FALSE, rowValues[1]);
+ assertEquals(Double.valueOf(-1.1d), rowValues[2]);
+ assertEquals("", rowValues[3]);
+
+ List<?> c5Value = (List<?>) rowValues[4];
+ assertEquals(0, c5Value.size());
+
+ //Map<?,?> c6Value = (Map<?,?>) rowValues[5];
+ //assertEquals(0, c6Value.size());
+
+ //Map<?,?> c7Value = (Map<?,?>) rowValues[6];
+ //assertEquals(0, c7Value.size());
+
+ List<?> c8Value = (List<?>) rowValues[7];
+ assertEquals(null, c8Value.get(0));
+ assertEquals(null, c8Value.get(1));
+ assertEquals(null, c8Value.get(2));
+
+ assertEquals(Byte.valueOf((byte) -1), rowValues[8]);
+ assertEquals(Short.valueOf((short) -1), rowValues[9]);
+ assertEquals(Float.valueOf(-1.0f), rowValues[10]);
+ assertEquals(Long.valueOf(-1L), rowValues[11]);
+
+ List<?> c13Value = (List<?>) rowValues[12];
+ assertEquals(0, c13Value.size());
+
+ //Map<?,?> c14Value = (Map<?,?>) rowValues[13];
+ //assertEquals(0, c14Value.size());
+
+ List<?> c15Value = (List<?>) rowValues[14];
+ assertEquals(null, c15Value.get(0));
+ assertEquals(null, c15Value.get(1));
+
+ //List<?> c16Value = (List<?>) rowValues[15];
+ //assertEquals(0, c16Value.size());
+
+ assertEquals(null, rowValues[16]);
+ assertEquals(null, rowValues[17]);
+ assertEquals(null, rowValues[18]);
+ assertEquals(null, rowValues[19]);
+ assertEquals(null, rowValues[20]);
+ assertEquals(null, rowValues[21]);
+ assertEquals(null, rowValues[22]);
+
+ // Third row
+ rowValues = rowCollector.rows.get(2);
+ assertEquals(Integer.valueOf(1), rowValues[0]);
+ assertEquals(Boolean.TRUE, rowValues[1]);
+ assertEquals(Double.valueOf(1.1d), rowValues[2]);
+ assertEquals("1", rowValues[3]);
+
+ c5Value = (List<?>) rowValues[4];
+ assertEquals(2, c5Value.size());
+ assertEquals(Integer.valueOf(1), c5Value.get(0));
+ assertEquals(Integer.valueOf(2), c5Value.get(1));
+
+ //c6Value = (Map<?,?>) rowValues[5];
+ //assertEquals(2, c6Value.size());
+ //assertEquals("x", c6Value.get(Integer.valueOf(1)));
+ //assertEquals("y", c6Value.get(Integer.valueOf(2)));
+
+ //c7Value = (Map<?,?>) rowValues[6];
+ //assertEquals(1, c7Value.size());
+ //assertEquals("v", c7Value.get("k"));
+
+ c8Value = (List<?>) rowValues[7];
+ assertEquals("a", c8Value.get(0));
+ assertEquals(Integer.valueOf(9), c8Value.get(1));
+ assertEquals(Double.valueOf(2.2d), c8Value.get(2));
+
+ assertEquals(Byte.valueOf((byte) 1), rowValues[8]);
+ assertEquals(Short.valueOf((short) 1), rowValues[9]);
+ assertEquals(Float.valueOf(1.0f), rowValues[10]);
+ assertEquals(Long.valueOf(1L), rowValues[11]);
+
+ c13Value = (List<?>) rowValues[12];
+ assertEquals(2, c13Value.size());
+ List<?> listVal = (List<?>) c13Value.get(0);
+ assertEquals("a", listVal.get(0));
+ assertEquals("b", listVal.get(1));
+ listVal = (List<?>) c13Value.get(1);
+ assertEquals("c", listVal.get(0));
+ assertEquals("d", listVal.get(1));
+
+ //c14Value = (Map<?,?>) rowValues[13];
+ //assertEquals(2, c14Value.size());
+ //Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
+ //assertEquals(2, mapVal.size());
+ //assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11)));
+ //assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13)));
+ //mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(2));
+ //assertEquals(1, mapVal.size());
+ //assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21)));
+
+ c15Value = (List<?>) rowValues[14];
+ assertEquals(Integer.valueOf(1), c15Value.get(0));
+ listVal = (List<?>) c15Value.get(1);
+ assertEquals(2, listVal.size());
+ assertEquals(Integer.valueOf(2), listVal.get(0));
+ assertEquals("x", listVal.get(1));
+
+ //c16Value = (List<?>) rowValues[15];
+ //assertEquals(2, c16Value.size());
+ //listVal = (List<?>) c16Value.get(0);
+ //assertEquals(2, listVal.size());
+ //mapVal = (Map<?,?>) listVal.get(0);
+ //assertEquals(0, mapVal.size());
+ //assertEquals(Integer.valueOf(1), listVal.get(1));
+ //listVal = (List<?>) c16Value.get(1);
+ //mapVal = (Map<?,?>) listVal.get(0);
+ //assertEquals(2, mapVal.size());
+ //assertEquals("b", mapVal.get("a"));
+ //assertEquals("d", mapVal.get("c"));
+ //assertEquals(Integer.valueOf(2), listVal.get(1));
+
+ assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456789"), rowValues[16]);
+ assertEquals(new BigDecimal("123456789.123456"), rowValues[17]);
+ assertArrayEquals("abcd".getBytes("UTF-8"), (byte[]) rowValues[18]);
+ assertEquals(Date.valueOf("2013-01-01"), rowValues[19]);
+ assertEquals("abc123", rowValues[20]);
+ assertEquals("abc123 ", rowValues[21]);
+ assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
+ }
+
+}
+
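A side note on the allocator ceiling used above: outside of unit tests, a caller would likely pass a bounded limit rather than Long.MAX_VALUE. A minimal sketch, assuming only the constructor shown in this test (the 4 GB figure is illustrative, not a recommendation from this patch; per RootAllocatorFactory, only the limit in effect when the singleton allocator is first created takes effect):

    import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;

    public class BoundedArrowClientSetup {
      public static LlapArrowRowInputFormat boundedInputFormat() {
        // Cap the shared Arrow RootAllocator at 4 GB; limits passed on later
        // calls are ignored once the singleton allocator exists.
        long arrowAllocatorLimit = 4L * 1024 * 1024 * 1024;
        return new LlapArrowRowInputFormat(arrowAllocatorLimit);
      }
    }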
http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
new file mode 100644
index 0000000..809068f
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.hive.llap.LlapRowInputFormat;
+import org.junit.BeforeClass;
+import org.apache.hadoop.mapred.InputFormat;
+
+/**
+ * TestJdbcWithMiniLlap for the LLAP Row format.
+ */
+public class TestJdbcWithMiniLlapRow extends BaseJdbcWithMiniLlap {
+
+ @BeforeClass
+ public static void beforeTest() throws Exception {
+ BaseJdbcWithMiniLlap.beforeTest(false);
+ }
+
+ @Override
+ protected InputFormat<NullWritable, Row> getInputFormat() {
+ return new LlapRowInputFormat();
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
index a9ed3d2..5316aa7 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -22,25 +22,15 @@ import com.google.common.base.Preconditions;
import java.io.BufferedInputStream;
import java.io.Closeable;
-import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.DataInputStream;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.Schema;
import org.apache.hadoop.hive.llap.io.ChunkedInputStream;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.JobConf;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -149,52 +139,61 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
throw new IOException("Hit end of input, but did not find expected end of data indicator");
}
- // There should be a reader event available, or coming soon, so okay to be blocking call.
- ReaderEvent event = getReaderEvent();
- switch (event.getEventType()) {
- case DONE:
- break;
- default:
- throw new IOException("Expected reader event with done status, but got "
- + event.getEventType() + " with message " + event.getMessage());
- }
+ processReaderEvent();
return false;
}
} catch (IOException io) {
- try {
- if (Thread.interrupted()) {
- // Either we were interrupted by one of:
- // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue
- // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
- // Either way we should not try to block trying to read the reader events queue.
- if (readerEvents.isEmpty()) {
- // Case 2.
- throw io;
- } else {
- // Case 1. Fail the reader, sending back the error we received from the reader event.
- ReaderEvent event = getReaderEvent();
- switch (event.getEventType()) {
- case ERROR:
- throw new IOException("Received reader event error: " + event.getMessage(), io);
- default:
- throw new IOException("Got reader event type " + event.getEventType()
- + ", expected error event", io);
- }
- }
- } else {
- // If we weren't interrupted, just propagate the error
+ failOnInterruption(io);
+ return false;
+ }
+ }
+
+ protected void processReaderEvent() throws IOException {
+ // There should be a reader event available, or coming soon, so okay to be blocking call.
+ ReaderEvent event = getReaderEvent();
+ switch (event.getEventType()) {
+ case DONE:
+ break;
+ default:
+ throw new IOException("Expected reader event with done status, but got "
+ + event.getEventType() + " with message " + event.getMessage());
+ }
+ }
+
+ protected void failOnInterruption(IOException io) throws IOException {
+ try {
+ if (Thread.interrupted()) {
+ // Either we were interrupted by one of:
+ // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue
+ // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
+ // Either way we should not try to block trying to read the reader events queue.
+ if (readerEvents.isEmpty()) {
+ // Case 2.
throw io;
+ } else {
+ // Case 1. Fail the reader, sending back the error we received from the reader event.
+ ReaderEvent event = getReaderEvent();
+ switch (event.getEventType()) {
+ case ERROR:
+ throw new IOException("Received reader event error: " + event.getMessage(), io);
+ default:
+ throw new IOException("Got reader event type " + event.getEventType()
+ + ", expected error event", io);
+ }
}
- } finally {
- // The external client handling umbilical responses and the connection to read the incoming
- // data are not coupled. Calling close() here to make sure an error in one will cause the
- // other to be closed as well.
- try {
- close();
- } catch (Exception err) {
- // Don't propagate errors from close() since this will lose the original error above.
- LOG.error("Closing RecordReader due to error and hit another error during close()", err);
- }
+ } else {
+ // If we weren't interrupted, just propagate the error
+ throw io;
+ }
+ } finally {
+ // The external client handling umbilical responses and the connection to read the incoming
+ // data are not coupled. Calling close() here to make sure an error in one will cause the
+ // other to be closed as well.
+ try {
+ close();
+ } catch (Exception err) {
+ // Don't propagate errors from close() since this will lose the original error above.
+ LOG.error("Closing RecordReader due to error and hit another error during close()", err);
}
}
}
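The point of this refactor is that processReaderEvent() and failOnInterruption() become a reusable contract for subclasses that read a different stream format. A skeletal sketch of what a subclass's next() is expected to do (readOneRecordInto() is a hypothetical format-specific helper; constructor plumbing is elided):

    public abstract class SketchRecordReader<V extends WritableComparable> extends LlapBaseRecordReader<V> {
      // ... constructor delegating to super(in, schema, clazz, job, client, socket) ...

      @Override
      public boolean next(NullWritable key, V value) throws IOException {
        try {
          // Record this thread so a reader (error) event can interrupt the blocking read.
          setReaderThread(Thread.currentThread());
          if (readOneRecordInto(value)) {  // hypothetical format-specific read
            return true;
          }
          // Clean end of stream: expect a DONE reader event.
          processReaderEvent();
          return false;
        } catch (IOException io) {
          // If we were interrupted, rethrows the reader-event error; always closes the reader.
          failOnInterruption(io);
          return false;
        }
      }

      // Hypothetical helper: reads one record into value, returns false at end of stream.
      protected abstract boolean readOneRecordInto(V value) throws IOException;
    }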
http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
index 1cfbf3a..6cc1d17 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
@@ -29,7 +29,6 @@ import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -70,20 +69,20 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
private static final Logger LOG = LoggerFactory.getLogger(LlapRowRecordReader.class);
protected final Configuration conf;
- protected final RecordReader<NullWritable, BytesWritable> reader;
+ protected final RecordReader reader;
protected final Schema schema;
protected final AbstractSerDe serde;
- protected final BytesWritable data;
+ protected final Writable data;
public LlapRowRecordReader(Configuration conf, Schema schema,
- RecordReader<NullWritable, BytesWritable> reader) throws IOException {
+ RecordReader<NullWritable, ? extends Writable> reader) throws IOException {
this.conf = conf;
this.schema = schema;
this.reader = reader;
- this.data = new BytesWritable();
+ this.data = reader.createValue();
try {
- serde = initSerDe(conf);
+ this.serde = initSerDe(conf);
} catch (SerDeException err) {
throw new IOException(err);
}
@@ -118,7 +117,7 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
public boolean next(NullWritable key, Row value) throws IOException {
Preconditions.checkArgument(value != null);
- boolean hasNext = reader.next(key, data);
+ boolean hasNext = reader.next(key, data);
if (hasNext) {
// Deserialize data to column values, and populate the row record
Object rowObj;
@@ -216,7 +215,7 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
return convertedVal;
}
- static void setRowFromStruct(Row row, Object structVal, StructObjectInspector soi) {
+ protected static void setRowFromStruct(Row row, Object structVal, StructObjectInspector soi) {
Schema structSchema = row.getSchema();
// Add struct field data to the Row
List<? extends StructField> structFields = soi.getAllStructFieldRefs();
@@ -230,6 +229,11 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
}
}
+ // Factory method for the SerDe; subclasses can override it to supply a different implementation.
+ protected AbstractSerDe createSerDe() throws SerDeException {
+ return new LazyBinarySerDe();
+ }
+
protected AbstractSerDe initSerDe(Configuration conf) throws SerDeException {
Properties props = new Properties();
StringBuilder columnsBuffer = new StringBuilder();
@@ -249,9 +253,9 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
props.put(serdeConstants.LIST_COLUMNS, columns);
props.put(serdeConstants.LIST_COLUMN_TYPES, types);
props.put(serdeConstants.ESCAPE_CHAR, "\\");
- AbstractSerDe serde = new LazyBinarySerDe();
- serde.initialize(conf, props);
+ AbstractSerDe createdSerDe = createSerDe();
+ createdSerDe.initialize(conf, props);
- return serde;
+ return createdSerDe;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/llap-ext-client/pom.xml
----------------------------------------------------------------------
diff --git a/llap-ext-client/pom.xml b/llap-ext-client/pom.xml
index ed4704b..295d3e6 100644
--- a/llap-ext-client/pom.xml
+++ b/llap-ext-client/pom.xml
@@ -41,6 +41,11 @@
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
<artifactId>hive-llap-client</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
new file mode 100644
index 0000000..d9c5666
--- /dev/null
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import com.google.common.base.Preconditions;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+
+/**
+ * Reads from an Arrow stream batch by batch.
+ */
+public class LlapArrowBatchRecordReader extends LlapBaseRecordReader<ArrowWrapperWritable> {
+
+ private BufferAllocator allocator;
+ private ArrowStreamReader arrowStreamReader;
+
+ public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class<ArrowWrapperWritable> clazz,
+ JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException {
+ super(in, schema, clazz, job, client, socket);
+ allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit);
+ this.arrowStreamReader = new ArrowStreamReader(socket.getInputStream(), allocator);
+ }
+
+ @Override
+ public boolean next(NullWritable key, ArrowWrapperWritable value) throws IOException {
+ try {
+ // Record the current thread so a reader event can interrupt this blocking call.
+ setReaderThread(Thread.currentThread());
+
+ boolean hasInput = arrowStreamReader.loadNextBatch();
+ if (hasInput) {
+ VectorSchemaRoot vectorSchemaRoot = arrowStreamReader.getVectorSchemaRoot();
+ // There must be at least one column vector.
+ Preconditions.checkState(vectorSchemaRoot.getFieldVectors().size() > 0);
+ if (vectorSchemaRoot.getFieldVectors().get(0).getValueCount() == 0) {
+ // An empty batch marks the end of the stream.
+ return false;
+ }
+ value.setVectorSchemaRoot(vectorSchemaRoot);
+ return true;
+ } else {
+ processReaderEvent();
+ return false;
+ }
+ } catch (IOException io) {
+ failOnInterruption(io);
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ arrowStreamReader.close();
+ }
+
+}
+
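For consumers that want columnar access instead of going through the row adapter that follows, a hedged sketch of draining the batch reader (only methods visible in this patch plus Arrow 0.8.0's ValueVector#getObject are assumed; obtaining the reader from LlapBaseInputFormat is elided):

    import java.io.IOException;
    import java.util.List;
    import org.apache.arrow.vector.FieldVector;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.hadoop.hive.llap.LlapArrowBatchRecordReader;
    import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
    import org.apache.hadoop.io.NullWritable;

    public class BatchDrainSketch {
      static void drainBatches(LlapArrowBatchRecordReader reader) throws IOException {
        ArrowWrapperWritable value = reader.createValue();
        while (reader.next(NullWritable.get(), value)) {
          VectorSchemaRoot root = value.getVectorSchemaRoot();
          List<FieldVector> vectors = root.getFieldVectors();
          int rowCount = vectors.get(0).getValueCount(); // all vectors share one row count
          for (int row = 0; row < rowCount; row++) {
            for (FieldVector vector : vectors) {
              Object cell = vector.getObject(row); // null encodes SQL NULL
              // consume cell ...
            }
          }
        }
        reader.close();
      }
    }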
http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
new file mode 100644
index 0000000..fafbdee
--- /dev/null
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import java.io.IOException;
+
+/**
+ * Adapts an Arrow batch reader to a row reader.
+ */
+public class LlapArrowRowInputFormat implements InputFormat<NullWritable, Row> {
+
+ private LlapBaseInputFormat baseInputFormat;
+
+ public LlapArrowRowInputFormat(long arrowAllocatorLimit) {
+ baseInputFormat = new LlapBaseInputFormat(true, arrowAllocatorLimit);
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ return baseInputFormat.getSplits(job, numSplits);
+ }
+
+ @Override
+ public RecordReader<NullWritable, Row> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+ throws IOException {
+ LlapInputSplit llapSplit = (LlapInputSplit) split;
+ LlapArrowBatchRecordReader reader =
+ (LlapArrowBatchRecordReader) baseInputFormat.getRecordReader(llapSplit, job, reporter);
+ return new LlapArrowRowRecordReader(job, reader.getSchema(), reader);
+ }
+}
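End to end, a sketch of how an external client might drive this input format. Only URL_KEY is confirmed by this diff; the other llap.if.* property names and Row#getValue(int) follow the existing client conventions and should be treated as assumptions:

    import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
    import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
    import org.apache.hadoop.hive.llap.Row;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    public class ArrowRowClientSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        job.set(LlapBaseInputFormat.URL_KEY, "jdbc:hive2://localhost:10000/default");
        job.set("llap.if.query", "select * from datatypes"); // assumed property name
        job.set("llap.if.user", "hive");                     // assumed property name
        job.set("llap.if.pwd", "");                          // assumed property name

        LlapArrowRowInputFormat inputFormat = new LlapArrowRowInputFormat(Long.MAX_VALUE);
        for (InputSplit split : inputFormat.getSplits(job, 1)) {
          RecordReader<NullWritable, Row> reader =
              inputFormat.getRecordReader(split, job, Reporter.NULL);
          Row row = reader.createValue();
          while (reader.next(NullWritable.get(), row)) {
            System.out.println(row.getValue(0)); // assumed accessor
          }
          reader.close();
        }
      }
    }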
http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java
new file mode 100644
index 0000000..d4179d5
--- /dev/null
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import com.google.common.base.Preconditions;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Buffers a batch for reading one row at a time.
+ */
+public class LlapArrowRowRecordReader extends LlapRowRecordReader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LlapArrowRowRecordReader.class);
+ private int rowIndex = 0;
+ private int batchSize = 0;
+
+ // Buffers one batch at a time for row-by-row retrieval.
+ private Object[][] currentBatch;
+
+ public LlapArrowRowRecordReader(Configuration conf, Schema schema,
+ RecordReader<NullWritable, ? extends Writable> reader) throws IOException {
+ super(conf, schema, reader);
+ }
+
+ @Override
+ public boolean next(NullWritable key, Row value) throws IOException {
+ Preconditions.checkArgument(value != null);
+ boolean hasNext = false;
+ ArrowWrapperWritable batchData = (ArrowWrapperWritable) data;
+ if ((batchSize == 0) || (rowIndex == batchSize)) {
+ // This is either the first batch or we've used up the current batch buffer.
+ batchSize = 0;
+ rowIndex = 0;
+ hasNext = reader.next(key, data);
+ if (hasNext) {
+ // There is another batch to buffer.
+ try {
+ List<FieldVector> vectors = batchData.getVectorSchemaRoot().getFieldVectors();
+ // hasNext implies there is at least one column in the batch.
+ Preconditions.checkState(vectors.size() > 0);
+ // All the vectors have the same length, so the row count
+ // can be taken from the first vector.
+ batchSize = vectors.get(0).getValueCount();
+ ArrowWrapperWritable wrapper = new ArrowWrapperWritable(batchData.getVectorSchemaRoot());
+ currentBatch = (Object[][]) serde.deserialize(wrapper);
+ StructObjectInspector rowOI = (StructObjectInspector) serde.getObjectInspector();
+ setRowFromStruct(value, currentBatch[rowIndex], rowOI);
+ } catch (Exception e) {
+ LOG.error("Failed to fetch Arrow batch", e);
+ throw new RuntimeException(e);
+ }
+ }
+ // There were no more batches, and this is either the first batch
+ // or we've used up the current batch buffer.
+ // Fall through and return false.
+ } else if (rowIndex < batchSize) {
+ // Take a row from the current buffered batch.
+ hasNext = true;
+ StructObjectInspector rowOI = null;
+ try {
+ rowOI = (StructObjectInspector) serde.getObjectInspector();
+ } catch (SerDeException e) {
+ throw new RuntimeException(e);
+ }
+ setRowFromStruct(value, currentBatch[rowIndex], rowOI);
+ }
+ // Always increment the batch buffer index.
+ // If we return false, the increment is a no-op.
+ rowIndex++;
+ return hasNext;
+ }
+
+ @Override protected AbstractSerDe createSerDe() throws SerDeException {
+ return new ArrowColumnarBatchSerDe();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index f4c7fa4..ef03be6 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -49,15 +49,15 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrB
import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
-import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
import org.apache.hadoop.hive.registry.ServiceInstanceSet;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
@@ -104,6 +104,8 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
private String user; // "hive",
private String pwd; // ""
private String query;
+ private boolean useArrow;
+ private long arrowAllocatorLimit;
private final Random rand = new Random();
public static final String URL_KEY = "llap.if.hs2.connection";
@@ -123,7 +125,14 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
this.query = query;
}
- public LlapBaseInputFormat() {}
+ public LlapBaseInputFormat(boolean useArrow, long arrowAllocatorLimit) {
+ this.useArrow = useArrow;
+ this.arrowAllocatorLimit = arrowAllocatorLimit;
+ }
+
+ public LlapBaseInputFormat() {
+ this.useArrow = false;
+ }
@SuppressWarnings("unchecked")
@@ -195,8 +204,16 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
LOG.info("Registered id: " + fragmentId);
@SuppressWarnings("rawtypes")
- LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(),
- llapSplit.getSchema(), Text.class, job, llapClient, (java.io.Closeable)socket);
+ LlapBaseRecordReader recordReader;
+ if (useArrow) {
+ recordReader = new LlapArrowBatchRecordReader(
+ socket.getInputStream(), llapSplit.getSchema(),
+ ArrowWrapperWritable.class, job, llapClient, socket,
+ arrowAllocatorLimit);
+ } else {
+ recordReader = new LlapBaseRecordReader(socket.getInputStream(),
+ llapSplit.getSchema(), BytesWritable.class, job, llapClient, (java.io.Closeable)socket);
+ }
umbilicalResponder.setRecordReader(recordReader);
return recordReader;
}
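To make the new constructor split concrete, a brief sketch (raw types mirror the patch's own @SuppressWarnings usage; the connection plumbing is elided):

    // Arrow path: record values arrive as ArrowWrapperWritable batches; the shared
    // RootAllocator is created with the given cap on first use.
    LlapBaseInputFormat arrowFormat = new LlapBaseInputFormat(true, Long.MAX_VALUE);

    // Default constructor keeps the pre-existing row behavior (BytesWritable values).
    LlapBaseInputFormat rowFormat = new LlapBaseInputFormat();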
http://git-wip-us.apache.org/repos/asf/hive/blob/2c78ceb6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index df10c74..9f64cbf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,6 +119,7 @@
<antlr.version>3.5.2</antlr.version>
<apache-directory-server.version>1.5.6</apache-directory-server.version>
<apache-directory-clientapi.version>0.1</apache-directory-clientapi.version>
+ <!-- Include arrow for LlapOutputFormatService -->
<arrow.version>0.8.0</arrow.version>
<avatica.version>1.11.0</avatica.version>
<avro.version>1.7.7</avro.version>