Posted to commits@hive.apache.org by mm...@apache.org on 2018/08/06 23:52:33 UTC
hive git commit: HIVE-20300: VectorFileSinkArrowOperator (Eric Wohlstadter, reviewed by Jason Dere, Matt McCline, Teddy Choi)
Repository: hive
Updated Branches:
refs/heads/master 42bf02e33 -> a8ef2147f
HIVE-20300: VectorFileSinkArrowOperator (Eric Wohlstadter, reviewed by Jason Dere, Matt McCline, Teddy Choi)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a8ef2147
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a8ef2147
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a8ef2147
Branch: refs/heads/master
Commit: a8ef2147fad5aeaaf01279230da9c584db6a2337
Parents: 42bf02e
Author: Matt McCline <mm...@hortonworks.com>
Authored: Mon Aug 6 18:52:24 2018 -0500
Committer: Matt McCline <mm...@hortonworks.com>
Committed: Mon Aug 6 18:52:24 2018 -0500
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 6 +-
.../apache/hive/jdbc/BaseJdbcWithMiniLlap.java | 38 +-
.../hive/jdbc/TestJdbcWithMiniLlapArrow.java | 6 +-
.../hive/jdbc/TestJdbcWithMiniLlapRow.java | 6 +-
.../jdbc/TestJdbcWithMiniLlapVectorArrow.java | 235 ++++++
.../filesink/VectorFileSinkArrowOperator.java | 180 +++++
.../hadoop/hive/ql/io/arrow/Serializer.java | 754 +++++++++++++------
.../hive/ql/optimizer/physical/Vectorizer.java | 60 +-
8 files changed, 1032 insertions(+), 253 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/a8ef2147/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 535a56b..e251920 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3662,7 +3662,11 @@ public class HiveConf extends Configuration {
"internal use only. When false, don't suppress fatal exceptions like\n" +
"NullPointerException, etc so the query will fail and assure it will be noticed",
true),
-
+ HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED(
+ "hive.vectorized.execution.filesink.arrow.native.enabled", false,
+ "This flag should be set to true to enable the native vectorization\n" +
+ "of queries using the Arrow SerDe and FileSink.\n" +
+ "The default value is false."),
HIVE_TYPE_CHECK_ON_INSERT("hive.typecheck.on.insert", true, "This property has been extended to control "
+ "whether to check, convert, and normalize partition value to conform to its column type in "
+ "partition operations including but not limited to insert, such as alter, describe etc."),
http://git-wip-us.apache.org/repos/asf/hive/blob/a8ef2147/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index 280119b..98f4729 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -110,29 +110,10 @@ public abstract class BaseJdbcWithMiniLlap {
private static Connection hs2Conn = null;
// This method should be called by sub-classes in a @BeforeClass initializer
- public static void beforeTest(boolean useArrow) throws Exception {
+ public static void beforeTest(HiveConf inputConf) throws Exception {
+ conf = inputConf;
Class.forName(MiniHS2.getJdbcDriverName());
-
- String confDir = "../../data/conf/llap/";
- if (confDir != null && !confDir.isEmpty()) {
- HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml"));
- System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation());
- }
-
- conf = new HiveConf();
- conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
- conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
- if(useArrow) {
- conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
- } else {
- conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
- }
-
- conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
- + "/tez-site.xml"));
-
miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
-
dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
kvDataFilePath = new Path(dataFileDir, "kv1.txt");
dataTypesFilePath = new Path(dataFileDir, "datatypes.txt");
@@ -141,6 +122,19 @@ public abstract class BaseJdbcWithMiniLlap {
miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
}
+ static HiveConf defaultConf() throws Exception {
+ String confDir = "../../data/conf/llap/";
+ if (confDir != null && !confDir.isEmpty()) {
+ HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml"));
+ System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
+ }
+ HiveConf defaultConf = new HiveConf();
+ defaultConf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ defaultConf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+ defaultConf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml"));
+ return defaultConf;
+ }
+
@Before
public void setUp() throws Exception {
hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
@@ -546,6 +540,8 @@ public abstract class BaseJdbcWithMiniLlap {
rowProcessor.process(row);
++rowCount;
}
+ //In arrow-mode this will throw an exception unless all buffers have been released
+ //See org.apache.hadoop.hive.llap.LlapArrowBatchRecordReader
reader.close();
}
LlapBaseInputFormat.close(handleId);
http://git-wip-us.apache.org/repos/asf/hive/blob/a8ef2147/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
index e69c686..c02980b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hive.llap.FieldDesc;
import org.apache.hadoop.hive.llap.Row;
import org.apache.hadoop.io.NullWritable;
import org.junit.BeforeClass;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
@@ -40,7 +42,9 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
@BeforeClass
public static void beforeTest() throws Exception {
- BaseJdbcWithMiniLlap.beforeTest(true);
+ HiveConf conf = defaultConf();
+ conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
+ BaseJdbcWithMiniLlap.beforeTest(conf);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/a8ef2147/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
index 809068f..d954d0e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
@@ -25,6 +25,8 @@ import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.After;
import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
/**
* TestJdbcWithMiniLlap for llap Row format.
@@ -33,7 +35,9 @@ public class TestJdbcWithMiniLlapRow extends BaseJdbcWithMiniLlap {
@BeforeClass
public static void beforeTest() throws Exception {
- BaseJdbcWithMiniLlap.beforeTest(false);
+ HiveConf conf = defaultConf();
+ conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
+ BaseJdbcWithMiniLlap.beforeTest(conf);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/a8ef2147/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
new file mode 100644
index 0000000..55a2df8
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
+import java.math.BigDecimal;
+import org.apache.hadoop.hive.common.type.Date;
+import org.apache.hadoop.hive.common.type.Timestamp;
+import java.util.List;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.BeforeClass;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
+
+/**
+ * TestJdbcWithMiniLlap for Arrow format with vectorized output sink
+ */
+public class TestJdbcWithMiniLlapVectorArrow extends BaseJdbcWithMiniLlap {
+
+
+ @BeforeClass
+ public static void beforeTest() throws Exception {
+ HiveConf conf = defaultConf();
+ conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
+ conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED, true);
+ BaseJdbcWithMiniLlap.beforeTest(conf);
+ }
+
+ @Override
+ protected InputFormat<NullWritable, Row> getInputFormat() {
+ //For unit testing, no harm in hard-coding allocator ceiling to Long.MAX_VALUE
+ return new LlapArrowRowInputFormat(Long.MAX_VALUE);
+ }
+
+ // Currently MAP type is not supported. Add it back when Arrow 1.0 is released.
+ // See: SPARK-21187
+ @Override
+ public void testDataTypes() throws Exception {
+ createDataTypesTable("datatypes");
+ RowCollector2 rowCollector = new RowCollector2();
+ String query = "select * from datatypes";
+ int rowCount = processQuery(query, 1, rowCollector);
+ assertEquals(3, rowCount);
+
+ // Verify schema
+ String[][] colNameTypes = new String[][] {
+ {"datatypes.c1", "int"},
+ {"datatypes.c2", "boolean"},
+ {"datatypes.c3", "double"},
+ {"datatypes.c4", "string"},
+ {"datatypes.c5", "array<int>"},
+ {"datatypes.c6", "map<int,string>"},
+ {"datatypes.c7", "map<string,string>"},
+ {"datatypes.c8", "struct<r:string,s:int,t:double>"},
+ {"datatypes.c9", "tinyint"},
+ {"datatypes.c10", "smallint"},
+ {"datatypes.c11", "float"},
+ {"datatypes.c12", "bigint"},
+ {"datatypes.c13", "array<array<string>>"},
+ {"datatypes.c14", "map<int,map<int,int>>"},
+ {"datatypes.c15", "struct<r:int,s:struct<a:int,b:string>>"},
+ {"datatypes.c16", "array<struct<m:map<string,string>,n:int>>"},
+ {"datatypes.c17", "timestamp"},
+ {"datatypes.c18", "decimal(16,7)"},
+ {"datatypes.c19", "binary"},
+ {"datatypes.c20", "date"},
+ {"datatypes.c21", "varchar(20)"},
+ {"datatypes.c22", "char(15)"},
+ {"datatypes.c23", "binary"},
+ };
+ FieldDesc fieldDesc;
+ assertEquals(23, rowCollector.numColumns);
+ for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
+ fieldDesc = rowCollector.schema.getColumns().get(idx);
+ assertEquals("ColName idx=" + idx, colNameTypes[idx][0], fieldDesc.getName());
+ assertEquals("ColType idx=" + idx, colNameTypes[idx][1], fieldDesc.getTypeInfo().getTypeName());
+ }
+
+ // First row is all nulls
+ Object[] rowValues = rowCollector.rows.get(0);
+ for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
+ assertEquals("idx=" + idx, null, rowValues[idx]);
+ }
+
+ // Second Row
+ rowValues = rowCollector.rows.get(1);
+ assertEquals(Integer.valueOf(-1), rowValues[0]);
+ assertEquals(Boolean.FALSE, rowValues[1]);
+ assertEquals(Double.valueOf(-1.1d), rowValues[2]);
+ assertEquals("", rowValues[3]);
+
+ List<?> c5Value = (List<?>) rowValues[4];
+ assertEquals(0, c5Value.size());
+
+ //Map<?,?> c6Value = (Map<?,?>) rowValues[5];
+ //assertEquals(0, c6Value.size());
+
+ //Map<?,?> c7Value = (Map<?,?>) rowValues[6];
+ //assertEquals(0, c7Value.size());
+
+ List<?> c8Value = (List<?>) rowValues[7];
+ assertEquals(null, c8Value.get(0));
+ assertEquals(null, c8Value.get(1));
+ assertEquals(null, c8Value.get(2));
+
+ assertEquals(Byte.valueOf((byte) -1), rowValues[8]);
+ assertEquals(Short.valueOf((short) -1), rowValues[9]);
+ assertEquals(Float.valueOf(-1.0f), rowValues[10]);
+ assertEquals(Long.valueOf(-1L), rowValues[11]);
+
+ List<?> c13Value = (List<?>) rowValues[12];
+ assertEquals(0, c13Value.size());
+
+ //Map<?,?> c14Value = (Map<?,?>) rowValues[13];
+ //assertEquals(0, c14Value.size());
+
+ List<?> c15Value = (List<?>) rowValues[14];
+ assertEquals(null, c15Value.get(0));
+ assertEquals(null, c15Value.get(1));
+
+ //List<?> c16Value = (List<?>) rowValues[15];
+ //assertEquals(0, c16Value.size());
+
+ assertEquals(null, rowValues[16]);
+ assertEquals(null, rowValues[17]);
+ assertEquals(null, rowValues[18]);
+ assertEquals(null, rowValues[19]);
+ assertEquals(null, rowValues[20]);
+ assertEquals(null, rowValues[21]);
+ assertEquals(null, rowValues[22]);
+
+ // Third row
+ rowValues = rowCollector.rows.get(2);
+ assertEquals(Integer.valueOf(1), rowValues[0]);
+ assertEquals(Boolean.TRUE, rowValues[1]);
+ assertEquals(Double.valueOf(1.1d), rowValues[2]);
+ assertEquals("1", rowValues[3]);
+
+ c5Value = (List<?>) rowValues[4];
+ assertEquals(2, c5Value.size());
+ assertEquals(Integer.valueOf(1), c5Value.get(0));
+ assertEquals(Integer.valueOf(2), c5Value.get(1));
+
+ //c6Value = (Map<?,?>) rowValues[5];
+ //assertEquals(2, c6Value.size());
+ //assertEquals("x", c6Value.get(Integer.valueOf(1)));
+ //assertEquals("y", c6Value.get(Integer.valueOf(2)));
+
+ //c7Value = (Map<?,?>) rowValues[6];
+ //assertEquals(1, c7Value.size());
+ //assertEquals("v", c7Value.get("k"));
+
+ c8Value = (List<?>) rowValues[7];
+ assertEquals("a", c8Value.get(0));
+ assertEquals(Integer.valueOf(9), c8Value.get(1));
+ assertEquals(Double.valueOf(2.2d), c8Value.get(2));
+
+ assertEquals(Byte.valueOf((byte) 1), rowValues[8]);
+ assertEquals(Short.valueOf((short) 1), rowValues[9]);
+ assertEquals(Float.valueOf(1.0f), rowValues[10]);
+ assertEquals(Long.valueOf(1L), rowValues[11]);
+
+ c13Value = (List<?>) rowValues[12];
+ assertEquals(2, c13Value.size());
+ List<?> listVal = (List<?>) c13Value.get(0);
+ assertEquals("a", listVal.get(0));
+ assertEquals("b", listVal.get(1));
+ listVal = (List<?>) c13Value.get(1);
+ assertEquals("c", listVal.get(0));
+ assertEquals("d", listVal.get(1));
+
+ //c14Value = (Map<?,?>) rowValues[13];
+ //assertEquals(2, c14Value.size());
+ //Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
+ //assertEquals(2, mapVal.size());
+ //assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11)));
+ //assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13)));
+ //mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(2));
+ //assertEquals(1, mapVal.size());
+ //assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21)));
+
+ c15Value = (List<?>) rowValues[14];
+ assertEquals(Integer.valueOf(1), c15Value.get(0));
+ listVal = (List<?>) c15Value.get(1);
+ assertEquals(2, listVal.size());
+ assertEquals(Integer.valueOf(2), listVal.get(0));
+ assertEquals("x", listVal.get(1));
+
+ //c16Value = (List<?>) rowValues[15];
+ //assertEquals(2, c16Value.size());
+ //listVal = (List<?>) c16Value.get(0);
+ //assertEquals(2, listVal.size());
+ //mapVal = (Map<?,?>) listVal.get(0);
+ //assertEquals(0, mapVal.size());
+ //assertEquals(Integer.valueOf(1), listVal.get(1));
+ //listVal = (List<?>) c16Value.get(1);
+ //mapVal = (Map<?,?>) listVal.get(0);
+ //assertEquals(2, mapVal.size());
+ //assertEquals("b", mapVal.get("a"));
+ //assertEquals("d", mapVal.get("c"));
+ //assertEquals(Integer.valueOf(2), listVal.get(1));
+
+ assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456"), rowValues[16]);
+ assertEquals(new BigDecimal("123456789.123456"), rowValues[17]);
+ assertArrayEquals("abcd".getBytes("UTF-8"), (byte[]) rowValues[18]);
+ assertEquals(Date.valueOf("2013-01-01"), rowValues[19]);
+ assertEquals("abc123", rowValues[20]);
+ assertEquals("abc123 ", rowValues[21]);
+ assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
+ }
+
+}
+
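[The test passes Long.MAX_VALUE as the allocator ceiling, which is harmless for unit testing; a production reader would presumably pass a bounded limit instead. A hedged sketch (the 1 GiB figure is illustrative, not taken from this commit):

    //Bound the Arrow buffer allocator rather than using Long.MAX_VALUE
    long allocatorLimit = 1024L * 1024L * 1024L; //illustrative 1 GiB ceiling
    InputFormat<NullWritable, Row> inputFormat = new LlapArrowRowInputFormat(allocatorLimit);
]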
http://git-wip-us.apache.org/repos/asf/hive/blob/a8ef2147/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java
new file mode 100644
index 0000000..1603703
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.filesink;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorFileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.hive.llap.LlapOutputFormatService;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import java.util.List;
+import java.util.ArrayList;
+import org.apache.hadoop.hive.ql.io.arrow.Serializer;
+import static org.apache.hadoop.hive.llap.LlapOutputFormat.LLAP_OF_ID_KEY;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Native Vectorized File Sink operator implementation for Arrow.
+ * Assumes output to LlapOutputFormatService
+ **/
+public class VectorFileSinkArrowOperator extends TerminalOperator<FileSinkDesc>
+ implements Serializable, VectorizationOperator {
+
+ private static final long serialVersionUID = 1L;
+
+ private VectorizationContext vContext;
+ private VectorFileSinkDesc vectorDesc;
+ public static final Logger LOG = LoggerFactory.getLogger(VectorFileSinkArrowOperator.class.getName());
+
+ // The above members are initialized by the constructor and must not be
+ // transient.
+ //---------------------------------------------------------------------------
+
+ private transient Serializer converter;
+ private transient RecordWriter recordWriter;
+ private transient boolean wroteData;
+ private transient String attemptId;
+
+ public VectorFileSinkArrowOperator(CompilationOpContext ctx, OperatorDesc conf,
+ VectorizationContext vContext, VectorDesc vectorDesc) {
+ this(ctx);
+ this.conf = (FileSinkDesc) conf;
+ this.vContext = vContext;
+ this.vectorDesc = (VectorFileSinkDesc) vectorDesc;
+ }
+
+ /** Kryo ctor. */
+ @VisibleForTesting
+ public VectorFileSinkArrowOperator() {
+ super();
+ }
+
+ public VectorFileSinkArrowOperator(CompilationOpContext ctx) {
+ super(ctx);
+ }
+
+ @Override
+ public VectorizationContext getInputVectorizationContext() {
+ return vContext;
+ }
+
+ @Override
+ protected void initializeOp(Configuration hconf) throws HiveException {
+ super.initializeOp(hconf);
+ //attemptId identifies a RecordWriter initialized by LlapOutputFormatService
+ this.attemptId = hconf.get(LLAP_OF_ID_KEY);
+ try {
+ //Initialize column names and types
+ List<TypeInfo> typeInfos = new ArrayList<>();
+ List<String> fieldNames = new ArrayList<>();
+ StructObjectInspector schema = (StructObjectInspector) inputObjInspectors[0];
+ for(int i = 0; i < schema.getAllStructFieldRefs().size(); i++) {
+ StructField structField = schema.getAllStructFieldRefs().get(i);
+ fieldNames.add(structField.getFieldName());
+ TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(structField.getFieldObjectInspector());
+ typeInfos.add(typeInfo);
+ }
+ //Initialize an Arrow serializer
+ converter = new Serializer(hconf, attemptId, typeInfos, fieldNames);
+ } catch (Exception e) {
+ LOG.error("Unable to initialize VectorFileSinkArrowOperator");
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void process(Object data, int tag) throws HiveException {
+ //ArrowStreamReader expects at least the schema metadata, if this op writes no data,
+ //we need to send the schema to close the stream gracefully
+ VectorizedRowBatch batch = (VectorizedRowBatch) data;
+ try {
+ if(recordWriter == null) {
+ recordWriter = LlapOutputFormatService.get().getWriter(this.attemptId);
+ }
+ //Convert the VectorizedRowBatch to a handle for the Arrow batch
+ ArrowWrapperWritable writable = converter.serializeBatch(batch, true);
+ //Pass the handle to the LlapOutputFormatService recordWriter
+ recordWriter.write(null, writable);
+ this.wroteData = true;
+ } catch(Exception e) {
+ LOG.error("Failed to convert VectorizedRowBatch to Arrow batch");
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected void closeOp(boolean abort) throws HiveException {
+ try {
+ if(!wroteData) {
+ //Send a schema only batch to signal EOS with no data written
+ ArrowWrapperWritable writable = converter.emptyBatch();
+ if(recordWriter == null) {
+ recordWriter = LlapOutputFormatService.get().getWriter(this.attemptId);
+ }
+ recordWriter.write(null, writable);
+ }
+ } catch(Exception e) {
+ LOG.error("Failed to write Arrow stream schema");
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ //Close the recordWriter with null Reporter
+ recordWriter.close(null);
+ } catch(Exception e) {
+ LOG.error("Failed to close Arrow stream");
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public VectorDesc getVectorDesc() {
+ return vectorDesc;
+ }
+
+ @Override
+ public OperatorType getType() {
+ return OperatorType.FILESINK;
+ }
+}
+
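[Taken together, the operator's lifecycle is: initializeOp derives field names and TypeInfos from the input object inspector and builds a non-serde Serializer; process hands each VectorizedRowBatch to serializeBatch with isNative=true and writes the resulting ArrowWrapperWritable to the LlapOutputFormatService writer; closeOp emits a schema-only batch when no data was written so ArrowStreamReader can close the stream gracefully. A condensed sketch of that flow, with error handling omitted and names as in the class above:

    //initializeOp: build the Arrow serializer from the row schema
    Serializer converter = new Serializer(hconf, attemptId, typeInfos, fieldNames);
    //process: isNative=true serializes the incoming batch directly
    ArrowWrapperWritable writable = converter.serializeBatch(batch, true);
    recordWriter.write(null, writable);
    //closeOp: a schema-only batch still signals EOS if nothing was written
    if (!wroteData) {
      recordWriter.write(null, converter.emptyBatch());
    }
    recordWriter.close(null);
]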
http://git-wip-us.apache.org/repos/asf/hive/blob/a8ef2147/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
index 65a889e..08e0fb2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
@@ -42,10 +42,12 @@ import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
@@ -70,6 +72,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.arrow.memory.BufferAllocator;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import java.util.ArrayList;
import java.util.List;
@@ -87,21 +90,39 @@ import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.toStruc
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption.WRITABLE;
import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfoFromObjectInspector;
-class Serializer {
+public class Serializer {
private final int MAX_BUFFERED_ROWS;
- // Schema
- private final StructTypeInfo structTypeInfo;
- private final int fieldSize;
-
// Hive columns
private final VectorizedRowBatch vectorizedRowBatch;
private final VectorAssignRow vectorAssignRow;
private int batchSize;
private BufferAllocator allocator;
+ private List<TypeInfo> fieldTypeInfos;
+ private List<String> fieldNames;
+ private int fieldSize;
private final NullableMapVector rootVector;
+ //Constructor for non-serde serialization
+ public Serializer(Configuration conf, String attemptId, List<TypeInfo> typeInfos, List<String> fieldNames) {
+ this.fieldTypeInfos = typeInfos;
+ this.fieldNames = fieldNames;
+ long childAllocatorLimit = HiveConf.getLongVar(conf, HIVE_ARROW_BATCH_ALLOCATOR_LIMIT);
+ //Use per-task allocator for accounting only, no need to reserve per-task memory
+ long childAllocatorReservation = 0L;
+ //Break out accounting of direct memory per-task, so we can check no memory is leaked when task is completed
+ allocator = RootAllocatorFactory.INSTANCE.getRootAllocator(conf).newChildAllocator(
+ attemptId,
+ childAllocatorReservation,
+ childAllocatorLimit);
+ rootVector = NullableMapVector.empty(null, allocator);
+ //These last fields are unused in non-serde usage
+ vectorizedRowBatch = null;
+ vectorAssignRow = null;
+ MAX_BUFFERED_ROWS = 0;
+ }
+
Serializer(ArrowColumnarBatchSerDe serDe) throws SerDeException {
MAX_BUFFERED_ROWS = HiveConf.getIntVar(serDe.conf, HIVE_ARROW_BATCH_SIZE);
long childAllocatorLimit = HiveConf.getLongVar(serDe.conf, HIVE_ARROW_BATCH_ALLOCATOR_LIMIT);
@@ -111,13 +132,14 @@ class Serializer {
long childAllocatorReservation = 0L;
//Break out accounting of direct memory per-task, so we can check no memory is leaked when task is completed
allocator = serDe.rootAllocator.newChildAllocator(
- childAllocatorName,
- childAllocatorReservation,
- childAllocatorLimit);
+ childAllocatorName,
+ childAllocatorReservation,
+ childAllocatorLimit);
// Schema
- structTypeInfo = (StructTypeInfo) getTypeInfoFromObjectInspector(serDe.rowObjectInspector);
- List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+ StructTypeInfo structTypeInfo = (StructTypeInfo) getTypeInfoFromObjectInspector(serDe.rowObjectInspector);
+ fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+ fieldNames = structTypeInfo.getAllStructFieldNames();
fieldSize = fieldTypeInfos.size();
// Init Arrow stuffs
rootVector = NullableMapVector.empty(null, allocator);
@@ -138,33 +160,66 @@ class Serializer {
}
}
- private ArrowWrapperWritable serializeBatch() {
+ //Construct an emptyBatch which contains schema-only info
+ public ArrowWrapperWritable emptyBatch() {
+ rootVector.setValueCount(0);
+ for (int fieldIndex = 0; fieldIndex < fieldTypeInfos.size(); fieldIndex++) {
+ final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex);
+ final String fieldName = fieldNames.get(fieldIndex);
+ final FieldType fieldType = toFieldType(fieldTypeInfo);
+ final FieldVector arrowVector = rootVector.addOrGet(fieldName, fieldType, FieldVector.class);
+ arrowVector.setInitialCapacity(0);
+ arrowVector.allocateNew();
+ }
+ VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(rootVector);
+ return new ArrowWrapperWritable(vectorSchemaRoot, allocator, rootVector);
+ }
+
+ //Used for both:
+ //1. VectorizedRowBatch constructed by batching rows
+ //2. VectorizedRowBatch provided from upstream (isNative)
+ public ArrowWrapperWritable serializeBatch(VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
rootVector.setValueCount(0);
for (int fieldIndex = 0; fieldIndex < vectorizedRowBatch.projectionSize; fieldIndex++) {
final int projectedColumn = vectorizedRowBatch.projectedColumns[fieldIndex];
final ColumnVector hiveVector = vectorizedRowBatch.cols[projectedColumn];
- final TypeInfo fieldTypeInfo = structTypeInfo.getAllStructFieldTypeInfos().get(fieldIndex);
- final String fieldName = structTypeInfo.getAllStructFieldNames().get(fieldIndex);
+ final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex);
+ final String fieldName = fieldNames.get(fieldIndex);
final FieldType fieldType = toFieldType(fieldTypeInfo);
+ //Reuse existing FieldVector buffers
+ //since we always call setValue or setNull for each row
+ boolean fieldExists = false;
+ if(rootVector.getChild(fieldName) != null) {
+ fieldExists = true;
+ }
final FieldVector arrowVector = rootVector.addOrGet(fieldName, fieldType, FieldVector.class);
- arrowVector.setInitialCapacity(batchSize);
- arrowVector.allocateNew();
- write(arrowVector, hiveVector, fieldTypeInfo, batchSize);
+ if(fieldExists) {
+ arrowVector.setValueCount(isNative ? vectorizedRowBatch.size : batchSize);
+ } else {
+ arrowVector.setInitialCapacity(isNative ? vectorizedRowBatch.size : batchSize);
+ arrowVector.allocateNew();
+ }
+ write(arrowVector, hiveVector, fieldTypeInfo, isNative ? vectorizedRowBatch.size : batchSize, vectorizedRowBatch, isNative);
+ }
+ if(!isNative) {
+ //Only mutate batches that are constructed by this serde
+ vectorizedRowBatch.reset();
+ rootVector.setValueCount(batchSize);
+ } else {
+ rootVector.setValueCount(vectorizedRowBatch.size);
}
- vectorizedRowBatch.reset();
- rootVector.setValueCount(batchSize);
batchSize = 0;
VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(rootVector);
return new ArrowWrapperWritable(vectorSchemaRoot, allocator, rootVector);
}
- private FieldType toFieldType(TypeInfo typeInfo) {
+ private static FieldType toFieldType(TypeInfo typeInfo) {
return new FieldType(true, toArrowType(typeInfo), null);
}
- private ArrowType toArrowType(TypeInfo typeInfo) {
+ private static ArrowType toArrowType(TypeInfo typeInfo) {
switch (typeInfo.getCategory()) {
case PRIMITIVE:
switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
@@ -218,34 +273,35 @@ class Serializer {
}
}
- private void write(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size) {
+ private static void write(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size,
+ VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
switch (typeInfo.getCategory()) {
case PRIMITIVE:
- writePrimitive(arrowVector, hiveVector, typeInfo, size);
+ writePrimitive(arrowVector, hiveVector, typeInfo, size, vectorizedRowBatch, isNative);
break;
case LIST:
- writeList((ListVector) arrowVector, (ListColumnVector) hiveVector, (ListTypeInfo) typeInfo, size);
+ writeList((ListVector) arrowVector, (ListColumnVector) hiveVector, (ListTypeInfo) typeInfo, size, vectorizedRowBatch, isNative);
break;
case STRUCT:
- writeStruct((MapVector) arrowVector, (StructColumnVector) hiveVector, (StructTypeInfo) typeInfo, size);
+ writeStruct((MapVector) arrowVector, (StructColumnVector) hiveVector, (StructTypeInfo) typeInfo, size, vectorizedRowBatch, isNative);
break;
case UNION:
- writeUnion(arrowVector, hiveVector, typeInfo, size);
+ writeUnion(arrowVector, hiveVector, typeInfo, size, vectorizedRowBatch, isNative);
break;
case MAP:
- writeMap((ListVector) arrowVector, (MapColumnVector) hiveVector, (MapTypeInfo) typeInfo, size);
+ writeMap((ListVector) arrowVector, (MapColumnVector) hiveVector, (MapTypeInfo) typeInfo, size, vectorizedRowBatch, isNative);
break;
default:
throw new IllegalArgumentException();
- }
+ }
}
- private void writeMap(ListVector arrowVector, MapColumnVector hiveVector, MapTypeInfo typeInfo,
- int size) {
+ private static void writeMap(ListVector arrowVector, MapColumnVector hiveVector, MapTypeInfo typeInfo,
+ int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
final ListTypeInfo structListTypeInfo = toStructListTypeInfo(typeInfo);
final ListColumnVector structListVector = toStructListVector(hiveVector);
- write(arrowVector, structListVector, structListTypeInfo, size);
+ write(arrowVector, structListVector, structListTypeInfo, size, vectorizedRowBatch, isNative);
final ArrowBuf validityBuffer = arrowVector.getValidityBuffer();
for (int rowIndex = 0; rowIndex < size; rowIndex++) {
@@ -257,8 +313,8 @@ class Serializer {
}
}
- private void writeUnion(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo,
- int size) {
+ private static void writeUnion(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo,
+ int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
final UnionColumnVector hiveUnionVector = (UnionColumnVector) hiveVector;
@@ -268,11 +324,11 @@ class Serializer {
final ColumnVector hiveObjectVector = hiveObjectVectors[tag];
final TypeInfo objectTypeInfo = objectTypeInfos.get(tag);
- write(arrowVector, hiveObjectVector, objectTypeInfo, size);
+ write(arrowVector, hiveObjectVector, objectTypeInfo, size, vectorizedRowBatch, isNative);
}
- private void writeStruct(MapVector arrowVector, StructColumnVector hiveVector,
- StructTypeInfo typeInfo, int size) {
+ private static void writeStruct(MapVector arrowVector, StructColumnVector hiveVector,
+ StructTypeInfo typeInfo, int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
final List<String> fieldNames = typeInfo.getAllStructFieldNames();
final List<TypeInfo> fieldTypeInfos = typeInfo.getAllStructFieldTypeInfos();
final ColumnVector[] hiveFieldVectors = hiveVector.fields;
@@ -287,7 +343,7 @@ class Serializer {
toFieldType(fieldTypeInfos.get(fieldIndex)), FieldVector.class);
arrowFieldVector.setInitialCapacity(size);
arrowFieldVector.allocateNew();
- write(arrowFieldVector, hiveFieldVector, fieldTypeInfo, size);
+ write(arrowFieldVector, hiveFieldVector, fieldTypeInfo, size, vectorizedRowBatch, isNative);
}
final ArrowBuf validityBuffer = arrowVector.getValidityBuffer();
@@ -300,8 +356,8 @@ class Serializer {
}
}
- private void writeList(ListVector arrowVector, ListColumnVector hiveVector, ListTypeInfo typeInfo,
- int size) {
+ private static void writeList(ListVector arrowVector, ListColumnVector hiveVector, ListTypeInfo typeInfo, int size,
+ VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
final int OFFSET_WIDTH = 4;
final TypeInfo elementTypeInfo = typeInfo.getListElementTypeInfo();
final ColumnVector hiveElementVector = hiveVector.child;
@@ -310,7 +366,7 @@ class Serializer {
arrowElementVector.setInitialCapacity(hiveVector.childCount);
arrowElementVector.allocateNew();
- write(arrowElementVector, hiveElementVector, elementTypeInfo, hiveVector.childCount);
+ write(arrowElementVector, hiveElementVector, elementTypeInfo, hiveVector.childCount, vectorizedRowBatch, isNative);
final ArrowBuf offsetBuffer = arrowVector.getOffsetBuffer();
int nextOffset = 0;
@@ -327,208 +383,244 @@ class Serializer {
offsetBuffer.setInt(size * OFFSET_WIDTH, nextOffset);
}
- private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo,
- int size) {
+ //Handle cases for both internally constructed
+ //and externally provided (isNative) VectorRowBatch
+ private static void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size,
+ VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory =
((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
switch (primitiveCategory) {
- case BOOLEAN:
- {
- final BitVector bitVector = (BitVector) arrowVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- bitVector.setNull(i);
- } else {
- bitVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]);
- }
- }
+ case BOOLEAN:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, boolNullSetter, boolValueSetter);
+ return;
+ }
+ final BitVector bitVector = (BitVector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ boolNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ boolValueSetter.accept(i, i, arrowVector, hiveVector);
}
- break;
- case BYTE:
- {
- final TinyIntVector tinyIntVector = (TinyIntVector) arrowVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- tinyIntVector.setNull(i);
- } else {
- tinyIntVector.set(i, (byte) ((LongColumnVector) hiveVector).vector[i]);
- }
- }
+ }
+ }
+ break;
+ case BYTE:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, byteNullSetter, byteValueSetter);
+ return;
+ }
+ final TinyIntVector tinyIntVector = (TinyIntVector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ byteNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ byteValueSetter.accept(i, i, arrowVector, hiveVector);
}
- break;
- case SHORT:
- {
- final SmallIntVector smallIntVector = (SmallIntVector) arrowVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- smallIntVector.setNull(i);
- } else {
- smallIntVector.set(i, (short) ((LongColumnVector) hiveVector).vector[i]);
- }
- }
+ }
+ }
+ break;
+ case SHORT:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, shortNullSetter, shortValueSetter);
+ return;
+ }
+ final SmallIntVector smallIntVector = (SmallIntVector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ shortNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ shortValueSetter.accept(i, i, arrowVector, hiveVector);
}
- break;
- case INT:
- {
- final IntVector intVector = (IntVector) arrowVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- intVector.setNull(i);
- } else {
- intVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]);
- }
- }
+ }
+ }
+ break;
+ case INT:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, intNullSetter, intValueSetter);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ intNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ intValueSetter.accept(i, i, arrowVector, hiveVector);
}
- break;
- case LONG:
- {
- final BigIntVector bigIntVector = (BigIntVector) arrowVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- bigIntVector.setNull(i);
- } else {
- bigIntVector.set(i, ((LongColumnVector) hiveVector).vector[i]);
- }
- }
+ }
+ }
+ break;
+ case LONG:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, longNullSetter, longValueSetter);
+ return;
+ }
+ final BigIntVector bigIntVector = (BigIntVector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ longNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ longValueSetter.accept(i, i, arrowVector, hiveVector);
}
- break;
- case FLOAT:
- {
- final Float4Vector float4Vector = (Float4Vector) arrowVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- float4Vector.setNull(i);
- } else {
- float4Vector.set(i, (float) ((DoubleColumnVector) hiveVector).vector[i]);
- }
- }
+ }
+ }
+ break;
+ case FLOAT:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, floatNullSetter, floatValueSetter);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ floatNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ floatValueSetter.accept(i, i, arrowVector, hiveVector);
}
- break;
- case DOUBLE:
- {
- final Float8Vector float8Vector = (Float8Vector) arrowVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- float8Vector.setNull(i);
- } else {
- float8Vector.set(i, ((DoubleColumnVector) hiveVector).vector[i]);
- }
- }
+ }
+ }
+ break;
+ case DOUBLE:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, doubleNullSetter, doubleValueSetter);
+ return;
+ }
+ final Float8Vector float8Vector = (Float8Vector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ doubleNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ doubleValueSetter.accept(i, i, arrowVector, hiveVector);
}
- break;
- case STRING:
- case VARCHAR:
- case CHAR:
- {
- final VarCharVector varCharVector = (VarCharVector) arrowVector;
- final BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- varCharVector.setNull(i);
- } else {
- varCharVector.setSafe(i, bytesVector.vector[i], bytesVector.start[i], bytesVector.length[i]);
- }
- }
+ }
+ }
+ break;
+ //TODO Add CHAR padding conversion
+ case STRING:
+ case VARCHAR:
+ case CHAR:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, stringNullSetter, stringValueSetter);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ stringNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ stringValueSetter.accept(i, i, arrowVector, hiveVector);
}
- break;
- case DATE:
- {
- final DateDayVector dateDayVector = (DateDayVector) arrowVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- dateDayVector.setNull(i);
- } else {
- dateDayVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]);
- }
- }
+ }
+ }
+ break;
+ case DATE:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, dateNullSetter, dateValueSetter);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ dateNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ dateValueSetter.accept(i, i, arrowVector, hiveVector);
}
- break;
- case TIMESTAMP:
- {
- final TimeStampMicroTZVector timeStampMicroTZVector = (TimeStampMicroTZVector) arrowVector;
- final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- timeStampMicroTZVector.setNull(i);
- } else {
- // Time = second + sub-second
- final long secondInMillis = timestampColumnVector.getTime(i);
- final long secondInMicros = (secondInMillis - secondInMillis % MILLIS_PER_SECOND) * MICROS_PER_MILLIS;
- final long subSecondInMicros = timestampColumnVector.getNanos(i) / NS_PER_MICROS;
-
- if ((secondInMillis > 0 && secondInMicros < 0) || (secondInMillis < 0 && secondInMicros > 0)) {
- // If the timestamp cannot be represented in long microsecond, set it as a null value
- timeStampMicroTZVector.setNull(i);
- } else {
- timeStampMicroTZVector.set(i, secondInMicros + subSecondInMicros);
- }
- }
- }
+ }
+ }
+ break;
+ case TIMESTAMP:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, timestampNullSetter, timestampValueSetter);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ timestampNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ timestampValueSetter.accept(i, i, arrowVector, hiveVector);
}
- break;
- case BINARY:
- {
- final VarBinaryVector varBinaryVector = (VarBinaryVector) arrowVector;
- final BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- varBinaryVector.setNull(i);
- } else {
- varBinaryVector.setSafe(i, bytesVector.vector[i], bytesVector.start[i], bytesVector.length[i]);
- }
- }
+ }
+ }
+ break;
+ case BINARY:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, binaryNullSetter, binaryValueSetter);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ binaryNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ binaryValueSetter.accept(i, i, arrowVector, hiveVector);
}
- break;
- case DECIMAL:
- {
- final DecimalVector decimalVector = (DecimalVector) arrowVector;
- final int scale = decimalVector.getScale();
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- decimalVector.setNull(i);
- } else {
- decimalVector.set(i,
- ((DecimalColumnVector) hiveVector).vector[i].getHiveDecimal().bigDecimalValue().setScale(scale));
- }
- }
+ }
+ }
+ break;
+ case DECIMAL:
+ {
+ if(isNative) {
+ if(hiveVector instanceof DecimalColumnVector) {
+ writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, decimalNullSetter, decimalValueSetter);
+ } else {
+ writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, decimalNullSetter, decimal64ValueSetter);
}
- break;
- case INTERVAL_YEAR_MONTH:
- {
- final IntervalYearVector intervalYearVector = (IntervalYearVector) arrowVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- intervalYearVector.setNull(i);
- } else {
- intervalYearVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]);
- }
- }
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ decimalNullSetter.accept(i, arrowVector, hiveVector);
+ } else if(hiveVector instanceof DecimalColumnVector) {
+ decimalValueSetter.accept(i, i, arrowVector, hiveVector);
+ } else if(hiveVector instanceof Decimal64ColumnVector) {
+ decimal64ValueSetter.accept(i, i, arrowVector, hiveVector);
+ } else {
+ throw new IllegalArgumentException("Unsupported vector column type: " + hiveVector.getClass().getName());
}
- break;
- case INTERVAL_DAY_TIME:
- {
- final IntervalDayVector intervalDayVector = (IntervalDayVector) arrowVector;
- final IntervalDayTimeColumnVector intervalDayTimeColumnVector =
- (IntervalDayTimeColumnVector) hiveVector;
- for (int i = 0; i < size; i++) {
- if (hiveVector.isNull[i]) {
- intervalDayVector.setNull(i);
- } else {
- final long totalSeconds = intervalDayTimeColumnVector.getTotalSeconds(i);
- final long days = totalSeconds / SECOND_PER_DAY;
- final long millis =
- (totalSeconds - days * SECOND_PER_DAY) * MILLIS_PER_SECOND +
- intervalDayTimeColumnVector.getNanos(i) / NS_PER_MILLIS;
- intervalDayVector.set(i, (int) days, (int) millis);
- }
- }
+ }
+ }
+ break;
+ case INTERVAL_YEAR_MONTH:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, intervalYearMonthNullSetter, intervalYearMonthValueSetter);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ intervalYearMonthNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ intervalYearMonthValueSetter.accept(i, i, arrowVector, hiveVector);
}
- break;
- case VOID:
- case UNKNOWN:
- case TIMESTAMPLOCALTZ:
- default:
- throw new IllegalArgumentException();
+ }
+ }
+ break;
+ case INTERVAL_DAY_TIME:
+ {
+ if(isNative) {
+ writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, intervalDayTimeNullSetter, intervalDayTimeValueSetter);
+ return;
+ }
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ intervalDayTimeNullSetter.accept(i, arrowVector, hiveVector);
+ } else {
+ intervalDayTimeValueSetter.accept(i, i, arrowVector, hiveVector);
+ }
+ }
+ }
+ break;
+ case VOID:
+ case UNKNOWN:
+ case TIMESTAMPLOCALTZ:
+ default:
+ throw new IllegalArgumentException();
}
}
@@ -536,7 +628,7 @@ class Serializer {
// if row is null, it means there are no more rows (closeOp()).
// another case can be that the buffer is full.
if (obj == null) {
- return serializeBatch();
+ return serializeBatch(vectorizedRowBatch, false);
}
List<Object> standardObjects = new ArrayList<Object>();
ObjectInspectorUtils.copyToStandardObject(standardObjects, obj,
@@ -545,8 +637,218 @@ class Serializer {
vectorAssignRow.assignRow(vectorizedRowBatch, batchSize, standardObjects, fieldSize);
batchSize++;
if (batchSize == MAX_BUFFERED_ROWS) {
- return serializeBatch();
+ return serializeBatch(vectorizedRowBatch, false);
}
return null;
}
+
+ //Use a provided nullSetter and valueSetter function to populate
+ //fieldVector from hiveVector
+ private static void writeGeneric(final FieldVector fieldVector, final ColumnVector hiveVector, final int size, final boolean selectedInUse, final int[] selected, final IntAndVectorsConsumer nullSetter, final IntIntAndVectorsConsumer valueSetter)
+ {
+ final boolean[] inputIsNull = hiveVector.isNull;
+ final int[] sel = selected;
+
+ if (hiveVector.isRepeating) {
+ if (hiveVector.noNulls || !inputIsNull[0]) {
+ for(int i = 0; i < size; i++) {
+ //Fill n rows with value in row 0
+ valueSetter.accept(i, 0, fieldVector, hiveVector);
+ }
+ } else {
+ for(int i = 0; i < size; i++) {
+ //Fill n rows with NULL
+ nullSetter.accept(i, fieldVector, hiveVector);
+ }
+ }
+ return;
+ }
+
+ if (hiveVector.noNulls) {
+ if (selectedInUse) {
+ for(int logical = 0; logical < size; logical++) {
+ final int batchIndex = sel[logical];
+ //Add row batchIndex
+ valueSetter.accept(logical, batchIndex, fieldVector, hiveVector);
+ }
+ } else {
+ for(int batchIndex = 0; batchIndex < size; batchIndex++) {
+ //Add row batchIndex
+ valueSetter.accept(batchIndex, batchIndex, fieldVector, hiveVector);
+ }
+ }
+ } else {
+ if (selectedInUse) {
+ for(int logical = 0; logical < size; logical++) {
+ final int batchIndex = sel[logical];
+ if (inputIsNull[batchIndex]) {
+ //Add NULL
+ nullSetter.accept(batchIndex, fieldVector, hiveVector);
+ } else {
+ //Add row batchIndex
+ valueSetter.accept(logical, batchIndex, fieldVector, hiveVector);
+ }
+ }
+ } else {
+ for(int batchIndex = 0; batchIndex < size; batchIndex++) {
+ if (inputIsNull[batchIndex]) {
+ //Add NULL
+ nullSetter.accept(batchIndex, fieldVector, hiveVector);
+ } else {
+ //Add row batchIndex
+ valueSetter.accept(batchIndex, batchIndex, fieldVector, hiveVector);
+ }
+ }
+ }
+ }
+ }
+
+ //nullSetters and valueSetter for each type
+
+ //bool
+ private static final IntAndVectorsConsumer boolNullSetter = (i, arrowVector, hiveVector)
+ -> ((BitVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer boolValueSetter = (i, j, arrowVector, hiveVector)
+ -> ((BitVector) arrowVector).set(i, (int) ((LongColumnVector) hiveVector).vector[j]);
+
+ //byte
+ private static final IntAndVectorsConsumer byteNullSetter = (i, arrowVector, hiveVector)
+ -> ((TinyIntVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer byteValueSetter = (i, j, arrowVector, hiveVector)
+ -> ((TinyIntVector) arrowVector).set(i, (byte) ((LongColumnVector) hiveVector).vector[j]);
+
+ //short
+ private static final IntAndVectorsConsumer shortNullSetter = (i, arrowVector, hiveVector)
+ -> ((SmallIntVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer shortValueSetter = (i, j, arrowVector, hiveVector)
+ -> ((SmallIntVector) arrowVector).set(i, (short) ((LongColumnVector) hiveVector).vector[j]);
+
+ //int
+ private static final IntAndVectorsConsumer intNullSetter = (i, arrowVector, hiveVector)
+ -> ((IntVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer intValueSetter = (i, j, arrowVector, hiveVector)
+ -> ((IntVector) arrowVector).set(i, (int) ((LongColumnVector) hiveVector).vector[j]);
+
+ //long
+ private static final IntAndVectorsConsumer longNullSetter = (i, arrowVector, hiveVector)
+ -> ((BigIntVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer longValueSetter = (i, j, arrowVector, hiveVector)
+ -> ((BigIntVector) arrowVector).set(i, ((LongColumnVector) hiveVector).vector[j]);
+
+ //float
+ private static final IntAndVectorsConsumer floatNullSetter = (i, arrowVector, hiveVector)
+ -> ((Float4Vector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer floatValueSetter = (i, j, arrowVector, hiveVector)
+ -> ((Float4Vector) arrowVector).set(i, (float) ((DoubleColumnVector) hiveVector).vector[j]);
+
+ //double
+ private static final IntAndVectorsConsumer doubleNullSetter = (i, arrowVector, hiveVector)
+ -> ((Float8Vector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer doubleValueSetter = (i, j, arrowVector, hiveVector)
+ -> ((Float8Vector) arrowVector).set(i, ((DoubleColumnVector) hiveVector).vector[j]);
+
+ //string/varchar
+ private static final IntAndVectorsConsumer stringNullSetter = (i, arrowVector, hiveVector)
+ -> ((VarCharVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer stringValueSetter = (i, j, arrowVector, hiveVector)
+ -> {
+ BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
+ ((VarCharVector) arrowVector).setSafe(i, bytesVector.vector[j], bytesVector.start[j], bytesVector.length[j]);
+ };
+
+ //fixed-length CHAR
+ //TODO Add padding conversion
+ private static final IntAndVectorsConsumer charNullSetter = (i, arrowVector, hiveVector)
+ -> ((VarCharVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer charValueSetter = (i, j, arrowVector, hiveVector)
+ -> {
+ BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
+ ((VarCharVector) arrowVector).setSafe(i, bytesVector.vector[j], bytesVector.start[j], bytesVector.length[j]);
+ };
+
+ //date
+ private static final IntAndVectorsConsumer dateNullSetter = (i, arrowVector, hiveVector)
+ -> ((DateDayVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer dateValueSetter = (i, j, arrowVector, hiveVector)
+ -> ((DateDayVector) arrowVector).set(i, (int) ((LongColumnVector) hiveVector).vector[j]);
+
+ //timestamp
+ private static final IntAndVectorsConsumer timestampNullSetter = (i, arrowVector, hiveVector)
+ -> ((TimeStampMicroTZVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer timestampValueSetter = (i, j, arrowVector, hiveVector)
+ -> {
+ final TimeStampMicroTZVector timeStampMicroTZVector = (TimeStampMicroTZVector) arrowVector;
+ final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector;
+ // Time = second + sub-second
+ final long secondInMillis = timestampColumnVector.getTime(j);
+ final long secondInMicros = (secondInMillis - secondInMillis % MILLIS_PER_SECOND) * MICROS_PER_MILLIS;
+ final long subSecondInMicros = timestampColumnVector.getNanos(j) / NS_PER_MICROS;
+ if ((secondInMillis > 0 && secondInMicros < 0) || (secondInMillis < 0 && secondInMicros > 0)) {
+ // If the timestamp cannot be represented as a long in microseconds, set it to null
+ timeStampMicroTZVector.setNull(i);
+ } else {
+ timeStampMicroTZVector.set(i, secondInMicros + subSecondInMicros);
+ }
+ };
+
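+ [Editorial note, not part of this patch: a self-contained sketch mirroring the
+ millisecond-to-microsecond arithmetic above. The constant values (1000 each for
+ MILLIS_PER_SECOND, MICROS_PER_MILLIS, NS_PER_MICROS) are assumed from their
+ standard meanings; the overflow check is the same sign-flip test as the lambda:
+
+     public class TimestampMicrosSketch {
+       private static final long MILLIS_PER_SECOND = 1_000L;
+       private static final long MICROS_PER_MILLIS = 1_000L;
+       private static final long NS_PER_MICROS = 1_000L;
+
+       // Returns null when the whole-second part overflows a long in micros
+       static Long toMicros(long timeMillis, int nanos) {
+         long secondInMicros =
+             (timeMillis - timeMillis % MILLIS_PER_SECOND) * MICROS_PER_MILLIS;
+         long subSecondInMicros = nanos / NS_PER_MICROS;
+         if ((timeMillis > 0 && secondInMicros < 0)
+             || (timeMillis < 0 && secondInMicros > 0)) {
+           return null; // overflow detected via sign flip
+         }
+         return secondInMicros + subSecondInMicros;
+       }
+
+       public static void main(String[] args) {
+         // 1.5 s after epoch: 1000000 whole-second micros + 500000 sub-second micros
+         System.out.println(toMicros(1500L, 500_000_000)); // 1500000
+       }
+     }
+ ]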
+ //binary
+ private static final IntAndVectorsConsumer binaryNullSetter = (i, arrowVector, hiveVector)
+ -> ((VarBinaryVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer binaryValueSetter = (i, j, arrowVector, hiveVector)
+ -> {
+ BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
+ ((VarBinaryVector) arrowVector).setSafe(i, bytesVector.vector[j], bytesVector.start[j], bytesVector.length[j]);
+ };
+
+ //decimal and decimal64
+ private static final IntAndVectorsConsumer decimalNullSetter = (i, arrowVector, hiveVector)
+ -> ((DecimalVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer decimalValueSetter = (i, j, arrowVector, hiveVector)
+ -> {
+ final DecimalVector decimalVector = (DecimalVector) arrowVector;
+ final int scale = decimalVector.getScale();
+ decimalVector.set(i, ((DecimalColumnVector) hiveVector).vector[j].getHiveDecimal().bigDecimalValue().setScale(scale));
+ };
+ private static final IntIntAndVectorsConsumer decimal64ValueSetter = (i, j, arrowVector, hiveVector)
+ -> {
+ final DecimalVector decimalVector = (DecimalVector) arrowVector;
+ final int scale = decimalVector.getScale();
+ HiveDecimalWritable decimalHolder = new HiveDecimalWritable();
+ decimalHolder.setFromLongAndScale(((Decimal64ColumnVector) hiveVector).vector[j], scale);
+ decimalVector.set(i, decimalHolder.getHiveDecimal().bigDecimalValue().setScale(scale));
+ };
+
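+ [Editorial note, not part of this patch: decimal64 stores an unscaled long, so the
+ setter above must reapply the column's scale before handing the value to Arrow. A
+ dependency-free sketch using plain BigDecimal in place of HiveDecimalWritable:
+
+     import java.math.BigDecimal;
+
+     public class Decimal64Sketch {
+       public static void main(String[] args) {
+         long unscaled = 12345L; // as held in Decimal64ColumnVector.vector[j]
+         int scale = 2;          // as reported by DecimalVector.getScale()
+         // 12345 at scale 2 represents 123.45
+         BigDecimal value = BigDecimal.valueOf(unscaled, scale);
+         System.out.println(value); // 123.45 -- what DecimalVector.set receives
+       }
+     }
+ ]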
+ //interval year
+ private static final IntAndVectorsConsumer intervalYearMonthNullSetter = (i, arrowVector, hiveVector)
+ -> ((IntervalYearVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer intervalYearMonthValueSetter = (i, j, arrowVector, hiveVector)
+ -> ((IntervalYearVector) arrowVector).set(i, (int) ((LongColumnVector) hiveVector).vector[j]);
+
+ //interval day
+ private static final IntAndVectorsConsumer intervalDayTimeNullSetter = (i, arrowVector, hiveVector)
+ -> ((IntervalDayVector) arrowVector).setNull(i);
+ private static final IntIntAndVectorsConsumer intervalDayTimeValueSetter = (i, j, arrowVector, hiveVector)
+ -> {
+ final IntervalDayVector intervalDayVector = (IntervalDayVector) arrowVector;
+ final IntervalDayTimeColumnVector intervalDayTimeColumnVector =
+ (IntervalDayTimeColumnVector) hiveVector;
+ long totalSeconds = intervalDayTimeColumnVector.getTotalSeconds(j);
+ final long days = totalSeconds / SECOND_PER_DAY;
+ final long millis =
+ (totalSeconds - days * SECOND_PER_DAY) * MILLIS_PER_SECOND +
+ intervalDayTimeColumnVector.getNanos(j) / NS_PER_MILLIS;
+ intervalDayVector.set(i, (int) days, (int) millis);
+ };
+
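+ [Editorial note, not part of this patch: a worked example of the day/millisecond
+ split above. An interval of 90,061 total seconds plus 500,000,000 nanoseconds
+ becomes 1 day and 3,661,500 milliseconds; the constant values are assumed from
+ their standard meanings:
+
+     public class IntervalDaySketch {
+       private static final long SECOND_PER_DAY = 86_400L;
+       private static final long MILLIS_PER_SECOND = 1_000L;
+       private static final long NS_PER_MILLIS = 1_000_000L;
+
+       public static void main(String[] args) {
+         long totalSeconds = 90_061L;  // 1 day, 1 hour, 1 minute, 1 second
+         long nanos = 500_000_000L;    // .5 second
+         long days = totalSeconds / SECOND_PER_DAY;                        // 1
+         long millis = (totalSeconds - days * SECOND_PER_DAY) * MILLIS_PER_SECOND
+             + nanos / NS_PER_MILLIS;                                      // 3661500
+         System.out.println(days + " days, " + millis + " millis");
+       }
+     }
+ ]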
+ //Used for setting null at arrowVector[i]
+ private interface IntAndVectorsConsumer {
+ void accept(int i, FieldVector arrowVector, ColumnVector hiveVector);
+ }
+
+ //Used to copy the value at hiveVector[j] to arrowVector[i];
+ //j may differ from i because rows can be addressed indirectly through the batch's selected array
+ private interface IntIntAndVectorsConsumer {
+ void accept(int i, int j, FieldVector arrowVector, ColumnVector hiveVector);
+ }
+
}
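[Editorial note, not part of this patch: each Hive type above contributes a
(nullSetter, valueSetter) pair with the two interface shapes just shown. A hedged
sketch of the driving loop such a pair implies -- not the actual write path in this
commit -- where i is the logical output row and j the physical row, possibly
redirected through the batch's selected array. It assumes Hive's public
ColumnVector fields (noNulls, isNull) and VectorizedRowBatch conventions
(selectedInUse, selected), and would have to live inside Serializer since the
interfaces are private:

    static void writeColumn(IntAndVectorsConsumer nullSetter,
        IntIntAndVectorsConsumer valueSetter,
        FieldVector arrowVector, ColumnVector hiveVector,
        int size, boolean selectedInUse, int[] selected) {
      for (int i = 0; i < size; i++) {
        // map the logical row i to the physical row j
        final int j = selectedInUse ? selected[i] : i;
        if (!hiveVector.noNulls && hiveVector.isNull[j]) {
          nullSetter.accept(i, arrowVector, hiveVector);
        } else {
          valueSetter.accept(i, j, arrowVector, hiveVector);
        }
      }
    }
]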
http://git-wip-us.apache.org/repos/asf/hive/blob/a8ef2147/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index eb5b1a8..9bb104d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.filesink.VectorFileSinkArrowOperator;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyLongOperator;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyMultiKeyOperator;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyStringOperator;
@@ -4119,6 +4120,48 @@ public class Vectorizer implements PhysicalPlanResolver {
return true;
}
+ private boolean checkForArrowFileSink(FileSinkDesc fileSinkDesc,
+ boolean isTezOrSpark, VectorizationContext vContext,
+ VectorFileSinkDesc vectorDesc) throws HiveException {
+
+ // Restrictions: the table must use the Arrow SerDe, the native Arrow file sink
+ // flag must be enabled, and the execution engine must be Tez.
+
+ boolean isVectorizationFileSinkArrowNativeEnabled =
+ HiveConf.getBoolVar(hiveConf,
+ HiveConf.ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED);
+
+ String engine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+
+ String serdeClassName = fileSinkDesc.getTableInfo().getSerdeClassName();
+
+ boolean isOkArrowFileSink =
+ serdeClassName.equals("org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe") &&
+ isVectorizationFileSinkArrowNativeEnabled &&
+ engine.equalsIgnoreCase("tez");
+
+ return isOkArrowFileSink;
+ }
+
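+ [Editorial note, not part of this patch: the gates checked above can be seen
+ together in a short configuration sketch. Both ConfVars below exist in HiveConf;
+ the table itself must additionally be declared with the Arrow SerDe
+ (org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe):
+
+     HiveConf conf = new HiveConf();
+     // enable the native Arrow file sink vectorization added by this patch
+     conf.setBoolVar(
+         HiveConf.ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED, true);
+     // the check also requires Tez as the execution engine
+     conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+ ]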
+ private Operator<? extends OperatorDesc> specializeArrowFileSinkOperator(
+ Operator<? extends OperatorDesc> op, VectorizationContext vContext, FileSinkDesc desc,
+ VectorFileSinkDesc vectorDesc) throws HiveException {
+
+ Class<? extends Operator<?>> opClass = VectorFileSinkArrowOperator.class;
+
+ Operator<? extends OperatorDesc> vectorOp = null;
+ try {
+ vectorOp = OperatorFactory.getVectorOperator(
+ opClass, op.getCompilationOpContext(), op.getConf(),
+ vContext, vectorDesc);
+ } catch (Exception e) {
+ LOG.info("Vectorizer vectorizeOperator file sink class exception " + opClass.getSimpleName() +
+ " exception " + e);
+ throw new HiveException(e);
+ }
+
+ return vectorOp;
+ }
+
private boolean usesVectorUDFAdaptor(VectorExpression vecExpr) {
if (vecExpr == null) {
return false;
@@ -5145,9 +5188,20 @@ public class Vectorizer implements PhysicalPlanResolver {
FileSinkDesc fileSinkDesc = (FileSinkDesc) op.getConf();
VectorFileSinkDesc vectorFileSinkDesc = new VectorFileSinkDesc();
- vectorOp = OperatorFactory.getVectorOperator(
- op.getCompilationOpContext(), fileSinkDesc, vContext, vectorFileSinkDesc);
- isNative = false;
+ boolean isArrowSpecialization =
+ checkForArrowFileSink(fileSinkDesc, isTezOrSpark, vContext, vectorFileSinkDesc);
+
+ if (isArrowSpecialization) {
+ vectorOp =
+ specializeArrowFileSinkOperator(
+ op, vContext, fileSinkDesc, vectorFileSinkDesc);
+ isNative = true;
+ } else {
+ vectorOp =
+ OperatorFactory.getVectorOperator(
+ op.getCompilationOpContext(), fileSinkDesc, vContext, vectorFileSinkDesc);
+ isNative = false;
+ }
}
break;
case LIMIT: