Posted to commits@hive.apache.org by vg...@apache.org on 2018/05/29 20:58:44 UTC
[1/6] hive git commit: HIVE-19306: Arrow batch serializer (Teddy Choi, reviewed by Matt McCline and Eric Wohlstadter (non-binding))
Repository: hive
Updated Branches:
refs/heads/branch-3 7156df66f -> 2334a0ddf
HIVE-19306: Arrow batch serializer (Teddy Choi, reviewed by Matt McCline and Eric Wohlstadter (non-binding))
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0e090e58
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0e090e58
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0e090e58
Branch: refs/heads/branch-3
Commit: 0e090e58772516070e472713422aa8566df81b50
Parents: 7156df6
Author: Matt McCline <mm...@hortonworks.com>
Authored: Thu May 10 16:42:50 2018 -0500
Committer: Vineet Garg <vg...@apache.org>
Committed: Tue May 29 13:56:07 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 5 +
.../ql/io/arrow/ArrowColumnarBatchSerDe.java | 1179 ++++++++++++++++++
.../hive/ql/io/arrow/ArrowWrapperWritable.java | 47 +
.../hive/ql/io/arrow/RootAllocatorFactory.java | 44 +
.../io/arrow/TestArrowColumnarBatchSerDe.java | 815 ++++++++++++
serde/pom.xml | 5 +
6 files changed, 2095 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/0e090e58/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 60d5f04..128e892 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2625,6 +2625,11 @@ public class HiveConf extends Configuration {
"Set to true to ensure that each SQL Merge statement ensures that for each row in the target\n" +
"table there is at most 1 matching row in the source table per SQL Specification."),
+ // For Arrow SerDe
+ HIVE_ARROW_ROOT_ALLOCATOR_LIMIT("hive.arrow.root.allocator.limit", Long.MAX_VALUE,
+ "Arrow root allocator memory size limitation in bytes."),
+ HIVE_ARROW_BATCH_SIZE("hive.arrow.batch.size", 1000, "The number of rows sent in one Arrow batch."),
+
// For Druid storage handler
HIVE_DRUID_INDEXING_GRANULARITY("hive.druid.indexer.segments.granularity", "DAY",
new PatternSet("YEAR", "MONTH", "WEEK", "DAY", "HOUR", "MINUTE", "SECOND"),
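The two new settings are read through HiveConf like any other ConfVars entry. Below is a
minimal sketch of how a client could override them, assuming this patch is applied; the
property names and defaults come from the hunk above, while ArrowConfExample and the
4 GB / 500-row values are only illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class ArrowConfExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Cap the shared Arrow root allocator at 4 GB instead of the Long.MAX_VALUE default.
        conf.setLong("hive.arrow.root.allocator.limit", 4L * 1024 * 1024 * 1024);
        // Flush an Arrow batch every 500 rows instead of the default 1000.
        conf.setInt("hive.arrow.batch.size", 500);
        long limit = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVE_ARROW_ROOT_ALLOCATOR_LIMIT);
        int batchSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE);
        System.out.println(limit + " bytes, " + batchSize + " rows per Arrow batch");
      }
    }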
http://git-wip-us.apache.org/repos/asf/hive/blob/0e090e58/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
new file mode 100644
index 0000000..330fa58
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
@@ -0,0 +1,1179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.arrow;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.netty.buffer.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.impl.UnionListWriter;
+import org.apache.arrow.vector.complex.impl.UnionReader;
+import org.apache.arrow.vector.complex.impl.UnionWriter;
+import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.complex.writer.BaseWriter;
+import org.apache.arrow.vector.complex.writer.BigIntWriter;
+import org.apache.arrow.vector.complex.writer.BitWriter;
+import org.apache.arrow.vector.complex.writer.DateDayWriter;
+import org.apache.arrow.vector.complex.writer.DecimalWriter;
+import org.apache.arrow.vector.complex.writer.FieldWriter;
+import org.apache.arrow.vector.complex.writer.Float4Writer;
+import org.apache.arrow.vector.complex.writer.Float8Writer;
+import org.apache.arrow.vector.complex.writer.IntWriter;
+import org.apache.arrow.vector.complex.writer.IntervalDayWriter;
+import org.apache.arrow.vector.complex.writer.IntervalYearWriter;
+import org.apache.arrow.vector.complex.writer.SmallIntWriter;
+import org.apache.arrow.vector.complex.writer.TimeStampMilliWriter;
+import org.apache.arrow.vector.complex.writer.TinyIntWriter;
+import org.apache.arrow.vector.complex.writer.VarBinaryWriter;
+import org.apache.arrow.vector.complex.writer.VarCharWriter;
+import org.apache.arrow.vector.holders.NullableBigIntHolder;
+import org.apache.arrow.vector.holders.NullableBitHolder;
+import org.apache.arrow.vector.holders.NullableDateDayHolder;
+import org.apache.arrow.vector.holders.NullableFloat4Holder;
+import org.apache.arrow.vector.holders.NullableFloat8Holder;
+import org.apache.arrow.vector.holders.NullableIntHolder;
+import org.apache.arrow.vector.holders.NullableIntervalDayHolder;
+import org.apache.arrow.vector.holders.NullableIntervalYearHolder;
+import org.apache.arrow.vector.holders.NullableSmallIntHolder;
+import org.apache.arrow.vector.holders.NullableTimeStampMilliHolder;
+import org.apache.arrow.vector.holders.NullableTinyIntHolder;
+import org.apache.arrow.vector.holders.NullableVarBinaryHolder;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAssignRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TimestampLocalTZTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.lang.reflect.Method;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.IntConsumer;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE;
+import static org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector;
+import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption.WRITABLE;
+import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo;
+import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfoFromObjectInspector;
+
+/**
+ * ArrowColumnarBatchSerDe converts Apache Hive rows to Apache Arrow columns. Its serialized
+ * class is {@link ArrowWrapperWritable}, which doesn't support {@link
+ * Writable#readFields(DataInput)} and {@link Writable#write(DataOutput)}.
+ *
+ * The following are known issues of the current implementation.
+ *
+ * A list column cannot have a decimal column. {@link UnionListWriter} doesn't have an
+ * implementation for {@link BaseWriter.ListWriter#decimal()}.
+ *
+ * A union column can hold only one of the string, char, and varchar fields at the same time.
+ * Apache Arrow doesn't have string and char types, so {@link ArrowColumnarBatchSerDe} uses
+ * varchar to simulate string and char. They will be treated as the same data type in
+ * {@link org.apache.arrow.vector.complex.UnionVector}.
+ *
+ * Timestamp with local timezone is not supported. {@link VectorAssignRow} doesn't support it.
+ */
+public class ArrowColumnarBatchSerDe extends AbstractSerDe {
+ public static final Logger LOG = LoggerFactory.getLogger(ArrowColumnarBatchSerDe.class.getName());
+ private static final String DEFAULT_ARROW_FIELD_NAME = "[DEFAULT]";
+
+ private static final int MS_PER_SECOND = 1_000;
+ private static final int MS_PER_MINUTE = MS_PER_SECOND * 60;
+ private static final int MS_PER_HOUR = MS_PER_MINUTE * 60;
+ private static final int MS_PER_DAY = MS_PER_HOUR * 24;
+ private static final int NS_PER_MS = 1_000_000;
+
+ private BufferAllocator rootAllocator;
+
+ private StructTypeInfo rowTypeInfo;
+ private StructObjectInspector rowObjectInspector;
+ private Configuration conf;
+ private Serializer serializer;
+ private Deserializer deserializer;
+
+ @Override
+ public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+ this.conf = conf;
+
+ rootAllocator = RootAllocatorFactory.INSTANCE.getRootAllocator(conf);
+
+ final String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+ final String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+ final String columnNameDelimiter = tbl.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? tbl
+ .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA);
+
+ // Create an object inspector
+ final List<String> columnNames;
+ if (columnNameProperty.length() == 0) {
+ columnNames = new ArrayList<>();
+ } else {
+ columnNames = Arrays.asList(columnNameProperty.split(columnNameDelimiter));
+ }
+ final List<TypeInfo> columnTypes;
+ if (columnTypeProperty.length() == 0) {
+ columnTypes = new ArrayList<>();
+ } else {
+ columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+ }
+ rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+ rowObjectInspector =
+ (StructObjectInspector) getStandardWritableObjectInspectorFromTypeInfo(rowTypeInfo);
+
+ final List<Field> fields = new ArrayList<>();
+ final int size = columnNames.size();
+ for (int i = 0; i < size; i++) {
+ fields.add(toField(columnNames.get(i), columnTypes.get(i)));
+ }
+
+ serializer = new Serializer(new Schema(fields));
+ deserializer = new Deserializer();
+ }
+
+ private class Serializer {
+ private final int MAX_BUFFERED_ROWS;
+
+ // Schema
+ private final StructTypeInfo structTypeInfo;
+ private final List<TypeInfo> fieldTypeInfos;
+ private final int fieldSize;
+
+ // Hive columns
+ private final VectorizedRowBatch vectorizedRowBatch;
+ private final VectorAssignRow vectorAssignRow;
+ private int batchSize;
+
+ // Arrow columns
+ private final VectorSchemaRoot vectorSchemaRoot;
+ private final List<FieldVector> arrowVectors;
+ private final List<FieldWriter> fieldWriters;
+
+ private Serializer(Schema schema) throws SerDeException {
+ MAX_BUFFERED_ROWS = HiveConf.getIntVar(conf, HIVE_ARROW_BATCH_SIZE);
+ LOG.info("ArrowColumnarBatchSerDe max number of buffered rows: " + MAX_BUFFERED_ROWS);
+
+ // Schema
+ structTypeInfo = (StructTypeInfo) getTypeInfoFromObjectInspector(rowObjectInspector);
+ fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+ fieldSize = fieldTypeInfos.size();
+
+ // Initialize Arrow structures
+ vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator);
+ arrowVectors = vectorSchemaRoot.getFieldVectors();
+ fieldWriters = Lists.newArrayList();
+ for (FieldVector fieldVector : arrowVectors) {
+ final FieldWriter fieldWriter =
+ Types.getMinorTypeForArrowType(
+ fieldVector.getField().getType()).getNewFieldWriter(fieldVector);
+ fieldWriters.add(fieldWriter);
+ }
+
+ // Initialize Hive structures
+ vectorizedRowBatch = new VectorizedRowBatch(fieldSize);
+ for (int i = 0; i < fieldSize; i++) {
+ final ColumnVector columnVector = createColumnVector(fieldTypeInfos.get(i));
+ vectorizedRowBatch.cols[i] = columnVector;
+ columnVector.init();
+ }
+ vectorizedRowBatch.ensureSize(MAX_BUFFERED_ROWS);
+ vectorAssignRow = new VectorAssignRow();
+ try {
+ vectorAssignRow.init(rowObjectInspector);
+ } catch (HiveException e) {
+ throw new SerDeException(e);
+ }
+ }
+
+ private ArrowWrapperWritable serializeBatch() {
+ for (int i = 0; i < vectorizedRowBatch.projectionSize; i++) {
+ final int projectedColumn = vectorizedRowBatch.projectedColumns[i];
+ final ColumnVector hiveVector = vectorizedRowBatch.cols[projectedColumn];
+ final TypeInfo fieldTypeInfo = structTypeInfo.getAllStructFieldTypeInfos().get(i);
+ final FieldWriter fieldWriter = fieldWriters.get(i);
+ final FieldVector arrowVector = arrowVectors.get(i);
+ arrowVector.setValueCount(0);
+ fieldWriter.setPosition(0);
+ write(fieldWriter, arrowVector, hiveVector, fieldTypeInfo, 0, batchSize, true);
+ }
+ vectorizedRowBatch.reset();
+ vectorSchemaRoot.setRowCount(batchSize);
+
+ batchSize = 0;
+ return new ArrowWrapperWritable(vectorSchemaRoot);
+ }
+
+ private BaseWriter getWriter(FieldWriter writer, TypeInfo typeInfo, String name) {
+ switch (typeInfo.getCategory()) {
+ case PRIMITIVE:
+ switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+ case BOOLEAN:
+ return writer.bit(name);
+ case BYTE:
+ return writer.tinyInt(name);
+ case SHORT:
+ return writer.smallInt(name);
+ case INT:
+ return writer.integer(name);
+ case LONG:
+ return writer.bigInt(name);
+ case FLOAT:
+ return writer.float4(name);
+ case DOUBLE:
+ return writer.float8(name);
+ case STRING:
+ case VARCHAR:
+ case CHAR:
+ return writer.varChar(name);
+ case DATE:
+ return writer.dateDay(name);
+ case TIMESTAMP:
+ return writer.timeStampMilli(name);
+ case BINARY:
+ return writer.varBinary(name);
+ case DECIMAL:
+ final DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
+ final int scale = decimalTypeInfo.scale();
+ final int precision = decimalTypeInfo.precision();
+ return writer.decimal(name, scale, precision);
+ case INTERVAL_YEAR_MONTH:
+ return writer.intervalYear(name);
+ case INTERVAL_DAY_TIME:
+ return writer.intervalDay(name);
+ case TIMESTAMPLOCALTZ: // VectorAssignRow doesn't support it
+ case VOID:
+ case UNKNOWN:
+ default:
+ throw new IllegalArgumentException();
+ }
+ case LIST:
+ case UNION:
+ return writer.list(name);
+ case STRUCT:
+ return writer.map(name);
+ case MAP: // The caller will convert map to array<struct>
+ return writer.list(name).map();
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ private BaseWriter getWriter(FieldWriter writer, TypeInfo typeInfo) {
+ switch (typeInfo.getCategory()) {
+ case PRIMITIVE:
+ switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+ case BOOLEAN:
+ return writer.bit();
+ case BYTE:
+ return writer.tinyInt();
+ case SHORT:
+ return writer.smallInt();
+ case INT:
+ return writer.integer();
+ case LONG:
+ return writer.bigInt();
+ case FLOAT:
+ return writer.float4();
+ case DOUBLE:
+ return writer.float8();
+ case STRING:
+ case VARCHAR:
+ case CHAR:
+ return writer.varChar();
+ case DATE:
+ return writer.dateDay();
+ case TIMESTAMP:
+ return writer.timeStampMilli();
+ case BINARY:
+ return writer.varBinary();
+ case INTERVAL_YEAR_MONTH:
+ return writer.intervalYear();
+ case INTERVAL_DAY_TIME:
+ return writer.intervalDay();
+ case TIMESTAMPLOCALTZ: // VectorAssignRow doesn't support it
+ case DECIMAL: // ListVector doesn't support it
+ case VOID:
+ case UNKNOWN:
+ default:
+ throw new IllegalArgumentException();
+ }
+ case LIST:
+ case UNION:
+ return writer.list();
+ case STRUCT:
+ return writer.map();
+ case MAP: // The caller will convert map to array<struct>
+ return writer.list().map();
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ private void write(BaseWriter baseWriter, FieldVector arrowVector, ColumnVector hiveVector,
+ TypeInfo typeInfo, int offset, int length, boolean incrementIndex) {
+
+ final IntConsumer writer;
+ switch (typeInfo.getCategory()) {
+ case PRIMITIVE:
+ final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory =
+ ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
+ switch (primitiveCategory) {
+ case BOOLEAN:
+ writer = index -> ((BitWriter) baseWriter).writeBit(
+ (int) ((LongColumnVector) hiveVector).vector[index]);
+ break;
+ case BYTE:
+ writer = index ->
+ ((TinyIntWriter) baseWriter).writeTinyInt(
+ (byte) ((LongColumnVector) hiveVector).vector[index]);
+ break;
+ case SHORT:
+ writer = index -> ((SmallIntWriter) baseWriter).writeSmallInt(
+ (short) ((LongColumnVector) hiveVector).vector[index]);
+ break;
+ case INT:
+ writer = index -> ((IntWriter) baseWriter).writeInt(
+ (int) ((LongColumnVector) hiveVector).vector[index]);
+ break;
+ case LONG:
+ writer = index -> ((BigIntWriter) baseWriter).writeBigInt(
+ ((LongColumnVector) hiveVector).vector[index]);
+ break;
+ case FLOAT:
+ writer = index -> ((Float4Writer) baseWriter).writeFloat4(
+ (float) ((DoubleColumnVector) hiveVector).vector[index]);
+ break;
+ case DOUBLE:
+ writer = index -> ((Float8Writer) baseWriter).writeFloat8(
+ ((DoubleColumnVector) hiveVector).vector[index]);
+ break;
+ case STRING:
+ case VARCHAR:
+ case CHAR:
+ writer = index -> {
+ BytesColumnVector stringVector = (BytesColumnVector) hiveVector;
+ byte[] bytes = stringVector.vector[index];
+ int start = stringVector.start[index];
+ int bytesLength = stringVector.length[index];
+ try (ArrowBuf arrowBuf = rootAllocator.buffer(bytesLength)) {
+ arrowBuf.setBytes(0, bytes, start, bytesLength);
+ ((VarCharWriter) baseWriter).writeVarChar(0, bytesLength, arrowBuf);
+ }
+ };
+ break;
+ case DATE:
+ writer = index -> ((DateDayWriter) baseWriter).writeDateDay(
+ (int) ((LongColumnVector) hiveVector).vector[index]);
+ break;
+ case TIMESTAMP:
+ writer = index -> ((TimeStampMilliWriter) baseWriter).writeTimeStampMilli(
+ ((TimestampColumnVector) hiveVector).getTime(index));
+ break;
+ case BINARY:
+ writer = index -> {
+ BytesColumnVector binaryVector = (BytesColumnVector) hiveVector;
+ final byte[] bytes = binaryVector.vector[index];
+ final int start = binaryVector.start[index];
+ final int byteLength = binaryVector.length[index];
+ try (ArrowBuf arrowBuf = rootAllocator.buffer(byteLength)) {
+ arrowBuf.setBytes(0, bytes, start, byteLength);
+ ((VarBinaryWriter) baseWriter).writeVarBinary(0, byteLength, arrowBuf);
+ }
+ };
+ break;
+ case DECIMAL:
+ writer = index -> {
+ DecimalColumnVector hiveDecimalVector = (DecimalColumnVector) hiveVector;
+ ((DecimalWriter) baseWriter).writeDecimal(
+ hiveDecimalVector.vector[index].getHiveDecimal().bigDecimalValue()
+ .setScale(hiveDecimalVector.scale));
+ };
+ break;
+ case INTERVAL_YEAR_MONTH:
+ writer = index -> ((IntervalYearWriter) baseWriter).writeIntervalYear(
+ (int) ((LongColumnVector) hiveVector).vector[index]);
+ break;
+ case INTERVAL_DAY_TIME:
+ writer = index -> {
+ IntervalDayTimeColumnVector intervalDayTimeVector =
+ (IntervalDayTimeColumnVector) hiveVector;
+ final long millis = (intervalDayTimeVector.getTotalSeconds(index) * 1_000) +
+ (intervalDayTimeVector.getNanos(index) / 1_000_000);
+ final int days = (int) (millis / MS_PER_DAY);
+ ((IntervalDayWriter) baseWriter).writeIntervalDay(
+ days, (int) (millis % MS_PER_DAY));
+ };
+ break;
+ case VOID:
+ case UNKNOWN:
+ case TIMESTAMPLOCALTZ:
+ default:
+ throw new IllegalArgumentException();
+ }
+ break;
+ case LIST:
+ final ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
+ final TypeInfo elementTypeInfo = listTypeInfo.getListElementTypeInfo();
+ final ListColumnVector hiveListVector = (ListColumnVector) hiveVector;
+ final ColumnVector hiveElementVector = hiveListVector.child;
+ final FieldVector arrowElementVector = arrowVector.getChildrenFromFields().get(0);
+ final BaseWriter.ListWriter listWriter = (BaseWriter.ListWriter) baseWriter;
+ final BaseWriter elementWriter = getWriter((FieldWriter) baseWriter, elementTypeInfo);
+
+ writer = index -> {
+ final int listOffset = (int) hiveListVector.offsets[index];
+ final int listLength = (int) hiveListVector.lengths[index];
+ listWriter.startList();
+ write(elementWriter, arrowElementVector, hiveElementVector, elementTypeInfo,
+ listOffset, listLength, false);
+ listWriter.endList();
+ };
+
+ incrementIndex = false;
+ break;
+ case STRUCT:
+ final StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+ final List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+ final StructColumnVector hiveStructVector = (StructColumnVector) hiveVector;
+ final List<FieldVector> arrowFieldVectors = arrowVector.getChildrenFromFields();
+ final ColumnVector[] hiveFieldVectors = hiveStructVector.fields;
+ final BaseWriter.MapWriter structWriter = (BaseWriter.MapWriter) baseWriter;
+ final int fieldSize = fieldTypeInfos.size();
+
+ writer = index -> {
+ structWriter.start();
+ for (int fieldIndex = 0; fieldIndex < fieldSize; fieldIndex++) {
+ final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex);
+ final String fieldName = structTypeInfo.getAllStructFieldNames().get(fieldIndex);
+ final ColumnVector hiveFieldVector = hiveFieldVectors[fieldIndex];
+ final BaseWriter fieldWriter = getWriter((FieldWriter) structWriter, fieldTypeInfo,
+ fieldName);
+ final FieldVector arrowFieldVector = arrowFieldVectors.get(fieldIndex);
+ write(fieldWriter, arrowFieldVector, hiveFieldVector, fieldTypeInfo, index, 1, false);
+ }
+ structWriter.end();
+ };
+
+ incrementIndex = false;
+ break;
+ case UNION:
+ final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+ final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
+ final UnionColumnVector hiveUnionVector = (UnionColumnVector) hiveVector;
+ final ColumnVector[] hiveObjectVectors = hiveUnionVector.fields;
+ final UnionWriter unionWriter = (UnionWriter) baseWriter;
+
+ writer = index -> {
+ final int tag = hiveUnionVector.tags[index];
+ final ColumnVector hiveObjectVector = hiveObjectVectors[tag];
+ final TypeInfo objectTypeInfo = objectTypeInfos.get(tag);
+ write(unionWriter, arrowVector, hiveObjectVector, objectTypeInfo, index, 1, false);
+ };
+ break;
+ case MAP:
+ final ListTypeInfo structListTypeInfo =
+ toStructListTypeInfo((MapTypeInfo) typeInfo);
+ final ListColumnVector structListVector =
+ toStructListVector((MapColumnVector) hiveVector);
+
+ writer = index -> write(baseWriter, arrowVector, structListVector, structListTypeInfo,
+ index, length, false);
+
+ incrementIndex = false;
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+
+ if (hiveVector.noNulls) {
+ if (hiveVector.isRepeating) {
+ for (int i = 0; i < length; i++) {
+ writer.accept(0);
+ if (incrementIndex) {
+ baseWriter.setPosition(baseWriter.getPosition() + 1);
+ }
+ }
+ } else {
+ if (vectorizedRowBatch.selectedInUse) {
+ for (int j = 0; j < length; j++) {
+ final int i = vectorizedRowBatch.selected[j];
+ writer.accept(offset + i);
+ if (incrementIndex) {
+ baseWriter.setPosition(baseWriter.getPosition() + 1);
+ }
+ }
+ } else {
+ for (int i = 0; i < length; i++) {
+ writer.accept(offset + i);
+ if (incrementIndex) {
+ baseWriter.setPosition(baseWriter.getPosition() + 1);
+ }
+ }
+ }
+ }
+ } else {
+ if (hiveVector.isRepeating) {
+ for (int i = 0; i < length; i++) {
+ if (hiveVector.isNull[0]) {
+ writeNull(baseWriter);
+ } else {
+ writer.accept(0);
+ }
+ if (incrementIndex) {
+ baseWriter.setPosition(baseWriter.getPosition() + 1);
+ }
+ }
+ } else {
+ if (vectorizedRowBatch.selectedInUse) {
+ for (int j = 0; j < length; j++) {
+ final int i = vectorizedRowBatch.selected[j];
+ if (hiveVector.isNull[offset + i]) {
+ writeNull(baseWriter);
+ } else {
+ writer.accept(offset + i);
+ }
+ if (incrementIndex) {
+ baseWriter.setPosition(baseWriter.getPosition() + 1);
+ }
+ }
+ } else {
+ for (int i = 0; i < length; i++) {
+ if (hiveVector.isNull[offset + i]) {
+ writeNull(baseWriter);
+ } else {
+ writer.accept(offset + i);
+ }
+ if (incrementIndex) {
+ baseWriter.setPosition(baseWriter.getPosition() + 1);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public ArrowWrapperWritable serialize(Object obj, ObjectInspector objInspector) {
+ // A null obj signals that there are no more rows (closeOp()), so flush the buffered batch.
+ // serializeBatch() is also called below once the buffer reaches MAX_BUFFERED_ROWS.
+ if (obj == null) {
+ return serializeBatch();
+ }
+ List<Object> standardObjects = new ArrayList<Object>();
+ ObjectInspectorUtils.copyToStandardObject(standardObjects, obj,
+ ((StructObjectInspector) objInspector), WRITABLE);
+
+ vectorAssignRow.assignRow(vectorizedRowBatch, batchSize, standardObjects, fieldSize);
+ batchSize++;
+ if (batchSize == MAX_BUFFERED_ROWS) {
+ return serializeBatch();
+ }
+ return null;
+ }
+ }
+
+ private static void writeNull(BaseWriter baseWriter) {
+ if (baseWriter instanceof UnionListWriter) {
+ // UnionListWriter doesn't implement AbstractFieldWriter#writeNull, so skip the position instead
+ BaseWriter.ListWriter listWriter = ((UnionListWriter) baseWriter).list();
+ listWriter.setPosition(listWriter.getPosition() + 1);
+ } else {
+ // Other FieldWriters inherit AbstractFieldWriter#writeNull, so invoke it reflectively
+ try {
+ Method method = baseWriter.getClass().getMethod("writeNull");
+ method.setAccessible(true);
+ method.invoke(baseWriter);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private static abstract class PrimitiveReader {
+ final void read(FieldReader reader, ColumnVector columnVector, int offset, int length) {
+ for (int i = 0; i < length; i++) {
+ final int rowIndex = offset + i;
+ if (reader.isSet()) {
+ doRead(reader, columnVector, rowIndex);
+ } else {
+ VectorizedBatchUtil.setNullColIsNullValue(columnVector, rowIndex);
+ }
+ reader.setPosition(reader.getPosition() + 1);
+ }
+ }
+
+ abstract void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex);
+ }
+
+ private class Deserializer {
+ private final VectorExtractRow vectorExtractRow;
+ private final VectorizedRowBatch vectorizedRowBatch;
+ private Object[][] rows;
+
+ public Deserializer() throws SerDeException {
+ vectorExtractRow = new VectorExtractRow();
+ final List<TypeInfo> fieldTypeInfoList = rowTypeInfo.getAllStructFieldTypeInfos();
+ final int fieldCount = fieldTypeInfoList.size();
+ final TypeInfo[] typeInfos = fieldTypeInfoList.toArray(new TypeInfo[fieldCount]);
+ try {
+ vectorExtractRow.init(typeInfos);
+ } catch (HiveException e) {
+ throw new SerDeException(e);
+ }
+
+ vectorizedRowBatch = new VectorizedRowBatch(fieldCount);
+ for (int i = 0; i < fieldCount; i++) {
+ final ColumnVector columnVector = createColumnVector(typeInfos[i]);
+ columnVector.init();
+ vectorizedRowBatch.cols[i] = columnVector;
+ }
+ }
+
+ public Object deserialize(Writable writable) {
+ final ArrowWrapperWritable arrowWrapperWritable = (ArrowWrapperWritable) writable;
+ final VectorSchemaRoot vectorSchemaRoot = arrowWrapperWritable.getVectorSchemaRoot();
+ final List<FieldVector> fieldVectors = vectorSchemaRoot.getFieldVectors();
+ final int fieldCount = fieldVectors.size();
+ final int rowCount = vectorSchemaRoot.getRowCount();
+ vectorizedRowBatch.ensureSize(rowCount);
+
+ if (rows == null || rows.length < rowCount ) {
+ rows = new Object[rowCount][];
+ for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
+ rows[rowIndex] = new Object[fieldCount];
+ }
+ }
+
+ for (int i = 0; i < fieldCount; i++) {
+ final FieldVector fieldVector = fieldVectors.get(i);
+ final FieldReader fieldReader = fieldVector.getReader();
+ fieldReader.setPosition(0);
+ final int projectedCol = vectorizedRowBatch.projectedColumns[i];
+ final ColumnVector columnVector = vectorizedRowBatch.cols[projectedCol];
+ final TypeInfo typeInfo = rowTypeInfo.getAllStructFieldTypeInfos().get(i);
+ read(fieldReader, columnVector, typeInfo, 0, rowCount);
+ }
+ for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
+ vectorExtractRow.extractRow(vectorizedRowBatch, rowIndex, rows[rowIndex]);
+ }
+ vectorizedRowBatch.reset();
+ return rows;
+ }
+
+ private void read(FieldReader reader, ColumnVector columnVector, TypeInfo typeInfo,
+ int rowOffset, int rowLength) {
+ switch (typeInfo.getCategory()) {
+ case PRIMITIVE:
+ final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory =
+ ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
+ final PrimitiveReader primitiveReader;
+ switch (primitiveCategory) {
+ case BOOLEAN:
+ primitiveReader = new PrimitiveReader() {
+ NullableBitHolder holder = new NullableBitHolder();
+
+ @Override
+ void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+ reader.read(holder);
+ ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
+ }
+ };
+ break;
+ case BYTE:
+ primitiveReader = new PrimitiveReader() {
+ NullableTinyIntHolder holder = new NullableTinyIntHolder();
+
+ @Override
+ void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+ reader.read(holder);
+ ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
+ }
+ };
+ break;
+ case SHORT:
+ primitiveReader = new PrimitiveReader() {
+ NullableSmallIntHolder holder = new NullableSmallIntHolder();
+
+ @Override
+ void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+ reader.read(holder);
+ ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
+ }
+ };
+ break;
+ case INT:
+ primitiveReader = new PrimitiveReader() {
+ NullableIntHolder holder = new NullableIntHolder();
+
+ @Override
+ void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+ reader.read(holder);
+ ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
+ }
+ };
+ break;
+ case LONG:
+ primitiveReader = new PrimitiveReader() {
+ NullableBigIntHolder holder = new NullableBigIntHolder();
+
+ @Override
+ void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+ reader.read(holder);
+ ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
+ }
+ };
+ break;
+ case FLOAT:
+ primitiveReader = new PrimitiveReader() {
+ NullableFloat4Holder holder = new NullableFloat4Holder();
+
+ @Override
+ void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+ reader.read(holder);
+ ((DoubleColumnVector) columnVector).vector[rowIndex] = holder.value;
+ }
+ };
+ break;
+ case DOUBLE:
+ primitiveReader = new PrimitiveReader() {
+ NullableFloat8Holder holder = new NullableFloat8Holder();
+
+ @Override
+ void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+ reader.read(holder);
+ ((DoubleColumnVector) columnVector).vector[rowIndex] = holder.value;
+ }
+ };
+ break;
+ case STRING:
+ case VARCHAR:
+ case CHAR:
+ primitiveReader = new PrimitiveReader() {
+ NullableVarCharHolder holder = new NullableVarCharHolder();
+
+ @Override
+ void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+ reader.read(holder);
+ int varCharSize = holder.end - holder.start;
+ byte[] varCharBytes = new byte[varCharSize];
+ holder.buffer.getBytes(holder.start, varCharBytes);
+ ((BytesColumnVector) columnVector).setVal(rowIndex, varCharBytes, 0, varCharSize);
+ }
+ };
+ break;
+ case DATE:
+ primitiveReader = new PrimitiveReader() {
+ NullableDateDayHolder holder = new NullableDateDayHolder();
+
+ @Override
+ void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+ reader.read(holder);
+ ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
+ }
+ };
+ break;
+ case TIMESTAMP:
+ primitiveReader = new PrimitiveReader() {
+ NullableTimeStampMilliHolder timeStampMilliHolder =
+ new NullableTimeStampMilliHolder();
+
+ @Override
+ void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+ reader.read(timeStampMilliHolder);
+ ((TimestampColumnVector) columnVector).set(rowIndex,
+ new Timestamp(timeStampMilliHolder.value));
+ }
+ };
+ break;
+ case BINARY:
+ primitiveReader = new PrimitiveReader() {
+ NullableVarBinaryHolder holder = new NullableVarBinaryHolder();
+
+ @Override
+ void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+ reader.read(holder);
+ final int binarySize = holder.end - holder.start;
+ final byte[] binaryBytes = new byte[binarySize];
+ holder.buffer.getBytes(holder.start, binaryBytes);
+ ((BytesColumnVector) columnVector).setVal(rowIndex, binaryBytes, 0, binarySize);
+ }
+ };
+ break;
+ case DECIMAL:
+ primitiveReader = new PrimitiveReader() {
+ @Override
+ void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+ ((DecimalColumnVector) columnVector).set(rowIndex,
+ HiveDecimal.create(reader.readBigDecimal()));
+ }
+ };
+ break;
+ case INTERVAL_YEAR_MONTH:
+ primitiveReader = new PrimitiveReader() {
+ NullableIntervalYearHolder holder = new NullableIntervalYearHolder();
+
+ @Override
+ void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+ reader.read(holder);
+ ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
+ }
+ };
+ break;
+ case INTERVAL_DAY_TIME:
+ primitiveReader = new PrimitiveReader() {
+ NullableIntervalDayHolder holder = new NullableIntervalDayHolder();
+
+ @Override
+ void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
+ IntervalDayTimeColumnVector intervalDayTimeVector =
+ (IntervalDayTimeColumnVector) columnVector;
+ reader.read(holder);
+ HiveIntervalDayTime intervalDayTime = new HiveIntervalDayTime(
+ holder.days, // days
+ holder.milliseconds / MS_PER_HOUR, // hour
+ (holder.milliseconds % MS_PER_HOUR) / MS_PER_MINUTE, // minute
+ (holder.milliseconds % MS_PER_MINUTE) / MS_PER_SECOND, // second
+ (holder.milliseconds % MS_PER_SECOND) * NS_PER_MS); // nanosecond
+ intervalDayTimeVector.set(rowIndex, intervalDayTime);
+ }
+ };
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ primitiveReader.read(reader, columnVector, rowOffset, rowLength);
+ break;
+ case LIST:
+ final ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
+ final TypeInfo elementTypeInfo = listTypeInfo.getListElementTypeInfo();
+ final ListColumnVector listVector = (ListColumnVector) columnVector;
+ final ColumnVector elementVector = listVector.child;
+ final FieldReader elementReader = reader.reader();
+
+ int listOffset = 0;
+ for (int rowIndex = 0; rowIndex < rowLength; rowIndex++) {
+ final int adjustedRowIndex = rowOffset + rowIndex;
+ reader.setPosition(adjustedRowIndex);
+ final int listLength = reader.size();
+ listVector.offsets[adjustedRowIndex] = listOffset;
+ listVector.lengths[adjustedRowIndex] = listLength;
+ read(elementReader, elementVector, elementTypeInfo, listOffset, listLength);
+ listOffset += listLength;
+ }
+ break;
+ case STRUCT:
+ final StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+ final List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+ final List<String> fieldNames = structTypeInfo.getAllStructFieldNames();
+ final int fieldSize = fieldNames.size();
+ final StructColumnVector structVector = (StructColumnVector) columnVector;
+ final ColumnVector[] fieldVectors = structVector.fields;
+
+ for (int fieldIndex = 0; fieldIndex < fieldSize; fieldIndex++) {
+ final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex);
+ final FieldReader fieldReader = reader.reader(fieldNames.get(fieldIndex));
+ final ColumnVector fieldVector = fieldVectors[fieldIndex];
+ read(fieldReader, fieldVector, fieldTypeInfo, rowOffset, rowLength);
+ }
+ break;
+ case UNION:
+ final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+ final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
+ final UnionColumnVector unionVector = (UnionColumnVector) columnVector;
+ final ColumnVector[] objectVectors = unionVector.fields;
+ final Map<Types.MinorType, Integer> minorTypeToTagMap = Maps.newHashMap();
+ for (int tag = 0; tag < objectTypeInfos.size(); tag++) {
+ minorTypeToTagMap.put(toMinorType(objectTypeInfos.get(tag)), tag);
+ }
+
+ final UnionReader unionReader = (UnionReader) reader;
+ for (int rowIndex = 0; rowIndex < rowLength; rowIndex++) {
+ final int adjustedRowIndex = rowIndex + rowOffset;
+ unionReader.setPosition(adjustedRowIndex);
+ final Types.MinorType minorType = unionReader.getMinorType();
+ final int tag = minorTypeToTagMap.get(minorType);
+ unionVector.tags[adjustedRowIndex] = tag;
+ read(unionReader, objectVectors[tag], objectTypeInfos.get(tag), adjustedRowIndex, 1);
+ }
+ break;
+ case MAP:
+ final MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
+ final ListTypeInfo mapStructListTypeInfo = toStructListTypeInfo(mapTypeInfo);
+ final MapColumnVector hiveMapVector = (MapColumnVector) columnVector;
+ final ListColumnVector mapStructListVector = toStructListVector(hiveMapVector);
+ final StructColumnVector mapStructVector = (StructColumnVector) mapStructListVector.child;
+ read(reader, mapStructListVector, mapStructListTypeInfo, rowOffset, rowLength);
+
+ hiveMapVector.isRepeating = mapStructListVector.isRepeating;
+ hiveMapVector.childCount = mapStructListVector.childCount;
+ hiveMapVector.noNulls = mapStructListVector.noNulls;
+ System.arraycopy(mapStructListVector.offsets, 0, hiveMapVector.offsets, 0, rowLength);
+ System.arraycopy(mapStructListVector.lengths, 0, hiveMapVector.lengths, 0, rowLength);
+ hiveMapVector.keys = mapStructVector.fields[0];
+ hiveMapVector.values = mapStructVector.fields[1];
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+ }
+
+ private static Types.MinorType toMinorType(TypeInfo typeInfo) {
+ switch (typeInfo.getCategory()) {
+ case PRIMITIVE:
+ switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+ case BOOLEAN:
+ return Types.MinorType.BIT;
+ case BYTE:
+ return Types.MinorType.TINYINT;
+ case SHORT:
+ return Types.MinorType.SMALLINT;
+ case INT:
+ return Types.MinorType.INT;
+ case LONG:
+ return Types.MinorType.BIGINT;
+ case FLOAT:
+ return Types.MinorType.FLOAT4;
+ case DOUBLE:
+ return Types.MinorType.FLOAT8;
+ case STRING:
+ case VARCHAR:
+ case CHAR:
+ return Types.MinorType.VARCHAR;
+ case DATE:
+ return Types.MinorType.DATEDAY;
+ case TIMESTAMP:
+ return Types.MinorType.TIMESTAMPMILLI;
+ case BINARY:
+ return Types.MinorType.VARBINARY;
+ case DECIMAL:
+ return Types.MinorType.DECIMAL;
+ case INTERVAL_YEAR_MONTH:
+ return Types.MinorType.INTERVALYEAR;
+ case INTERVAL_DAY_TIME:
+ return Types.MinorType.INTERVALDAY;
+ case VOID:
+ case TIMESTAMPLOCALTZ:
+ case UNKNOWN:
+ default:
+ throw new IllegalArgumentException();
+ }
+ case LIST:
+ return Types.MinorType.LIST;
+ case STRUCT:
+ return Types.MinorType.MAP;
+ case UNION:
+ return Types.MinorType.UNION;
+ case MAP:
+ // Apache Arrow doesn't have a map vector, so it's converted to a list vector of a struct
+ // vector.
+ return Types.MinorType.LIST;
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ private static ListTypeInfo toStructListTypeInfo(MapTypeInfo mapTypeInfo) {
+ final StructTypeInfo structTypeInfo = new StructTypeInfo();
+ structTypeInfo.setAllStructFieldNames(Lists.newArrayList("keys", "values"));
+ structTypeInfo.setAllStructFieldTypeInfos(Lists.newArrayList(
+ mapTypeInfo.getMapKeyTypeInfo(), mapTypeInfo.getMapValueTypeInfo()));
+ final ListTypeInfo structListTypeInfo = new ListTypeInfo();
+ structListTypeInfo.setListElementTypeInfo(structTypeInfo);
+ return structListTypeInfo;
+ }
+
+ private static Field toField(String name, TypeInfo typeInfo) {
+ switch (typeInfo.getCategory()) {
+ case PRIMITIVE:
+ final PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+ switch (primitiveTypeInfo.getPrimitiveCategory()) {
+ case BOOLEAN:
+ return Field.nullable(name, Types.MinorType.BIT.getType());
+ case BYTE:
+ return Field.nullable(name, Types.MinorType.TINYINT.getType());
+ case SHORT:
+ return Field.nullable(name, Types.MinorType.SMALLINT.getType());
+ case INT:
+ return Field.nullable(name, Types.MinorType.INT.getType());
+ case LONG:
+ return Field.nullable(name, Types.MinorType.BIGINT.getType());
+ case FLOAT:
+ return Field.nullable(name, Types.MinorType.FLOAT4.getType());
+ case DOUBLE:
+ return Field.nullable(name, Types.MinorType.FLOAT8.getType());
+ case STRING:
+ return Field.nullable(name, Types.MinorType.VARCHAR.getType());
+ case DATE:
+ return Field.nullable(name, Types.MinorType.DATEDAY.getType());
+ case TIMESTAMP:
+ return Field.nullable(name, Types.MinorType.TIMESTAMPMILLI.getType());
+ case TIMESTAMPLOCALTZ:
+ final TimestampLocalTZTypeInfo timestampLocalTZTypeInfo =
+ (TimestampLocalTZTypeInfo) typeInfo;
+ final String timeZone = timestampLocalTZTypeInfo.getTimeZone().toString();
+ return Field.nullable(name, new ArrowType.Timestamp(TimeUnit.MILLISECOND, timeZone));
+ case BINARY:
+ return Field.nullable(name, Types.MinorType.VARBINARY.getType());
+ case DECIMAL:
+ final DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
+ final int precision = decimalTypeInfo.precision();
+ final int scale = decimalTypeInfo.scale();
+ return Field.nullable(name, new ArrowType.Decimal(precision, scale));
+ case VARCHAR:
+ return Field.nullable(name, Types.MinorType.VARCHAR.getType());
+ case CHAR:
+ return Field.nullable(name, Types.MinorType.VARCHAR.getType());
+ case INTERVAL_YEAR_MONTH:
+ return Field.nullable(name, Types.MinorType.INTERVALYEAR.getType());
+ case INTERVAL_DAY_TIME:
+ return Field.nullable(name, Types.MinorType.INTERVALDAY.getType());
+ default:
+ throw new IllegalArgumentException();
+ }
+ case LIST:
+ final ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
+ final TypeInfo elementTypeInfo = listTypeInfo.getListElementTypeInfo();
+ return new Field(name, FieldType.nullable(Types.MinorType.LIST.getType()),
+ Lists.newArrayList(toField(DEFAULT_ARROW_FIELD_NAME, elementTypeInfo)));
+ case STRUCT:
+ final StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+ final List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+ final List<String> fieldNames = structTypeInfo.getAllStructFieldNames();
+ final List<Field> structFields = Lists.newArrayList();
+ final int structSize = fieldNames.size();
+ for (int i = 0; i < structSize; i++) {
+ structFields.add(toField(fieldNames.get(i), fieldTypeInfos.get(i)));
+ }
+ return new Field(name, FieldType.nullable(Types.MinorType.MAP.getType()), structFields);
+ case UNION:
+ final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+ final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
+ final List<Field> unionFields = Lists.newArrayList();
+ final int unionSize = objectTypeInfos.size();
+ for (int i = 0; i < unionSize; i++) {
+ unionFields.add(toField(DEFAULT_ARROW_FIELD_NAME, objectTypeInfos.get(i)));
+ }
+ return new Field(name, FieldType.nullable(Types.MinorType.UNION.getType()), unionFields);
+ case MAP:
+ final MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
+ final TypeInfo keyTypeInfo = mapTypeInfo.getMapKeyTypeInfo();
+ final TypeInfo valueTypeInfo = mapTypeInfo.getMapValueTypeInfo();
+
+ final StructTypeInfo mapStructTypeInfo = new StructTypeInfo();
+ mapStructTypeInfo.setAllStructFieldNames(Lists.newArrayList("keys", "values"));
+ mapStructTypeInfo.setAllStructFieldTypeInfos(
+ Lists.newArrayList(keyTypeInfo, valueTypeInfo));
+
+ final ListTypeInfo mapListStructTypeInfo = new ListTypeInfo();
+ mapListStructTypeInfo.setListElementTypeInfo(mapStructTypeInfo);
+
+ return toField(name, mapListStructTypeInfo);
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ private static ListColumnVector toStructListVector(MapColumnVector mapVector) {
+ final StructColumnVector structVector;
+ final ListColumnVector structListVector;
+ structVector = new StructColumnVector();
+ structVector.fields = new ColumnVector[] {mapVector.keys, mapVector.values};
+ structListVector = new ListColumnVector();
+ structListVector.child = structVector;
+ System.arraycopy(mapVector.offsets, 0, structListVector.offsets, 0, mapVector.childCount);
+ System.arraycopy(mapVector.lengths, 0, structListVector.lengths, 0, mapVector.childCount);
+ structListVector.childCount = mapVector.childCount;
+ structListVector.isRepeating = mapVector.isRepeating;
+ structListVector.noNulls = mapVector.noNulls;
+ return structListVector;
+ }
+
+ @Override
+ public Class<? extends Writable> getSerializedClass() {
+ return ArrowWrapperWritable.class;
+ }
+
+ @Override
+ public ArrowWrapperWritable serialize(Object obj, ObjectInspector objInspector) {
+ return serializer.serialize(obj, objInspector);
+ }
+
+ @Override
+ public SerDeStats getSerDeStats() {
+ return null;
+ }
+
+ @Override
+ public Object deserialize(Writable writable) {
+ return deserializer.deserialize(writable);
+ }
+
+ @Override
+ public ObjectInspector getObjectInspector() {
+ return rowObjectInspector;
+ }
+}
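For reference, Serializer.serialize only returns a non-null ArrowWrapperWritable once
hive.arrow.batch.size rows have been buffered; a null row flushes whatever is still buffered.
A minimal sketch of that calling pattern for a row set that fits in one batch, assuming serDe,
rowOI and rows are initialized as in the test code further below (nothing here is part of
this patch):

    ArrowWrapperWritable batch = null;
    for (Object[] row : rows) {
      // Returns null until the buffer reaches hive.arrow.batch.size rows.
      batch = (ArrowWrapperWritable) serDe.serialize(row, rowOI);
    }
    if (batch == null) {
      // A null row signals end of input (closeOp()) and flushes the buffered rows.
      batch = (ArrowWrapperWritable) serDe.serialize(null, rowOI);
    }
    VectorSchemaRoot root = batch.getVectorSchemaRoot();  // Arrow columns for the buffered rows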
http://git-wip-us.apache.org/repos/asf/hive/blob/0e090e58/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
new file mode 100644
index 0000000..df7b53f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.arrow;
+
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class ArrowWrapperWritable implements Writable {
+ private VectorSchemaRoot vectorSchemaRoot;
+
+ public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot) {
+ this.vectorSchemaRoot = vectorSchemaRoot;
+ }
+
+ public VectorSchemaRoot getVectorSchemaRoot() {
+ return vectorSchemaRoot;
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/0e090e58/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
new file mode 100644
index 0000000..78cc188
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.arrow;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_ROOT_ALLOCATOR_LIMIT;
+
+/**
+ * Thread-safe singleton factory for RootAllocator
+ */
+public enum RootAllocatorFactory {
+ INSTANCE;
+
+ private RootAllocator rootAllocator;
+
+ RootAllocatorFactory() {
+ }
+
+ public synchronized RootAllocator getRootAllocator(Configuration conf) {
+ if (rootAllocator == null) {
+ final long limit = HiveConf.getLongVar(conf, HIVE_ARROW_ROOT_ALLOCATOR_LIMIT);
+ rootAllocator = new RootAllocator(limit);
+ }
+ return rootAllocator;
+ }
+}
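The factory hands out a single process-wide RootAllocator, capped by
hive.arrow.root.allocator.limit; ArrowColumnarBatchSerDe uses it for VectorSchemaRoot.create
and for the temporary ArrowBufs in its string/binary writers. A minimal standalone sketch of
the same pattern; RootAllocatorExample and the 1 GB limit are only illustrative:

    import io.netty.buffer.ArrowBuf;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;

    public class RootAllocatorExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setLong("hive.arrow.root.allocator.limit", 1L << 30); // 1 GB cap (illustrative)
        BufferAllocator allocator = RootAllocatorFactory.INSTANCE.getRootAllocator(conf);
        byte[] bytes = "hello".getBytes();
        // Allocate a buffer from the shared allocator and release it when done, the same way
        // the SerDe handles its temporary string/binary buffers.
        try (ArrowBuf buf = allocator.buffer(bytes.length)) {
          buf.setBytes(0, bytes, 0, bytes.length);
        }
      }
    }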
http://git-wip-us.apache.org/repos/asf/hive/blob/0e090e58/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java b/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
new file mode 100644
index 0000000..bcb7a88
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
@@ -0,0 +1,815 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.arrow;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable;
+import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestArrowColumnarBatchSerDe {
+ private Configuration conf;
+
+ private final static Object[][] INTEGER_ROWS = {
+ {byteW(0), shortW(0), intW(0), longW(0)},
+ {byteW(1), shortW(1), intW(1), longW(1)},
+ {byteW(-1), shortW(-1), intW(-1), longW(-1)},
+ {byteW(Byte.MIN_VALUE), shortW(Short.MIN_VALUE), intW(Integer.MIN_VALUE),
+ longW(Long.MIN_VALUE)},
+ {byteW(Byte.MAX_VALUE), shortW(Short.MAX_VALUE), intW(Integer.MAX_VALUE),
+ longW(Long.MAX_VALUE)},
+ {null, null, null, null},
+ };
+
+ private final static Object[][] FLOAT_ROWS = {
+ {floatW(0f), doubleW(0d)},
+ {floatW(1f), doubleW(1d)},
+ {floatW(-1f), doubleW(-1d)},
+ {floatW(Float.MIN_VALUE), doubleW(Double.MIN_VALUE)},
+ {floatW(-Float.MIN_VALUE), doubleW(-Double.MIN_VALUE)},
+ {floatW(Float.MAX_VALUE), doubleW(Double.MAX_VALUE)},
+ {floatW(-Float.MAX_VALUE), doubleW(-Double.MAX_VALUE)},
+ {floatW(Float.POSITIVE_INFINITY), doubleW(Double.POSITIVE_INFINITY)},
+ {floatW(Float.NEGATIVE_INFINITY), doubleW(Double.NEGATIVE_INFINITY)},
+ {null, null},
+ };
+
+ private final static Object[][] STRING_ROWS = {
+ {text(""), charW("", 10), varcharW("", 10)},
+ {text("Hello"), charW("Hello", 10), varcharW("Hello", 10)},
+ {text("world!"), charW("world!", 10), varcharW("world!", 10)},
+ {null, null, null},
+ };
+
+ private final static long NOW = System.currentTimeMillis();
+ private final static Object[][] DTI_ROWS = {
+ {
+ new DateWritable(DateWritable.millisToDays(NOW)),
+ new TimestampWritable(new Timestamp(NOW)),
+ new HiveIntervalYearMonthWritable(new HiveIntervalYearMonth(1, 2)),
+ new HiveIntervalDayTimeWritable(new HiveIntervalDayTime(1, 2, 3, 4, 5_000_000))
+ },
+ {null, null, null, null},
+ };
+
+ private final static Object[][] DECIMAL_ROWS = {
+ {decimalW(HiveDecimal.ZERO)},
+ {decimalW(HiveDecimal.ONE)},
+ {decimalW(HiveDecimal.ONE.negate())},
+ {decimalW(HiveDecimal.create("0.000001"))},
+ {decimalW(HiveDecimal.create("100000"))},
+ {null},
+ };
+
+ private final static Object[][] BOOLEAN_ROWS = {
+ {new BooleanWritable(true)},
+ {new BooleanWritable(false)},
+ {null},
+ };
+
+ private final static Object[][] BINARY_ROWS = {
+ {new BytesWritable("".getBytes())},
+ {new BytesWritable("Hello".getBytes())},
+ {new BytesWritable("world!".getBytes())},
+ {null},
+ };
+
+ @Before
+ public void setUp() {
+ conf = new Configuration();
+ }
+
+ private static ByteWritable byteW(int value) {
+ return new ByteWritable((byte) value);
+ }
+
+ private static ShortWritable shortW(int value) {
+ return new ShortWritable((short) value);
+ }
+
+ private static IntWritable intW(int value) {
+ return new IntWritable(value);
+ }
+
+ private static LongWritable longW(long value) {
+ return new LongWritable(value);
+ }
+
+ private static FloatWritable floatW(float value) {
+ return new FloatWritable(value);
+ }
+
+ private static DoubleWritable doubleW(double value) {
+ return new DoubleWritable(value);
+ }
+
+ private static Text text(String value) {
+ return new Text(value);
+ }
+
+ private static HiveCharWritable charW(String value, int length) {
+ return new HiveCharWritable(new HiveChar(value, length));
+ }
+
+ private static HiveVarcharWritable varcharW(String value, int length) {
+ return new HiveVarcharWritable(new HiveVarchar(value, length));
+ }
+
+ private static HiveDecimalWritable decimalW(HiveDecimal value) {
+ return new HiveDecimalWritable(value);
+ }
+
+ private void initAndSerializeAndDeserialize(String[][] schema, Object[][] rows) throws SerDeException {
+ AbstractSerDe serDe = new ArrowColumnarBatchSerDe();
+ StructObjectInspector rowOI = initSerDe(serDe, schema);
+ serializeAndDeserialize(serDe, rows, rowOI);
+ }
+
+ private StructObjectInspector initSerDe(AbstractSerDe serDe, String[][] schema)
+ throws SerDeException {
+ List<String> fieldNameList = newArrayList();
+ List<String> fieldTypeList = newArrayList();
+ List<TypeInfo> typeInfoList = newArrayList();
+
+ for (String[] nameAndType : schema) {
+ String name = nameAndType[0];
+ String type = nameAndType[1];
+ fieldNameList.add(name);
+ fieldTypeList.add(type);
+ typeInfoList.add(TypeInfoUtils.getTypeInfoFromTypeString(type));
+ }
+
+ String fieldNames = Joiner.on(',').join(fieldNameList);
+ String fieldTypes = Joiner.on(',').join(fieldTypeList);
+
+ Properties schemaProperties = new Properties();
+ schemaProperties.setProperty(serdeConstants.LIST_COLUMNS, fieldNames);
+ schemaProperties.setProperty(serdeConstants.LIST_COLUMN_TYPES, fieldTypes);
+ SerDeUtils.initializeSerDe(serDe, conf, schemaProperties, null);
+ return (StructObjectInspector) TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(
+ TypeInfoFactory.getStructTypeInfo(fieldNameList, typeInfoList));
+ }
+
+ private void serializeAndDeserialize(AbstractSerDe serDe, Object[][] rows,
+ StructObjectInspector rowOI) throws SerDeException {
+ Writable serialized = null;
+ for (Object[] row : rows) {
+ serialized = serDe.serialize(row, rowOI);
+ }
+ // Pass null to complete a batch
+ if (serialized == null) {
+ serialized = serDe.serialize(null, rowOI);
+ }
+ final Object[][] deserializedRows = (Object[][]) serDe.deserialize(serialized);
+
+ for (int rowIndex = 0; rowIndex < Math.min(deserializedRows.length, rows.length); rowIndex++) {
+ final Object[] row = rows[rowIndex];
+ final Object[] deserializedRow = deserializedRows[rowIndex];
+ assertEquals(row.length, deserializedRow.length);
+
+ final List<? extends StructField> fields = rowOI.getAllStructFieldRefs();
+ for (int fieldIndex = 0; fieldIndex < fields.size(); fieldIndex++) {
+ final StructField field = fields.get(fieldIndex);
+ final ObjectInspector fieldObjInspector = field.getFieldObjectInspector();
+ switch (fieldObjInspector.getCategory()) {
+ case PRIMITIVE:
+ final PrimitiveObjectInspector primitiveObjInspector =
+ (PrimitiveObjectInspector) fieldObjInspector;
+ switch (primitiveObjInspector.getPrimitiveCategory()) {
+ case STRING:
+ case VARCHAR:
+ case CHAR:
+ assertEquals(Objects.toString(row[fieldIndex]),
+ Objects.toString(deserializedRow[fieldIndex]));
+ break;
+ default:
+ assertEquals(row[fieldIndex], deserializedRow[fieldIndex]);
+ break;
+ }
+ break;
+ case STRUCT:
+ final Object[] rowStruct = (Object[]) row[fieldIndex];
+ final List deserializedRowStruct = (List) deserializedRow[fieldIndex];
+ assertArrayEquals(rowStruct, deserializedRowStruct.toArray());
+ break;
+ case LIST:
+ case UNION:
+ assertEquals(row[fieldIndex], deserializedRow[fieldIndex]);
+ break;
+ case MAP:
+ Map rowMap = (Map) row[fieldIndex];
+ Map deserializedRowMap = (Map) deserializedRow[fieldIndex];
+ Set rowMapKeySet = rowMap.keySet();
+ Set deserializedRowMapKeySet = deserializedRowMap.keySet();
+ assertTrue(rowMapKeySet.containsAll(deserializedRowMapKeySet));
+ assertTrue(deserializedRowMapKeySet.containsAll(rowMapKeySet));
+ for (Object key : rowMapKeySet) {
+ assertEquals(rowMap.get(key), deserializedRowMap.get(key));
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testComprehensive() throws SerDeException {
+ String[][] schema = {
+ {"datatypes.c1", "int"},
+ {"datatypes.c2", "boolean"},
+ {"datatypes.c3", "double"},
+ {"datatypes.c4", "string"},
+ {"datatypes.c5", "array<int>"},
+ {"datatypes.c6", "map<int,string>"},
+ {"datatypes.c7", "map<string,string>"},
+ {"datatypes.c8", "struct<r:string,s:int,t:double>"},
+ {"datatypes.c9", "tinyint"},
+ {"datatypes.c10", "smallint"},
+ {"datatypes.c11", "float"},
+ {"datatypes.c12", "bigint"},
+ {"datatypes.c13", "array<array<string>>"},
+ {"datatypes.c14", "map<int,map<int,int>>"},
+ {"datatypes.c15", "struct<r:int,s:struct<a:int,b:string>>"},
+ {"datatypes.c16", "array<struct<m:map<string,string>,n:int>>"},
+ {"datatypes.c17", "timestamp"},
+ {"datatypes.c18", "decimal(16,7)"},
+ {"datatypes.c19", "binary"},
+ {"datatypes.c20", "date"},
+ {"datatypes.c21", "varchar(20)"},
+ {"datatypes.c22", "char(15)"},
+ {"datatypes.c23", "binary"},
+ };
+
+ Object[][] comprehensiveRows = {
+ {
+ intW(0), // c1:int
+ new BooleanWritable(false), // c2:boolean
+ doubleW(0), // c3:double
+ text("Hello"), // c4:string
+ newArrayList(intW(0), intW(1), intW(2)), // c5:array<int>
+ Maps.toMap(
+ newArrayList(intW(0), intW(1), intW(2)),
+ input -> text("Number " + input)), // c6:map<int,string>
+ Maps.toMap(
+ newArrayList(text("apple"), text("banana"), text("carrot")),
+ input -> text(input.toString().toUpperCase())), // c7:map<string,string>
+ new Object[] {text("0"), intW(1), doubleW(2)}, // c8:struct<r:string,s:int,t:double>
+ byteW(0), // c9:tinyint
+ shortW(0), // c10:smallint
+ floatW(0), // c11:float
+ longW(0), // c12:bigint
+ newArrayList(
+ newArrayList(text("a"), text("b"), text("c")),
+ newArrayList(text("A"), text("B"), text("C"))), // c13:array<array<string>>
+ Maps.toMap(
+ newArrayList(intW(0), intW(1), intW(2)),
+ x -> Maps.toMap(
+ newArrayList(x, intW(x.get() * 2)),
+ y -> y)), // c14:map<int,map<int,int>>
+ new Object[] {
+ intW(0),
+ newArrayList(
+ intW(1),
+ text("Hello"))}, // c15:struct<r:int,s:struct<a:int,b:string>>
+ Collections.singletonList(
+ newArrayList(
+ Maps.toMap(
+ newArrayList(text("hello")),
+ input -> text(input.toString().toUpperCase())),
+ intW(0))), // c16:array<struct<m:map<string,string>,n:int>>
+ new TimestampWritable(new Timestamp(NOW)), // c17:timestamp
+ decimalW(HiveDecimal.create(0, 0)), // c18:decimal(16,7)
+ new BytesWritable("Hello".getBytes()), // c19:binary
+ new DateWritable(123), // c20:date
+ varcharW("x", 20), // c21:varchar(20)
+ charW("y", 15), // c22:char(15)
+ new BytesWritable("world!".getBytes()), // c23:binary
+ },
+ };
+
+ initAndSerializeAndDeserialize(schema, comprehensiveRows);
+ }
+
+ private <E> List<E> newArrayList(E ... elements) {
+ return Lists.newArrayList(elements);
+ }
+
+ @Test
+ public void testPrimitiveInteger() throws SerDeException {
+ String[][] schema = {
+ {"tinyint1", "tinyint"},
+ {"smallint1", "smallint"},
+ {"int1", "int"},
+ {"bigint1", "bigint"}
+ };
+
+ initAndSerializeAndDeserialize(schema, INTEGER_ROWS);
+ }
+
+ @Test
+ public void testPrimitiveBigInt10000() throws SerDeException {
+ String[][] schema = {
+ {"bigint1", "bigint"}
+ };
+
+ final int batchSize = 1000;
+ final Object[][] integerRows = new Object[batchSize][];
+ final AbstractSerDe serDe = new ArrowColumnarBatchSerDe();
+ StructObjectInspector rowOI = initSerDe(serDe, schema);
+
+ for (int j = 0; j < 10; j++) {
+ for (int i = 0; i < batchSize; i++) {
+ integerRows[i] = new Object[] {longW(i + j * batchSize)};
+ }
+
+ serializeAndDeserialize(serDe, integerRows, rowOI);
+ }
+ }
+
+ @Test
+ public void testPrimitiveBigIntRandom() {
+ try {
+ String[][] schema = {
+ {"bigint1", "bigint"}
+ };
+
+ final AbstractSerDe serDe = new ArrowColumnarBatchSerDe();
+ StructObjectInspector rowOI = initSerDe(serDe, schema);
+
+ final Random random = new Random();
+ for (int j = 0; j < 1000; j++) {
+ final int batchSize = random.nextInt(1000);
+ final Object[][] integerRows = new Object[batchSize][];
+ for (int i = 0; i < batchSize; i++) {
+ integerRows[i] = new Object[] {longW(random.nextLong())};
+ }
+
+ serializeAndDeserialize(serDe, integerRows, rowOI);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testPrimitiveFloat() throws SerDeException {
+ String[][] schema = {
+ {"float1", "float"},
+ {"double1", "double"},
+ };
+
+ initAndSerializeAndDeserialize(schema, FLOAT_ROWS);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testPrimitiveFloatNaN() throws SerDeException {
+ String[][] schema = {
+ {"float1", "float"},
+ };
+
+ Object[][] rows = {{new FloatWritable(Float.NaN)}};
+
+ initAndSerializeAndDeserialize(schema, rows);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testPrimitiveDoubleNaN() throws SerDeException {
+ String[][] schema = {
+ {"double1", "double"},
+ };
+
+ Object[][] rows = {{new DoubleWritable(Double.NaN)}};
+
+ initAndSerializeAndDeserialize(schema, rows);
+ }
+
+ @Test
+ public void testPrimitiveString() throws SerDeException {
+ String[][] schema = {
+ {"string1", "string"},
+ {"char1", "char(10)"},
+ {"varchar1", "varchar(10)"},
+ };
+
+ initAndSerializeAndDeserialize(schema, STRING_ROWS);
+ }
+
+ @Test
+ public void testPrimitiveDTI() throws SerDeException {
+ String[][] schema = {
+ {"date1", "date"},
+ {"timestamp1", "timestamp"},
+ {"interval_year_month1", "interval_year_month"},
+ {"interval_day_time1", "interval_day_time"},
+ };
+
+ initAndSerializeAndDeserialize(schema, DTI_ROWS);
+ }
+
+ @Test
+ public void testPrimitiveDecimal() throws SerDeException {
+ String[][] schema = {
+ {"decimal1", "decimal(38,10)"},
+ };
+
+ initAndSerializeAndDeserialize(schema, DECIMAL_ROWS);
+ }
+
+ @Test
+ public void testPrimitiveBoolean() throws SerDeException {
+ String[][] schema = {
+ {"boolean1", "boolean"},
+ };
+
+ initAndSerializeAndDeserialize(schema, BOOLEAN_ROWS);
+ }
+
+ @Test
+ public void testPrimitiveBinary() throws SerDeException {
+ String[][] schema = {
+ {"binary1", "binary"},
+ };
+
+ initAndSerializeAndDeserialize(schema, BINARY_ROWS);
+ }
+
+ private List[][] toList(Object[][] rows) {
+ List[][] array = new List[rows.length][];
+ for (int rowIndex = 0; rowIndex < rows.length; rowIndex++) {
+ Object[] row = rows[rowIndex];
+ array[rowIndex] = new List[row.length];
+ for (int fieldIndex = 0; fieldIndex < row.length; fieldIndex++) {
+ array[rowIndex][fieldIndex] = newArrayList(row[fieldIndex]);
+ }
+ }
+ return array;
+ }
+
+ @Test
+ public void testListInteger() throws SerDeException {
+ String[][] schema = {
+ {"tinyint_list", "array<tinyint>"},
+ {"smallint_list", "array<smallint>"},
+ {"int_list", "array<int>"},
+ {"bigint_list", "array<bigint>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toList(INTEGER_ROWS));
+ }
+
+ @Test
+ public void testListFloat() throws SerDeException {
+ String[][] schema = {
+ {"float_list", "array<float>"},
+ {"double_list", "array<double>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toList(FLOAT_ROWS));
+ }
+
+ @Test
+ public void testListString() throws SerDeException {
+ String[][] schema = {
+ {"string_list", "array<string>"},
+ {"char_list", "array<char(10)>"},
+ {"varchar_list", "array<varchar(10)>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toList(STRING_ROWS));
+ }
+
+ @Test
+ public void testListDTI() throws SerDeException {
+ String[][] schema = {
+ {"date_list", "array<date>"},
+ {"timestamp_list", "array<timestamp>"},
+ {"interval_year_month_list", "array<interval_year_month>"},
+ {"interval_day_time_list", "array<interval_day_time>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toList(DTI_ROWS));
+ }
+
+ @Test
+ public void testListBoolean() throws SerDeException {
+ String[][] schema = {
+ {"boolean_list", "array<boolean>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toList(BOOLEAN_ROWS));
+ }
+
+ @Test
+ public void testListBinary() throws SerDeException {
+ String[][] schema = {
+ {"binary_list", "array<binary>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toList(BINARY_ROWS));
+ }
+
+ private StandardUnionObjectInspector.StandardUnion union(int tag, Object object) {
+ return new StandardUnionObjectInspector.StandardUnion((byte) tag, object);
+ }
+
+ public void testUnionInteger() throws SerDeException {
+ String[][] schema = {
+ {"int_union", "uniontype<tinyint,smallint,int,bigint>"},
+ };
+
+ StandardUnionObjectInspector.StandardUnion[][] integerUnions = {
+ {union(0, byteW(0))},
+ {union(1, shortW(1))},
+ {union(2, intW(2))},
+ {union(3, longW(3))},
+ };
+
+ initAndSerializeAndDeserialize(schema, integerUnions);
+ }
+
+ public void testUnionFloat() throws SerDeException {
+ String[][] schema = {
+ {"float_union", "uniontype<float,double>"},
+ };
+
+ StandardUnionObjectInspector.StandardUnion[][] floatUnions = {
+ {union(0, floatW(0f))},
+ {union(1, doubleW(1d))},
+ };
+
+ initAndSerializeAndDeserialize(schema, floatUnions);
+ }
+
+ public void testUnionString() throws SerDeException {
+ String[][] schema = {
+ {"string_union", "uniontype<string,int>"},
+ };
+
+ StandardUnionObjectInspector.StandardUnion[][] stringUnions = {
+ {union(0, text("Hello"))},
+ {union(1, intW(1))},
+ };
+
+ initAndSerializeAndDeserialize(schema, stringUnions);
+ }
+
+ public void testUnionChar() throws SerDeException {
+ String[][] schema = {
+ {"char_union", "uniontype<char(10),int>"},
+ };
+
+ StandardUnionObjectInspector.StandardUnion[][] charUnions = {
+ {union(0, charW("Hello", 10))},
+ {union(1, intW(1))},
+ };
+
+ initAndSerializeAndDeserialize(schema, charUnions);
+ }
+
+ public void testUnionVarchar() throws SerDeException {
+ String[][] schema = {
+ {"varchar_union", "uniontype<varchar(10),int>"},
+ };
+
+ StandardUnionObjectInspector.StandardUnion[][] varcharUnions = {
+ {union(0, varcharW("Hello", 10))},
+ {union(1, intW(1))},
+ };
+
+ initAndSerializeAndDeserialize(schema, varcharUnions);
+ }
+
+ public void testUnionDTI() throws SerDeException {
+ String[][] schema = {
+ {"date_union", "uniontype<date,timestamp,interval_year_month,interval_day_time>"},
+ };
+ long NOW = System.currentTimeMillis();
+
+ StandardUnionObjectInspector.StandardUnion[][] dtiUnions = {
+ {union(0, new DateWritable(DateWritable.millisToDays(NOW)))},
+ {union(1, new TimestampWritable(new Timestamp(NOW)))},
+ {union(2, new HiveIntervalYearMonthWritable(new HiveIntervalYearMonth(1, 2)))},
+ {union(3, new HiveIntervalDayTimeWritable(new HiveIntervalDayTime(1, 2, 3, 4, 5_000_000)))},
+ };
+
+ initAndSerializeAndDeserialize(schema, dtiUnions);
+ }
+
+ public void testUnionBooleanBinary() throws SerDeException {
+ String[][] schema = {
+ {"boolean_union", "uniontype<boolean,binary>"},
+ };
+
+ StandardUnionObjectInspector.StandardUnion[][] booleanBinaryUnions = {
+ {union(0, new BooleanWritable(true))},
+ {union(1, new BytesWritable("Hello".getBytes()))},
+ };
+
+ initAndSerializeAndDeserialize(schema, booleanBinaryUnions);
+ }
+
+ private Object[][][] toStruct(Object[][] rows) {
+ Object[][][] struct = new Object[rows.length][][];
+ for (int rowIndex = 0; rowIndex < rows.length; rowIndex++) {
+ Object[] row = rows[rowIndex];
+ struct[rowIndex] = new Object[][] {row};
+ }
+ return struct;
+ }
+
+ @Test
+ public void testStructInteger() throws SerDeException {
+ String[][] schema = {
+ {"int_struct", "struct<tinyint1:tinyint,smallint1:smallint,int1:int,bigint1:bigint>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toStruct(INTEGER_ROWS));
+ }
+
+ @Test
+ public void testStructFloat() throws SerDeException {
+ String[][] schema = {
+ {"float_struct", "struct<float1:float,double1:double>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toStruct(FLOAT_ROWS));
+ }
+
+ @Test
+ public void testStructString() throws SerDeException {
+ String[][] schema = {
+ {"string_struct", "struct<string1:string,char1:char(10),varchar1:varchar(10)>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toStruct(STRING_ROWS));
+ }
+
+ @Test
+ public void testStructDTI() throws SerDeException {
+ String[][] schema = {
+ {"date_struct", "struct<date1:date,timestamp1:timestamp," +
+ "interval_year_month1:interval_year_month,interval_day_time1:interval_day_time>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toStruct(DTI_ROWS));
+ }
+
+ @Test
+ public void testStructBoolean() throws SerDeException {
+ String[][] schema = {
+ {"boolean_struct", "struct<boolean1:boolean>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toStruct(BOOLEAN_ROWS));
+ }
+
+ @Test
+ public void testStructBinary() throws SerDeException {
+ String[][] schema = {
+ {"binary_struct", "struct<binary1:binary>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toStruct(BINARY_ROWS));
+ }
+
+ private Object[][] toMap(Object[][] rows) {
+ Map[][] array = new Map[rows.length][];
+ for (int rowIndex = 0; rowIndex < rows.length; rowIndex++) {
+ Object[] row = rows[rowIndex];
+ array[rowIndex] = new Map[row.length];
+ for (int fieldIndex = 0; fieldIndex < row.length; fieldIndex++) {
+ Map map = Maps.newHashMap();
+ map.put(new Text(String.valueOf(row[fieldIndex])), row[fieldIndex]);
+ array[rowIndex][fieldIndex] = map;
+ }
+ }
+ return array;
+ }
+
+ @Test
+ public void testMapInteger() throws SerDeException {
+ String[][] schema = {
+ {"tinyint_map", "map<string,tinyint>"},
+ {"smallint_map", "map<string,smallint>"},
+ {"int_map", "map<string,int>"},
+ {"bigint_map", "map<string,bigint>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toMap(INTEGER_ROWS));
+ }
+
+ @Test
+ public void testMapFloat() throws SerDeException {
+ String[][] schema = {
+ {"float_map", "map<string,float>"},
+ {"double_map", "map<string,double>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toMap(FLOAT_ROWS));
+ }
+
+ @Test
+ public void testMapString() throws SerDeException {
+ String[][] schema = {
+ {"string_map", "map<string,string>"},
+ {"char_map", "map<string,char(10)>"},
+ {"varchar_map", "map<string,varchar(10)>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toMap(STRING_ROWS));
+ }
+
+ @Test
+ public void testMapDTI() throws SerDeException {
+ String[][] schema = {
+ {"date_map", "map<string,date>"},
+ {"timestamp_map", "map<string,timestamp>"},
+ {"interval_year_month_map", "map<string,interval_year_month>"},
+ {"interval_day_time_map", "map<string,interval_day_time>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toMap(DTI_ROWS));
+ }
+
+ @Test
+ public void testMapBoolean() throws SerDeException {
+ String[][] schema = {
+ {"boolean_map", "map<string,boolean>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toMap(BOOLEAN_ROWS));
+ }
+
+ @Test
+ public void testMapBinary() throws SerDeException {
+ String[][] schema = {
+ {"binary_map", "map<string,binary>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toMap(BINARY_ROWS));
+ }
+}
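
A note for readers trying the SerDe outside JUnit: the round-trip pattern in initAndSerializeAndDeserialize() above boils down to the minimal sketch below. It assumes only the contract visible in the tests (table properties via serdeConstants, serialize() returning null until a batch completes, and a null row flushing the remainder); the class name, column names, and row values are illustrative, not part of the commit.

    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe;
    import org.apache.hadoop.hive.serde.serdeConstants;
    import org.apache.hadoop.hive.serde2.SerDeUtils;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Writable;

    public class ArrowSerDeRoundTripSketch {          // illustrative name
      public static void main(String[] args) throws Exception {
        ArrowColumnarBatchSerDe serDe = new ArrowColumnarBatchSerDe();

        Properties props = new Properties();
        props.setProperty(serdeConstants.LIST_COLUMNS, "c1,c2");
        props.setProperty(serdeConstants.LIST_COLUMN_TYPES, "int,bigint");
        SerDeUtils.initializeSerDe(serDe, new Configuration(), props, null);

        StructObjectInspector rowOI = (StructObjectInspector)
            TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(
                TypeInfoUtils.getTypeInfoFromTypeString("struct<c1:int,c2:bigint>"));

        Writable batch = null;
        for (int i = 0; i < 3; i++) {
          // serialize() buffers the row and returns null until a full batch is ready
          batch = serDe.serialize(new Object[] {new IntWritable(i), new LongWritable(i)}, rowOI);
        }
        if (batch == null) {
          batch = serDe.serialize(null, rowOI);       // a null row flushes the partial batch
        }
        Object[][] rows = (Object[][]) serDe.deserialize(batch);
        System.out.println("rows after round trip: " + rows.length);
      }
    }
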
http://git-wip-us.apache.org/repos/asf/hive/blob/0e090e58/serde/pom.xml
----------------------------------------------------------------------
diff --git a/serde/pom.xml b/serde/pom.xml
index e005585..eca34af 100644
--- a/serde/pom.xml
+++ b/serde/pom.xml
@@ -71,6 +71,11 @@
<version>${arrow.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>hppc</artifactId>
<version>${hppc.version}</version>
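
The new arrow-vector dependency pulls in the value vector classes (BigIntVector, VarCharVector, NullableMapVector, and friends) that the SerDe builds its batches from. Below is a minimal, self-contained sketch of that API, assuming the Arrow version this branch already depends on (${arrow.version}); the class and field names are illustrative only.

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.BigIntVector;

    public class ArrowVectorSketch {
      public static void main(String[] args) {
        // The limit mirrors hive.arrow.root.allocator.limit (Long.MAX_VALUE by default).
        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             BigIntVector vector = new BigIntVector("bigint1", allocator)) {
          vector.allocateNew(3);
          vector.set(0, 10L);
          vector.set(1, 20L);
          vector.setNull(2);          // clears the validity bit for this slot
          vector.setValueCount(3);
          System.out.println("values=" + vector.getValueCount() + ", isNull(2)=" + vector.isNull(2));
        }
      }
    }
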
[2/6] hive git commit: HIVE-19495: Arrow SerDe itest failure (Teddy
Choi, reviewed by Matt McCline)
Posted by vg...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/2726f302/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
new file mode 100644
index 0000000..bd23011
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.arrow;
+
+import io.netty.buffer.ArrowBuf;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.BitVectorHelper;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.IntervalDayVector;
+import org.apache.arrow.vector.IntervalYearVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampNanoVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.NullableMapVector;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAssignRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE;
+import static org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MS_PER_SECOND;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.NS_PER_MS;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.SECOND_PER_DAY;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.toStructListTypeInfo;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.toStructListVector;
+import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption.WRITABLE;
+import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfoFromObjectInspector;
+
+class Serializer {
+ private final int MAX_BUFFERED_ROWS;
+
+ // Schema
+ private final StructTypeInfo structTypeInfo;
+ private final int fieldSize;
+
+ // Hive columns
+ private final VectorizedRowBatch vectorizedRowBatch;
+ private final VectorAssignRow vectorAssignRow;
+ private int batchSize;
+
+ private final NullableMapVector rootVector;
+
+ Serializer(ArrowColumnarBatchSerDe serDe) throws SerDeException {
+ MAX_BUFFERED_ROWS = HiveConf.getIntVar(serDe.conf, HIVE_ARROW_BATCH_SIZE);
+ ArrowColumnarBatchSerDe.LOG.info("ArrowColumnarBatchSerDe max number of buffered rows: " + MAX_BUFFERED_ROWS);
+
+ // Schema
+ structTypeInfo = (StructTypeInfo) getTypeInfoFromObjectInspector(serDe.rowObjectInspector);
+ List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+ fieldSize = fieldTypeInfos.size();
+
+ // Initialize Arrow structures
+ rootVector = NullableMapVector.empty(null, serDe.rootAllocator);
+
+ // Initialize Hive structures
+ vectorizedRowBatch = new VectorizedRowBatch(fieldSize);
+ for (int fieldIndex = 0; fieldIndex < fieldSize; fieldIndex++) {
+ final ColumnVector columnVector = createColumnVector(fieldTypeInfos.get(fieldIndex));
+ vectorizedRowBatch.cols[fieldIndex] = columnVector;
+ columnVector.init();
+ }
+ vectorizedRowBatch.ensureSize(MAX_BUFFERED_ROWS);
+ vectorAssignRow = new VectorAssignRow();
+ try {
+ vectorAssignRow.init(serDe.rowObjectInspector);
+ } catch (HiveException e) {
+ throw new SerDeException(e);
+ }
+ }
+
+ private ArrowWrapperWritable serializeBatch() {
+ rootVector.setValueCount(0);
+
+ for (int fieldIndex = 0; fieldIndex < vectorizedRowBatch.projectionSize; fieldIndex++) {
+ final int projectedColumn = vectorizedRowBatch.projectedColumns[fieldIndex];
+ final ColumnVector hiveVector = vectorizedRowBatch.cols[projectedColumn];
+ final TypeInfo fieldTypeInfo = structTypeInfo.getAllStructFieldTypeInfos().get(fieldIndex);
+ final String fieldName = structTypeInfo.getAllStructFieldNames().get(fieldIndex);
+ final FieldType fieldType = toFieldType(fieldTypeInfo);
+ final FieldVector arrowVector = rootVector.addOrGet(fieldName, fieldType, FieldVector.class);
+ arrowVector.setInitialCapacity(batchSize);
+ arrowVector.allocateNew();
+ write(arrowVector, hiveVector, fieldTypeInfo, batchSize);
+ }
+ vectorizedRowBatch.reset();
+ rootVector.setValueCount(batchSize);
+
+ batchSize = 0;
+ VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(rootVector);
+ return new ArrowWrapperWritable(vectorSchemaRoot);
+ }
+
+ private FieldType toFieldType(TypeInfo typeInfo) {
+ return new FieldType(true, toArrowType(typeInfo), null);
+ }
+
+ private ArrowType toArrowType(TypeInfo typeInfo) {
+ switch (typeInfo.getCategory()) {
+ case PRIMITIVE:
+ switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+ case BOOLEAN:
+ return Types.MinorType.BIT.getType();
+ case BYTE:
+ return Types.MinorType.TINYINT.getType();
+ case SHORT:
+ return Types.MinorType.SMALLINT.getType();
+ case INT:
+ return Types.MinorType.INT.getType();
+ case LONG:
+ return Types.MinorType.BIGINT.getType();
+ case FLOAT:
+ return Types.MinorType.FLOAT4.getType();
+ case DOUBLE:
+ return Types.MinorType.FLOAT8.getType();
+ case STRING:
+ case VARCHAR:
+ case CHAR:
+ return Types.MinorType.VARCHAR.getType();
+ case DATE:
+ return Types.MinorType.DATEDAY.getType();
+ case TIMESTAMP:
+ return Types.MinorType.TIMESTAMPNANO.getType();
+ case BINARY:
+ return Types.MinorType.VARBINARY.getType();
+ case DECIMAL:
+ final DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
+ return new ArrowType.Decimal(decimalTypeInfo.precision(), decimalTypeInfo.scale());
+ case INTERVAL_YEAR_MONTH:
+ return Types.MinorType.INTERVALYEAR.getType();
+ case INTERVAL_DAY_TIME:
+ return Types.MinorType.INTERVALDAY.getType();
+ case VOID:
+ case TIMESTAMPLOCALTZ:
+ case UNKNOWN:
+ default:
+ throw new IllegalArgumentException();
+ }
+ case LIST:
+ return ArrowType.List.INSTANCE;
+ case STRUCT:
+ return ArrowType.Struct.INSTANCE;
+ case MAP:
+ return ArrowType.List.INSTANCE;
+ case UNION:
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ private void write(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size) {
+ switch (typeInfo.getCategory()) {
+ case PRIMITIVE:
+ writePrimitive(arrowVector, hiveVector, typeInfo, size);
+ break;
+ case LIST:
+ writeList((ListVector) arrowVector, (ListColumnVector) hiveVector, (ListTypeInfo) typeInfo, size);
+ break;
+ case STRUCT:
+ writeStruct((MapVector) arrowVector, (StructColumnVector) hiveVector, (StructTypeInfo) typeInfo, size);
+ break;
+ case UNION:
+ writeUnion(arrowVector, hiveVector, typeInfo, size);
+ break;
+ case MAP:
+ writeMap((ListVector) arrowVector, (MapColumnVector) hiveVector, (MapTypeInfo) typeInfo, size);
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ private void writeMap(ListVector arrowVector, MapColumnVector hiveVector, MapTypeInfo typeInfo,
+ int size) {
+ final ListTypeInfo structListTypeInfo = toStructListTypeInfo(typeInfo);
+ final ListColumnVector structListVector = toStructListVector(hiveVector);
+
+ write(arrowVector, structListVector, structListTypeInfo, size);
+
+ final ArrowBuf validityBuffer = arrowVector.getValidityBuffer();
+ for (int rowIndex = 0; rowIndex < size; rowIndex++) {
+ if (hiveVector.isNull[rowIndex]) {
+ BitVectorHelper.setValidityBit(validityBuffer, rowIndex, 0);
+ } else {
+ BitVectorHelper.setValidityBitToOne(validityBuffer, rowIndex);
+ }
+ }
+ }
+
+ private void writeUnion(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo,
+ int size) {
+ final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+ final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
+ final UnionColumnVector hiveUnionVector = (UnionColumnVector) hiveVector;
+ final ColumnVector[] hiveObjectVectors = hiveUnionVector.fields;
+
+ final int tag = hiveUnionVector.tags[0];
+ final ColumnVector hiveObjectVector = hiveObjectVectors[tag];
+ final TypeInfo objectTypeInfo = objectTypeInfos.get(tag);
+
+ write(arrowVector, hiveObjectVector, objectTypeInfo, size);
+ }
+
+ private void writeStruct(MapVector arrowVector, StructColumnVector hiveVector,
+ StructTypeInfo typeInfo, int size) {
+ final List<String> fieldNames = typeInfo.getAllStructFieldNames();
+ final List<TypeInfo> fieldTypeInfos = typeInfo.getAllStructFieldTypeInfos();
+ final ColumnVector[] hiveFieldVectors = hiveVector.fields;
+ final int fieldSize = fieldTypeInfos.size();
+
+ for (int fieldIndex = 0; fieldIndex < fieldSize; fieldIndex++) {
+ final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex);
+ final ColumnVector hiveFieldVector = hiveFieldVectors[fieldIndex];
+ final String fieldName = fieldNames.get(fieldIndex);
+ final FieldVector arrowFieldVector =
+ arrowVector.addOrGet(fieldName,
+ toFieldType(fieldTypeInfos.get(fieldIndex)), FieldVector.class);
+ arrowFieldVector.setInitialCapacity(size);
+ arrowFieldVector.allocateNew();
+ write(arrowFieldVector, hiveFieldVector, fieldTypeInfo, size);
+ }
+
+ final ArrowBuf validityBuffer = arrowVector.getValidityBuffer();
+ for (int rowIndex = 0; rowIndex < size; rowIndex++) {
+ if (hiveVector.isNull[rowIndex]) {
+ BitVectorHelper.setValidityBit(validityBuffer, rowIndex, 0);
+ } else {
+ BitVectorHelper.setValidityBitToOne(validityBuffer, rowIndex);
+ }
+ }
+ }
+
+ private void writeList(ListVector arrowVector, ListColumnVector hiveVector, ListTypeInfo typeInfo,
+ int size) {
+ final int OFFSET_WIDTH = 4;
+ final TypeInfo elementTypeInfo = typeInfo.getListElementTypeInfo();
+ final ColumnVector hiveElementVector = hiveVector.child;
+ final FieldVector arrowElementVector =
+ (FieldVector) arrowVector.addOrGetVector(toFieldType(elementTypeInfo)).getVector();
+ arrowElementVector.setInitialCapacity(hiveVector.childCount);
+ arrowElementVector.allocateNew();
+
+ write(arrowElementVector, hiveElementVector, elementTypeInfo, hiveVector.childCount);
+
+ final ArrowBuf offsetBuffer = arrowVector.getOffsetBuffer();
+ int nextOffset = 0;
+
+ for (int rowIndex = 0; rowIndex < size; rowIndex++) {
+ if (hiveVector.isNull[rowIndex]) {
+ offsetBuffer.setInt(rowIndex * OFFSET_WIDTH, nextOffset);
+ } else {
+ offsetBuffer.setInt(rowIndex * OFFSET_WIDTH, nextOffset);
+ nextOffset += (int) hiveVector.lengths[rowIndex];
+ arrowVector.setNotNull(rowIndex);
+ }
+ }
+ offsetBuffer.setInt(size * OFFSET_WIDTH, nextOffset);
+ }
+
+ private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo,
+ int size) {
+ final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory =
+ ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
+ switch (primitiveCategory) {
+ case BOOLEAN:
+ {
+ final BitVector bitVector = (BitVector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ bitVector.setNull(i);
+ } else {
+ bitVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]);
+ }
+ }
+ }
+ break;
+ case BYTE:
+ {
+ final TinyIntVector tinyIntVector = (TinyIntVector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ tinyIntVector.setNull(i);
+ } else {
+ tinyIntVector.set(i, (byte) ((LongColumnVector) hiveVector).vector[i]);
+ }
+ }
+ }
+ break;
+ case SHORT:
+ {
+ final SmallIntVector smallIntVector = (SmallIntVector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ smallIntVector.setNull(i);
+ } else {
+ smallIntVector.set(i, (short) ((LongColumnVector) hiveVector).vector[i]);
+ }
+ }
+ }
+ break;
+ case INT:
+ {
+ final IntVector intVector = (IntVector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ intVector.setNull(i);
+ } else {
+ intVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]);
+ }
+ }
+ }
+ break;
+ case LONG:
+ {
+ final BigIntVector bigIntVector = (BigIntVector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ bigIntVector.setNull(i);
+ } else {
+ bigIntVector.set(i, ((LongColumnVector) hiveVector).vector[i]);
+ }
+ }
+ }
+ break;
+ case FLOAT:
+ {
+ final Float4Vector float4Vector = (Float4Vector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ float4Vector.setNull(i);
+ } else {
+ float4Vector.set(i, (float) ((DoubleColumnVector) hiveVector).vector[i]);
+ }
+ }
+ }
+ break;
+ case DOUBLE:
+ {
+ final Float8Vector float8Vector = (Float8Vector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ float8Vector.setNull(i);
+ } else {
+ float8Vector.set(i, ((DoubleColumnVector) hiveVector).vector[i]);
+ }
+ }
+ }
+ break;
+ case STRING:
+ case VARCHAR:
+ case CHAR:
+ {
+ final VarCharVector varCharVector = (VarCharVector) arrowVector;
+ final BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ varCharVector.setNull(i);
+ } else {
+ varCharVector.setSafe(i, bytesVector.vector[i], bytesVector.start[i], bytesVector.length[i]);
+ }
+ }
+ }
+ break;
+ case DATE:
+ {
+ final DateDayVector dateDayVector = (DateDayVector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ dateDayVector.setNull(i);
+ } else {
+ dateDayVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]);
+ }
+ }
+ }
+ break;
+ case TIMESTAMP:
+ {
+ final TimeStampNanoVector timeStampNanoVector = (TimeStampNanoVector) arrowVector;
+ final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ timeStampNanoVector.setNull(i);
+ } else {
+ // Time = second + sub-second
+ final long secondInMillis = timestampColumnVector.getTime(i);
+ final long secondInNanos = (secondInMillis - secondInMillis % 1000) * NS_PER_MS; // second
+ final long subSecondInNanos = timestampColumnVector.getNanos(i); // sub-second
+
+ if ((secondInMillis > 0 && secondInNanos < 0) || (secondInMillis < 0 && secondInNanos > 0)) {
+ // If the timestamp cannot be represented in long nanosecond, set it as a null value
+ timeStampNanoVector.setNull(i);
+ } else {
+ timeStampNanoVector.set(i, secondInNanos + subSecondInNanos);
+ }
+ }
+ }
+ }
+ break;
+ case BINARY:
+ {
+ final VarBinaryVector varBinaryVector = (VarBinaryVector) arrowVector;
+ final BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ varBinaryVector.setNull(i);
+ } else {
+ varBinaryVector.setSafe(i, bytesVector.vector[i], bytesVector.start[i], bytesVector.length[i]);
+ }
+ }
+ }
+ break;
+ case DECIMAL:
+ {
+ final DecimalVector decimalVector = (DecimalVector) arrowVector;
+ final int scale = decimalVector.getScale();
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ decimalVector.setNull(i);
+ } else {
+ decimalVector.set(i,
+ ((DecimalColumnVector) hiveVector).vector[i].getHiveDecimal().bigDecimalValue().setScale(scale));
+ }
+ }
+ }
+ break;
+ case INTERVAL_YEAR_MONTH:
+ {
+ final IntervalYearVector intervalYearVector = (IntervalYearVector) arrowVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ intervalYearVector.setNull(i);
+ } else {
+ intervalYearVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]);
+ }
+ }
+ }
+ break;
+ case INTERVAL_DAY_TIME:
+ {
+ final IntervalDayVector intervalDayVector = (IntervalDayVector) arrowVector;
+ final IntervalDayTimeColumnVector intervalDayTimeColumnVector =
+ (IntervalDayTimeColumnVector) hiveVector;
+ for (int i = 0; i < size; i++) {
+ if (hiveVector.isNull[i]) {
+ intervalDayVector.setNull(i);
+ } else {
+ final long totalSeconds = intervalDayTimeColumnVector.getTotalSeconds(i);
+ final long days = totalSeconds / SECOND_PER_DAY;
+ final long millis =
+ (totalSeconds - days * SECOND_PER_DAY) * MS_PER_SECOND +
+ intervalDayTimeColumnVector.getNanos(i) / NS_PER_MS;
+ intervalDayVector.set(i, (int) days, (int) millis);
+ }
+ }
+ }
+ break;
+ case VOID:
+ case UNKNOWN:
+ case TIMESTAMPLOCALTZ:
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ ArrowWrapperWritable serialize(Object obj, ObjectInspector objInspector) {
+ // A null row means there are no more rows (closeOp()), so flush whatever is buffered.
+ // serializeBatch() is also reached below once the buffer holds MAX_BUFFERED_ROWS rows.
+ if (obj == null) {
+ return serializeBatch();
+ }
+ List<Object> standardObjects = new ArrayList<Object>();
+ ObjectInspectorUtils.copyToStandardObject(standardObjects, obj,
+ ((StructObjectInspector) objInspector), WRITABLE);
+
+ vectorAssignRow.assignRow(vectorizedRowBatch, batchSize, standardObjects, fieldSize);
+ batchSize++;
+ if (batchSize == MAX_BUFFERED_ROWS) {
+ return serializeBatch();
+ }
+ return null;
+ }
+}
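
The trickiest branch in writePrimitive() above is TIMESTAMP, which folds Hive's (milliseconds, nanos) pair into the single nanosecond long that Arrow's TIMESTAMPNANO type stores. The arithmetic is pulled out into a standalone sketch below; the constant is copied from ArrowColumnarBatchSerDe and the example epoch value is made up. Note that Java's % rounds toward zero for negative values, which is exactly the path the negative-timestamp rows added to TestArrowColumnarBatchSerDe further down exercise.

    import java.sql.Timestamp;

    public class TimestampNanoSketch {
      static final int NS_PER_MS = 1_000_000;            // same constant as ArrowColumnarBatchSerDe

      public static void main(String[] args) {
        Timestamp ts = new Timestamp(1_526_000_000_123L); // illustrative epoch millis
        ts.setNanos(123_456_789);

        final long secondInMillis = ts.getTime();         // TimestampColumnVector.getTime(i)
        final long secondInNanos = (secondInMillis - secondInMillis % 1000) * NS_PER_MS;
        final long subSecondInNanos = ts.getNanos();      // TimestampColumnVector.getNanos(i)

        // A sign flip after the multiplication means the value overflowed long nanoseconds,
        // in which case the serializer writes a null instead.
        if ((secondInMillis > 0 && secondInNanos < 0) || (secondInMillis < 0 && secondInNanos > 0)) {
          System.out.println("overflow: written as null");
        } else {
          System.out.println("arrow TIMESTAMPNANO value = " + (secondInNanos + subSecondInNanos));
        }
      }
    }
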
http://git-wip-us.apache.org/repos/asf/hive/blob/2726f302/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java b/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
index bcb7a88..74f6624 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -54,7 +53,6 @@ import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.junit.Before;
import org.junit.Test;
@@ -66,10 +64,11 @@ import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
public class TestArrowColumnarBatchSerDe {
private Configuration conf;
@@ -105,14 +104,39 @@ public class TestArrowColumnarBatchSerDe {
{null, null, null},
};
- private final static long NOW = System.currentTimeMillis();
+ private final static long TIME_IN_MS = TimeUnit.DAYS.toMillis(365 + 31 + 3);
+ private final static long NEGATIVE_TIME_IN_MS = TimeUnit.DAYS.toMillis(-9 * 365 + 31 + 3);
+ private final static Timestamp TIMESTAMP;
+ private final static Timestamp NEGATIVE_TIMESTAMP_WITHOUT_NANOS;
+ private final static Timestamp NEGATIVE_TIMESTAMP_WITH_NANOS;
+
+ static {
+ TIMESTAMP = new Timestamp(TIME_IN_MS);
+ TIMESTAMP.setNanos(123456789);
+ NEGATIVE_TIMESTAMP_WITHOUT_NANOS = new Timestamp(NEGATIVE_TIME_IN_MS);
+ NEGATIVE_TIMESTAMP_WITH_NANOS = new Timestamp(NEGATIVE_TIME_IN_MS);
+ NEGATIVE_TIMESTAMP_WITH_NANOS.setNanos(123456789);
+ }
+
private final static Object[][] DTI_ROWS = {
{
- new DateWritable(DateWritable.millisToDays(NOW)),
- new TimestampWritable(new Timestamp(NOW)),
+ new DateWritable(DateWritable.millisToDays(TIME_IN_MS)),
+ new TimestampWritable(TIMESTAMP),
new HiveIntervalYearMonthWritable(new HiveIntervalYearMonth(1, 2)),
new HiveIntervalDayTimeWritable(new HiveIntervalDayTime(1, 2, 3, 4, 5_000_000))
},
+ {
+ new DateWritable(DateWritable.millisToDays(NEGATIVE_TIME_IN_MS)),
+ new TimestampWritable(NEGATIVE_TIMESTAMP_WITHOUT_NANOS),
+ null,
+ null
+ },
+ {
+ null,
+ new TimestampWritable(NEGATIVE_TIMESTAMP_WITH_NANOS),
+ null,
+ null
+ },
{null, null, null, null},
};
@@ -184,7 +208,7 @@ public class TestArrowColumnarBatchSerDe {
}
private void initAndSerializeAndDeserialize(String[][] schema, Object[][] rows) throws SerDeException {
- AbstractSerDe serDe = new ArrowColumnarBatchSerDe();
+ ArrowColumnarBatchSerDe serDe = new ArrowColumnarBatchSerDe();
StructObjectInspector rowOI = initSerDe(serDe, schema);
serializeAndDeserialize(serDe, rows, rowOI);
}
@@ -214,9 +238,9 @@ public class TestArrowColumnarBatchSerDe {
TypeInfoFactory.getStructTypeInfo(fieldNameList, typeInfoList));
}
- private void serializeAndDeserialize(AbstractSerDe serDe, Object[][] rows,
- StructObjectInspector rowOI) throws SerDeException {
- Writable serialized = null;
+ private void serializeAndDeserialize(ArrowColumnarBatchSerDe serDe, Object[][] rows,
+ StructObjectInspector rowOI) {
+ ArrowWrapperWritable serialized = null;
for (Object[] row : rows) {
serialized = serDe.serialize(row, rowOI);
}
@@ -224,6 +248,7 @@ public class TestArrowColumnarBatchSerDe {
if (serialized == null) {
serialized = serDe.serialize(null, rowOI);
}
+ String s = serialized.getVectorSchemaRoot().contentToTSVString();
final Object[][] deserializedRows = (Object[][]) serDe.deserialize(serialized);
for (int rowIndex = 0; rowIndex < Math.min(deserializedRows.length, rows.length); rowIndex++) {
@@ -254,21 +279,28 @@ public class TestArrowColumnarBatchSerDe {
case STRUCT:
final Object[] rowStruct = (Object[]) row[fieldIndex];
final List deserializedRowStruct = (List) deserializedRow[fieldIndex];
- assertArrayEquals(rowStruct, deserializedRowStruct.toArray());
+ if (rowStruct == null) {
+ assertNull(deserializedRowStruct);
+ } else {
+ assertArrayEquals(rowStruct, deserializedRowStruct.toArray());
+ }
break;
case LIST:
case UNION:
assertEquals(row[fieldIndex], deserializedRow[fieldIndex]);
break;
case MAP:
- Map rowMap = (Map) row[fieldIndex];
- Map deserializedRowMap = (Map) deserializedRow[fieldIndex];
- Set rowMapKeySet = rowMap.keySet();
- Set deserializedRowMapKeySet = deserializedRowMap.keySet();
- assertTrue(rowMapKeySet.containsAll(deserializedRowMapKeySet));
- assertTrue(deserializedRowMapKeySet.containsAll(rowMapKeySet));
- for (Object key : rowMapKeySet) {
- assertEquals(rowMap.get(key), deserializedRowMap.get(key));
+ final Map rowMap = (Map) row[fieldIndex];
+ final Map deserializedRowMap = (Map) deserializedRow[fieldIndex];
+ if (rowMap == null) {
+ assertNull(deserializedRowMap);
+ } else {
+ final Set rowMapKeySet = rowMap.keySet();
+ final Set deserializedRowMapKeySet = deserializedRowMap.keySet();
+ assertEquals(rowMapKeySet, deserializedRowMapKeySet);
+ for (Object key : rowMapKeySet) {
+ assertEquals(rowMap.get(key), deserializedRowMap.get(key));
+ }
}
break;
}
@@ -341,14 +373,18 @@ public class TestArrowColumnarBatchSerDe {
newArrayList(text("hello")),
input -> text(input.toString().toUpperCase())),
intW(0))), // c16:array<struct<m:map<string,string>,n:int>>
- new TimestampWritable(new Timestamp(NOW)), // c17:timestamp
+ new TimestampWritable(TIMESTAMP), // c17:timestamp
decimalW(HiveDecimal.create(0, 0)), // c18:decimal(16,7)
new BytesWritable("Hello".getBytes()), // c19:binary
new DateWritable(123), // c20:date
varcharW("x", 20), // c21:varchar(20)
charW("y", 15), // c22:char(15)
new BytesWritable("world!".getBytes()), // c23:binary
- },
+ }, {
+ null, null, null, null, null, null, null, null, null, null, // c1-c10
+ null, null, null, null, null, null, null, null, null, null, // c11-c20
+ null, null, null, // c21-c23
+ }
};
initAndSerializeAndDeserialize(schema, comprehensiveRows);
@@ -378,7 +414,7 @@ public class TestArrowColumnarBatchSerDe {
final int batchSize = 1000;
final Object[][] integerRows = new Object[batchSize][];
- final AbstractSerDe serDe = new ArrowColumnarBatchSerDe();
+ final ArrowColumnarBatchSerDe serDe = new ArrowColumnarBatchSerDe();
StructObjectInspector rowOI = initSerDe(serDe, schema);
for (int j = 0; j < 10; j++) {
@@ -397,7 +433,7 @@ public class TestArrowColumnarBatchSerDe {
{"bigint1", "bigint"}
};
- final AbstractSerDe serDe = new ArrowColumnarBatchSerDe();
+ final ArrowColumnarBatchSerDe serDe = new ArrowColumnarBatchSerDe();
StructObjectInspector rowOI = initSerDe(serDe, schema);
final Random random = new Random();
@@ -572,106 +608,6 @@ public class TestArrowColumnarBatchSerDe {
initAndSerializeAndDeserialize(schema, toList(BINARY_ROWS));
}
- private StandardUnionObjectInspector.StandardUnion union(int tag, Object object) {
- return new StandardUnionObjectInspector.StandardUnion((byte) tag, object);
- }
-
- public void testUnionInteger() throws SerDeException {
- String[][] schema = {
- {"int_union", "uniontype<tinyint,smallint,int,bigint>"},
- };
-
- StandardUnionObjectInspector.StandardUnion[][] integerUnions = {
- {union(0, byteW(0))},
- {union(1, shortW(1))},
- {union(2, intW(2))},
- {union(3, longW(3))},
- };
-
- initAndSerializeAndDeserialize(schema, integerUnions);
- }
-
- public void testUnionFloat() throws SerDeException {
- String[][] schema = {
- {"float_union", "uniontype<float,double>"},
- };
-
- StandardUnionObjectInspector.StandardUnion[][] floatUnions = {
- {union(0, floatW(0f))},
- {union(1, doubleW(1d))},
- };
-
- initAndSerializeAndDeserialize(schema, floatUnions);
- }
-
- public void testUnionString() throws SerDeException {
- String[][] schema = {
- {"string_union", "uniontype<string,int>"},
- };
-
- StandardUnionObjectInspector.StandardUnion[][] stringUnions = {
- {union(0, text("Hello"))},
- {union(1, intW(1))},
- };
-
- initAndSerializeAndDeserialize(schema, stringUnions);
- }
-
- public void testUnionChar() throws SerDeException {
- String[][] schema = {
- {"char_union", "uniontype<char(10),int>"},
- };
-
- StandardUnionObjectInspector.StandardUnion[][] charUnions = {
- {union(0, charW("Hello", 10))},
- {union(1, intW(1))},
- };
-
- initAndSerializeAndDeserialize(schema, charUnions);
- }
-
- public void testUnionVarchar() throws SerDeException {
- String[][] schema = {
- {"varchar_union", "uniontype<varchar(10),int>"},
- };
-
- StandardUnionObjectInspector.StandardUnion[][] varcharUnions = {
- {union(0, varcharW("Hello", 10))},
- {union(1, intW(1))},
- };
-
- initAndSerializeAndDeserialize(schema, varcharUnions);
- }
-
- public void testUnionDTI() throws SerDeException {
- String[][] schema = {
- {"date_union", "uniontype<date,timestamp,interval_year_month,interval_day_time>"},
- };
- long NOW = System.currentTimeMillis();
-
- StandardUnionObjectInspector.StandardUnion[][] dtiUnions = {
- {union(0, new DateWritable(DateWritable.millisToDays(NOW)))},
- {union(1, new TimestampWritable(new Timestamp(NOW)))},
- {union(2, new HiveIntervalYearMonthWritable(new HiveIntervalYearMonth(1, 2)))},
- {union(3, new HiveIntervalDayTimeWritable(new HiveIntervalDayTime(1, 2, 3, 4, 5_000_000)))},
- };
-
- initAndSerializeAndDeserialize(schema, dtiUnions);
- }
-
- public void testUnionBooleanBinary() throws SerDeException {
- String[][] schema = {
- {"boolean_union", "uniontype<boolean,binary>"},
- };
-
- StandardUnionObjectInspector.StandardUnion[][] booleanBinaryUnions = {
- {union(0, new BooleanWritable(true))},
- {union(1, new BytesWritable("Hello".getBytes()))},
- };
-
- initAndSerializeAndDeserialize(schema, booleanBinaryUnions);
- }
-
private Object[][][] toStruct(Object[][] rows) {
Object[][][] struct = new Object[rows.length][][];
for (int rowIndex = 0; rowIndex < rows.length; rowIndex++) {
@@ -719,6 +655,15 @@ public class TestArrowColumnarBatchSerDe {
}
@Test
+ public void testStructDecimal() throws SerDeException {
+ String[][] schema = {
+ {"decimal_struct", "struct<decimal1:decimal(38,10)>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toStruct(DECIMAL_ROWS));
+ }
+
+ @Test
public void testStructBoolean() throws SerDeException {
String[][] schema = {
{"boolean_struct", "struct<boolean1:boolean>"},
@@ -812,4 +757,21 @@ public class TestArrowColumnarBatchSerDe {
initAndSerializeAndDeserialize(schema, toMap(BINARY_ROWS));
}
+
+ public void testMapDecimal() throws SerDeException {
+ String[][] schema = {
+ {"decimal_map", "map<string,decimal(38,10)>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toMap(DECIMAL_ROWS));
+ }
+
+ public void testListDecimal() throws SerDeException {
+ String[][] schema = {
+ {"decimal_list", "array<decimal(38,10)>"},
+ };
+
+ initAndSerializeAndDeserialize(schema, toList(DECIMAL_ROWS));
+ }
+
}
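
One small but handy addition in this diff is the contentToTSVString() call in serializeAndDeserialize(): dumping the VectorSchemaRoot as TSV is a quick way to inspect what the Serializer actually wrote. A sketch of the same idea as a reusable helper follows; the class and method names are made up, and it relies only on ArrowWrapperWritable.getVectorSchemaRoot() and Arrow's contentToTSVString(), both used in the diff above.

    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;

    final class ArrowBatchDebug {
      // Prints one tab-separated line per row currently held by the serialized batch.
      static void dump(ArrowWrapperWritable serialized) {
        VectorSchemaRoot root = serialized.getVectorSchemaRoot();
        System.out.println(root.contentToTSVString());
      }
    }
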
[3/6] hive git commit: HIVE-19495: Arrow SerDe itest failure (Teddy
Choi, reviewed by Matt McCline)
Posted by vg...@apache.org.
HIVE-19495: Arrow SerDe itest failure (Teddy Choi, reviewed by Matt McCline)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2726f302
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2726f302
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2726f302
Branch: refs/heads/branch-3
Commit: 2726f3028c8963b5a5ae2f8a3bd49f5ae03767a5
Parents: 0e090e5
Author: Teddy Choi <tc...@hortonworks.com>
Authored: Tue May 15 20:44:21 2018 -0500
Committer: Vineet Garg <vg...@apache.org>
Committed: Tue May 29 13:58:01 2018 -0700
----------------------------------------------------------------------
.../ql/io/arrow/ArrowColumnarBatchSerDe.java | 990 +------------------
.../hadoop/hive/ql/io/arrow/Deserializer.java | 423 ++++++++
.../hadoop/hive/ql/io/arrow/Serializer.java | 537 ++++++++++
.../io/arrow/TestArrowColumnarBatchSerDe.java | 208 ++--
4 files changed, 1087 insertions(+), 1071 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2726f302/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
index 330fa58..b093ebb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
@@ -18,78 +18,26 @@
package org.apache.hadoop.hive.ql.io.arrow;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import io.netty.buffer.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.impl.UnionListWriter;
-import org.apache.arrow.vector.complex.impl.UnionReader;
-import org.apache.arrow.vector.complex.impl.UnionWriter;
-import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.complex.writer.BaseWriter;
-import org.apache.arrow.vector.complex.writer.BigIntWriter;
-import org.apache.arrow.vector.complex.writer.BitWriter;
-import org.apache.arrow.vector.complex.writer.DateDayWriter;
-import org.apache.arrow.vector.complex.writer.DecimalWriter;
-import org.apache.arrow.vector.complex.writer.FieldWriter;
-import org.apache.arrow.vector.complex.writer.Float4Writer;
-import org.apache.arrow.vector.complex.writer.Float8Writer;
-import org.apache.arrow.vector.complex.writer.IntWriter;
-import org.apache.arrow.vector.complex.writer.IntervalDayWriter;
-import org.apache.arrow.vector.complex.writer.IntervalYearWriter;
-import org.apache.arrow.vector.complex.writer.SmallIntWriter;
-import org.apache.arrow.vector.complex.writer.TimeStampMilliWriter;
-import org.apache.arrow.vector.complex.writer.TinyIntWriter;
-import org.apache.arrow.vector.complex.writer.VarBinaryWriter;
-import org.apache.arrow.vector.complex.writer.VarCharWriter;
-import org.apache.arrow.vector.holders.NullableBigIntHolder;
-import org.apache.arrow.vector.holders.NullableBitHolder;
-import org.apache.arrow.vector.holders.NullableDateDayHolder;
-import org.apache.arrow.vector.holders.NullableFloat4Holder;
-import org.apache.arrow.vector.holders.NullableFloat8Holder;
-import org.apache.arrow.vector.holders.NullableIntHolder;
-import org.apache.arrow.vector.holders.NullableIntervalDayHolder;
-import org.apache.arrow.vector.holders.NullableIntervalYearHolder;
-import org.apache.arrow.vector.holders.NullableSmallIntHolder;
-import org.apache.arrow.vector.holders.NullableTimeStampMilliHolder;
-import org.apache.arrow.vector.holders.NullableTinyIntHolder;
-import org.apache.arrow.vector.holders.NullableVarBinaryHolder;
-import org.apache.arrow.vector.holders.NullableVarCharHolder;
import org.apache.arrow.vector.types.TimeUnit;
-import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
-import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorAssignRow;
-import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
@@ -107,20 +55,12 @@ import org.slf4j.LoggerFactory;
import java.io.DataInput;
import java.io.DataOutput;
-import java.lang.reflect.Method;
-import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
-import java.util.function.IntConsumer;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE;
-import static org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector;
-import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption.WRITABLE;
import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo;
-import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfoFromObjectInspector;
/**
* ArrowColumnarBatchSerDe converts Apache Hive rows to Apache Arrow columns. Its serialized
@@ -143,17 +83,16 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe {
public static final Logger LOG = LoggerFactory.getLogger(ArrowColumnarBatchSerDe.class.getName());
private static final String DEFAULT_ARROW_FIELD_NAME = "[DEFAULT]";
- private static final int MS_PER_SECOND = 1_000;
- private static final int MS_PER_MINUTE = MS_PER_SECOND * 60;
- private static final int MS_PER_HOUR = MS_PER_MINUTE * 60;
- private static final int MS_PER_DAY = MS_PER_HOUR * 24;
- private static final int NS_PER_MS = 1_000_000;
+ static final int MS_PER_SECOND = 1_000;
+ static final int NS_PER_SECOND = 1_000_000_000;
+ static final int NS_PER_MS = 1_000_000;
+ static final int SECOND_PER_DAY = 24 * 60 * 60;
- private BufferAllocator rootAllocator;
+ BufferAllocator rootAllocator;
+ StructTypeInfo rowTypeInfo;
+ StructObjectInspector rowObjectInspector;
+ Configuration conf;
- private StructTypeInfo rowTypeInfo;
- private StructObjectInspector rowObjectInspector;
- private Configuration conf;
private Serializer serializer;
private Deserializer deserializer;
@@ -191,859 +130,8 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe {
fields.add(toField(columnNames.get(i), columnTypes.get(i)));
}
- serializer = new Serializer(new Schema(fields));
- deserializer = new Deserializer();
- }
-
- private class Serializer {
- private final int MAX_BUFFERED_ROWS;
-
- // Schema
- private final StructTypeInfo structTypeInfo;
- private final List<TypeInfo> fieldTypeInfos;
- private final int fieldSize;
-
- // Hive columns
- private final VectorizedRowBatch vectorizedRowBatch;
- private final VectorAssignRow vectorAssignRow;
- private int batchSize;
-
- // Arrow columns
- private final VectorSchemaRoot vectorSchemaRoot;
- private final List<FieldVector> arrowVectors;
- private final List<FieldWriter> fieldWriters;
-
- private Serializer(Schema schema) throws SerDeException {
- MAX_BUFFERED_ROWS = HiveConf.getIntVar(conf, HIVE_ARROW_BATCH_SIZE);
- LOG.info("ArrowColumnarBatchSerDe max number of buffered columns: " + MAX_BUFFERED_ROWS);
-
- // Schema
- structTypeInfo = (StructTypeInfo) getTypeInfoFromObjectInspector(rowObjectInspector);
- fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
- fieldSize = fieldTypeInfos.size();
-
- // Init Arrow stuffs
- vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator);
- arrowVectors = vectorSchemaRoot.getFieldVectors();
- fieldWriters = Lists.newArrayList();
- for (FieldVector fieldVector : arrowVectors) {
- final FieldWriter fieldWriter =
- Types.getMinorTypeForArrowType(
- fieldVector.getField().getType()).getNewFieldWriter(fieldVector);
- fieldWriters.add(fieldWriter);
- }
-
- // Init Hive stuffs
- vectorizedRowBatch = new VectorizedRowBatch(fieldSize);
- for (int i = 0; i < fieldSize; i++) {
- final ColumnVector columnVector = createColumnVector(fieldTypeInfos.get(i));
- vectorizedRowBatch.cols[i] = columnVector;
- columnVector.init();
- }
- vectorizedRowBatch.ensureSize(MAX_BUFFERED_ROWS);
- vectorAssignRow = new VectorAssignRow();
- try {
- vectorAssignRow.init(rowObjectInspector);
- } catch (HiveException e) {
- throw new SerDeException(e);
- }
- }
-
- private ArrowWrapperWritable serializeBatch() {
- for (int i = 0; i < vectorizedRowBatch.projectionSize; i++) {
- final int projectedColumn = vectorizedRowBatch.projectedColumns[i];
- final ColumnVector hiveVector = vectorizedRowBatch.cols[projectedColumn];
- final TypeInfo fieldTypeInfo = structTypeInfo.getAllStructFieldTypeInfos().get(i);
- final FieldWriter fieldWriter = fieldWriters.get(i);
- final FieldVector arrowVector = arrowVectors.get(i);
- arrowVector.setValueCount(0);
- fieldWriter.setPosition(0);
- write(fieldWriter, arrowVector, hiveVector, fieldTypeInfo, 0, batchSize, true);
- }
- vectorizedRowBatch.reset();
- vectorSchemaRoot.setRowCount(batchSize);
-
- batchSize = 0;
- return new ArrowWrapperWritable(vectorSchemaRoot);
- }
-
- private BaseWriter getWriter(FieldWriter writer, TypeInfo typeInfo, String name) {
- switch (typeInfo.getCategory()) {
- case PRIMITIVE:
- switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
- case BOOLEAN:
- return writer.bit(name);
- case BYTE:
- return writer.tinyInt(name);
- case SHORT:
- return writer.smallInt(name);
- case INT:
- return writer.integer(name);
- case LONG:
- return writer.bigInt(name);
- case FLOAT:
- return writer.float4(name);
- case DOUBLE:
- return writer.float8(name);
- case STRING:
- case VARCHAR:
- case CHAR:
- return writer.varChar(name);
- case DATE:
- return writer.dateDay(name);
- case TIMESTAMP:
- return writer.timeStampMilli(name);
- case BINARY:
- return writer.varBinary(name);
- case DECIMAL:
- final DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
- final int scale = decimalTypeInfo.scale();
- final int precision = decimalTypeInfo.precision();
- return writer.decimal(name, scale, precision);
- case INTERVAL_YEAR_MONTH:
- return writer.intervalYear(name);
- case INTERVAL_DAY_TIME:
- return writer.intervalDay(name);
- case TIMESTAMPLOCALTZ: // VectorAssignRow doesn't support it
- case VOID:
- case UNKNOWN:
- default:
- throw new IllegalArgumentException();
- }
- case LIST:
- case UNION:
- return writer.list(name);
- case STRUCT:
- return writer.map(name);
- case MAP: // The caller will convert map to array<struct>
- return writer.list(name).map();
- default:
- throw new IllegalArgumentException();
- }
- }
-
- private BaseWriter getWriter(FieldWriter writer, TypeInfo typeInfo) {
- switch (typeInfo.getCategory()) {
- case PRIMITIVE:
- switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
- case BOOLEAN:
- return writer.bit();
- case BYTE:
- return writer.tinyInt();
- case SHORT:
- return writer.smallInt();
- case INT:
- return writer.integer();
- case LONG:
- return writer.bigInt();
- case FLOAT:
- return writer.float4();
- case DOUBLE:
- return writer.float8();
- case STRING:
- case VARCHAR:
- case CHAR:
- return writer.varChar();
- case DATE:
- return writer.dateDay();
- case TIMESTAMP:
- return writer.timeStampMilli();
- case BINARY:
- return writer.varBinary();
- case INTERVAL_YEAR_MONTH:
- return writer.intervalDay();
- case INTERVAL_DAY_TIME:
- return writer.intervalYear();
- case TIMESTAMPLOCALTZ: // VectorAssignRow doesn't support it
- case DECIMAL: // ListVector doesn't support it
- case VOID:
- case UNKNOWN:
- default:
- throw new IllegalArgumentException();
- }
- case LIST:
- case UNION:
- return writer.list();
- case STRUCT:
- return writer.map();
- case MAP: // The caller will convert map to array<struct>
- return writer.list().map();
- default:
- throw new IllegalArgumentException();
- }
- }
-
- private void write(BaseWriter baseWriter, FieldVector arrowVector, ColumnVector hiveVector,
- TypeInfo typeInfo, int offset, int length, boolean incrementIndex) {
-
- final IntConsumer writer;
- switch (typeInfo.getCategory()) {
- case PRIMITIVE:
- final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory =
- ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
- switch (primitiveCategory) {
- case BOOLEAN:
- writer = index -> ((BitWriter) baseWriter).writeBit(
- (int) ((LongColumnVector) hiveVector).vector[index]);
- break;
- case BYTE:
- writer = index ->
- ((TinyIntWriter) baseWriter).writeTinyInt(
- (byte) ((LongColumnVector) hiveVector).vector[index]);
- break;
- case SHORT:
- writer = index -> ((SmallIntWriter) baseWriter).writeSmallInt(
- (short) ((LongColumnVector) hiveVector).vector[index]);
- break;
- case INT:
- writer = index -> ((IntWriter) baseWriter).writeInt(
- (int) ((LongColumnVector) hiveVector).vector[index]);
- break;
- case LONG:
- writer = index -> ((BigIntWriter) baseWriter).writeBigInt(
- ((LongColumnVector) hiveVector).vector[index]);
- break;
- case FLOAT:
- writer = index -> ((Float4Writer) baseWriter).writeFloat4(
- (float) ((DoubleColumnVector) hiveVector).vector[index]);
- break;
- case DOUBLE:
- writer = index -> ((Float8Writer) baseWriter).writeFloat8(
- ((DoubleColumnVector) hiveVector).vector[index]);
- break;
- case STRING:
- case VARCHAR:
- case CHAR:
- writer = index -> {
- BytesColumnVector stringVector = (BytesColumnVector) hiveVector;
- byte[] bytes = stringVector.vector[index];
- int start = stringVector.start[index];
- int bytesLength = stringVector.length[index];
- try (ArrowBuf arrowBuf = rootAllocator.buffer(bytesLength)) {
- arrowBuf.setBytes(0, bytes, start, bytesLength);
- ((VarCharWriter) baseWriter).writeVarChar(0, bytesLength, arrowBuf);
- }
- };
- break;
- case DATE:
- writer = index -> ((DateDayWriter) baseWriter).writeDateDay(
- (int) ((LongColumnVector) hiveVector).vector[index]);
- break;
- case TIMESTAMP:
- writer = index -> ((TimeStampMilliWriter) baseWriter).writeTimeStampMilli(
- ((TimestampColumnVector) hiveVector).getTime(index));
- break;
- case BINARY:
- writer = index -> {
- BytesColumnVector binaryVector = (BytesColumnVector) hiveVector;
- final byte[] bytes = binaryVector.vector[index];
- final int start = binaryVector.start[index];
- final int byteLength = binaryVector.length[index];
- try (ArrowBuf arrowBuf = rootAllocator.buffer(byteLength)) {
- arrowBuf.setBytes(0, bytes, start, byteLength);
- ((VarBinaryWriter) baseWriter).writeVarBinary(0, byteLength, arrowBuf);
- }
- };
- break;
- case DECIMAL:
- writer = index -> {
- DecimalColumnVector hiveDecimalVector = (DecimalColumnVector) hiveVector;
- ((DecimalWriter) baseWriter).writeDecimal(
- hiveDecimalVector.vector[index].getHiveDecimal().bigDecimalValue()
- .setScale(hiveDecimalVector.scale));
- };
- break;
- case INTERVAL_YEAR_MONTH:
- writer = index -> ((IntervalYearWriter) baseWriter).writeIntervalYear(
- (int) ((LongColumnVector) hiveVector).vector[index]);
- break;
- case INTERVAL_DAY_TIME:
- writer = index -> {
- IntervalDayTimeColumnVector intervalDayTimeVector =
- (IntervalDayTimeColumnVector) hiveVector;
- final long millis = (intervalDayTimeVector.getTotalSeconds(index) * 1_000) +
- (intervalDayTimeVector.getNanos(index) / 1_000_000);
- final int days = (int) (millis / MS_PER_DAY);
- ((IntervalDayWriter) baseWriter).writeIntervalDay(
- days, (int) (millis % MS_PER_DAY));
- };
- break;
- case VOID:
- case UNKNOWN:
- case TIMESTAMPLOCALTZ:
- default:
- throw new IllegalArgumentException();
- }
- break;
- case LIST:
- final ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
- final TypeInfo elementTypeInfo = listTypeInfo.getListElementTypeInfo();
- final ListColumnVector hiveListVector = (ListColumnVector) hiveVector;
- final ColumnVector hiveElementVector = hiveListVector.child;
- final FieldVector arrowElementVector = arrowVector.getChildrenFromFields().get(0);
- final BaseWriter.ListWriter listWriter = (BaseWriter.ListWriter) baseWriter;
- final BaseWriter elementWriter = getWriter((FieldWriter) baseWriter, elementTypeInfo);
-
- writer = index -> {
- final int listOffset = (int) hiveListVector.offsets[index];
- final int listLength = (int) hiveListVector.lengths[index];
- listWriter.startList();
- write(elementWriter, arrowElementVector, hiveElementVector, elementTypeInfo,
- listOffset, listLength, false);
- listWriter.endList();
- };
-
- incrementIndex = false;
- break;
- case STRUCT:
- final StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
- final List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
- final StructColumnVector hiveStructVector = (StructColumnVector) hiveVector;
- final List<FieldVector> arrowFieldVectors = arrowVector.getChildrenFromFields();
- final ColumnVector[] hiveFieldVectors = hiveStructVector.fields;
- final BaseWriter.MapWriter structWriter = (BaseWriter.MapWriter) baseWriter;
- final int fieldSize = fieldTypeInfos.size();
-
- writer = index -> {
- structWriter.start();
- for (int fieldIndex = 0; fieldIndex < fieldSize; fieldIndex++) {
- final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex);
- final String fieldName = structTypeInfo.getAllStructFieldNames().get(fieldIndex);
- final ColumnVector hiveFieldVector = hiveFieldVectors[fieldIndex];
- final BaseWriter fieldWriter = getWriter((FieldWriter) structWriter, fieldTypeInfo,
- fieldName);
- final FieldVector arrowFieldVector = arrowFieldVectors.get(fieldIndex);
- write(fieldWriter, arrowFieldVector, hiveFieldVector, fieldTypeInfo, index, 1, false);
- }
- structWriter.end();
- };
-
- incrementIndex = false;
- break;
- case UNION:
- final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
- final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
- final UnionColumnVector hiveUnionVector = (UnionColumnVector) hiveVector;
- final ColumnVector[] hiveObjectVectors = hiveUnionVector.fields;
- final UnionWriter unionWriter = (UnionWriter) baseWriter;
-
- writer = index -> {
- final int tag = hiveUnionVector.tags[index];
- final ColumnVector hiveObjectVector = hiveObjectVectors[tag];
- final TypeInfo objectTypeInfo = objectTypeInfos.get(tag);
- write(unionWriter, arrowVector, hiveObjectVector, objectTypeInfo, index, 1, false);
- };
- break;
- case MAP:
- final ListTypeInfo structListTypeInfo =
- toStructListTypeInfo((MapTypeInfo) typeInfo);
- final ListColumnVector structListVector =
- toStructListVector((MapColumnVector) hiveVector);
-
- writer = index -> write(baseWriter, arrowVector, structListVector, structListTypeInfo,
- index, length, false);
-
- incrementIndex = false;
- break;
- default:
- throw new IllegalArgumentException();
- }
-
- if (hiveVector.noNulls) {
- if (hiveVector.isRepeating) {
- for (int i = 0; i < length; i++) {
- writer.accept(0);
- if (incrementIndex) {
- baseWriter.setPosition(baseWriter.getPosition() + 1);
- }
- }
- } else {
- if (vectorizedRowBatch.selectedInUse) {
- for (int j = 0; j < length; j++) {
- final int i = vectorizedRowBatch.selected[j];
- writer.accept(offset + i);
- if (incrementIndex) {
- baseWriter.setPosition(baseWriter.getPosition() + 1);
- }
- }
- } else {
- for (int i = 0; i < length; i++) {
- writer.accept(offset + i);
- if (incrementIndex) {
- baseWriter.setPosition(baseWriter.getPosition() + 1);
- }
- }
- }
- }
- } else {
- if (hiveVector.isRepeating) {
- for (int i = 0; i < length; i++) {
- if (hiveVector.isNull[0]) {
- writeNull(baseWriter);
- } else {
- writer.accept(0);
- }
- if (incrementIndex) {
- baseWriter.setPosition(baseWriter.getPosition() + 1);
- }
- }
- } else {
- if (vectorizedRowBatch.selectedInUse) {
- for (int j = 0; j < length; j++) {
- final int i = vectorizedRowBatch.selected[j];
- if (hiveVector.isNull[offset + i]) {
- writeNull(baseWriter);
- } else {
- writer.accept(offset + i);
- }
- if (incrementIndex) {
- baseWriter.setPosition(baseWriter.getPosition() + 1);
- }
- }
- } else {
- for (int i = 0; i < length; i++) {
- if (hiveVector.isNull[offset + i]) {
- writeNull(baseWriter);
- } else {
- writer.accept(offset + i);
- }
- if (incrementIndex) {
- baseWriter.setPosition(baseWriter.getPosition() + 1);
- }
- }
- }
- }
- }
- }
-
- public ArrowWrapperWritable serialize(Object obj, ObjectInspector objInspector) {
- // if row is null, it means there are no more rows (closeOp()).
- // another case can be that the buffer is full.
- if (obj == null) {
- return serializeBatch();
- }
- List<Object> standardObjects = new ArrayList<Object>();
- ObjectInspectorUtils.copyToStandardObject(standardObjects, obj,
- ((StructObjectInspector) objInspector), WRITABLE);
-
- vectorAssignRow.assignRow(vectorizedRowBatch, batchSize, standardObjects, fieldSize);
- batchSize++;
- if (batchSize == MAX_BUFFERED_ROWS) {
- return serializeBatch();
- }
- return null;
- }
- }
-
- private static void writeNull(BaseWriter baseWriter) {
- if (baseWriter instanceof UnionListWriter) {
- // UnionListWriter should implement AbstractFieldWriter#writeNull
- BaseWriter.ListWriter listWriter = ((UnionListWriter) baseWriter).list();
- listWriter.setPosition(listWriter.getPosition() + 1);
- } else {
- // FieldWriter should have a super method of AbstractFieldWriter#writeNull
- try {
- Method method = baseWriter.getClass().getMethod("writeNull");
- method.setAccessible(true);
- method.invoke(baseWriter);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- private static abstract class PrimitiveReader {
- final void read(FieldReader reader, ColumnVector columnVector, int offset, int length) {
- for (int i = 0; i < length; i++) {
- final int rowIndex = offset + i;
- if (reader.isSet()) {
- doRead(reader, columnVector, rowIndex);
- } else {
- VectorizedBatchUtil.setNullColIsNullValue(columnVector, rowIndex);
- }
- reader.setPosition(reader.getPosition() + 1);
- }
- }
-
- abstract void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex);
- }
-
- private class Deserializer {
- private final VectorExtractRow vectorExtractRow;
- private final VectorizedRowBatch vectorizedRowBatch;
- private Object[][] rows;
-
- public Deserializer() throws SerDeException {
- vectorExtractRow = new VectorExtractRow();
- final List<TypeInfo> fieldTypeInfoList = rowTypeInfo.getAllStructFieldTypeInfos();
- final int fieldCount = fieldTypeInfoList.size();
- final TypeInfo[] typeInfos = fieldTypeInfoList.toArray(new TypeInfo[fieldCount]);
- try {
- vectorExtractRow.init(typeInfos);
- } catch (HiveException e) {
- throw new SerDeException(e);
- }
-
- vectorizedRowBatch = new VectorizedRowBatch(fieldCount);
- for (int i = 0; i < fieldCount; i++) {
- final ColumnVector columnVector = createColumnVector(typeInfos[i]);
- columnVector.init();
- vectorizedRowBatch.cols[i] = columnVector;
- }
- }
-
- public Object deserialize(Writable writable) {
- final ArrowWrapperWritable arrowWrapperWritable = (ArrowWrapperWritable) writable;
- final VectorSchemaRoot vectorSchemaRoot = arrowWrapperWritable.getVectorSchemaRoot();
- final List<FieldVector> fieldVectors = vectorSchemaRoot.getFieldVectors();
- final int fieldCount = fieldVectors.size();
- final int rowCount = vectorSchemaRoot.getRowCount();
- vectorizedRowBatch.ensureSize(rowCount);
-
- if (rows == null || rows.length < rowCount ) {
- rows = new Object[rowCount][];
- for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
- rows[rowIndex] = new Object[fieldCount];
- }
- }
-
- for (int i = 0; i < fieldCount; i++) {
- final FieldVector fieldVector = fieldVectors.get(i);
- final FieldReader fieldReader = fieldVector.getReader();
- fieldReader.setPosition(0);
- final int projectedCol = vectorizedRowBatch.projectedColumns[i];
- final ColumnVector columnVector = vectorizedRowBatch.cols[projectedCol];
- final TypeInfo typeInfo = rowTypeInfo.getAllStructFieldTypeInfos().get(i);
- read(fieldReader, columnVector, typeInfo, 0, rowCount);
- }
- for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
- vectorExtractRow.extractRow(vectorizedRowBatch, rowIndex, rows[rowIndex]);
- }
- vectorizedRowBatch.reset();
- return rows;
- }
-
- private void read(FieldReader reader, ColumnVector columnVector, TypeInfo typeInfo,
- int rowOffset, int rowLength) {
- switch (typeInfo.getCategory()) {
- case PRIMITIVE:
- final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory =
- ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
- final PrimitiveReader primitiveReader;
- switch (primitiveCategory) {
- case BOOLEAN:
- primitiveReader = new PrimitiveReader() {
- NullableBitHolder holder = new NullableBitHolder();
-
- @Override
- void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
- reader.read(holder);
- ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
- }
- };
- break;
- case BYTE:
- primitiveReader = new PrimitiveReader() {
- NullableTinyIntHolder holder = new NullableTinyIntHolder();
-
- @Override
- void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
- reader.read(holder);
- ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
- }
- };
- break;
- case SHORT:
- primitiveReader = new PrimitiveReader() {
- NullableSmallIntHolder holder = new NullableSmallIntHolder();
-
- @Override
- void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
- reader.read(holder);
- ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
- }
- };
- break;
- case INT:
- primitiveReader = new PrimitiveReader() {
- NullableIntHolder holder = new NullableIntHolder();
-
- @Override
- void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
- reader.read(holder);
- ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
- }
- };
- break;
- case LONG:
- primitiveReader = new PrimitiveReader() {
- NullableBigIntHolder holder = new NullableBigIntHolder();
-
- @Override
- void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
- reader.read(holder);
- ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
- }
- };
- break;
- case FLOAT:
- primitiveReader = new PrimitiveReader() {
- NullableFloat4Holder holder = new NullableFloat4Holder();
-
- @Override
- void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
- reader.read(holder);
- ((DoubleColumnVector) columnVector).vector[rowIndex] = holder.value;
- }
- };
- break;
- case DOUBLE:
- primitiveReader = new PrimitiveReader() {
- NullableFloat8Holder holder = new NullableFloat8Holder();
-
- @Override
- void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
- reader.read(holder);
- ((DoubleColumnVector) columnVector).vector[rowIndex] = holder.value;
- }
- };
- break;
- case STRING:
- case VARCHAR:
- case CHAR:
- primitiveReader = new PrimitiveReader() {
- NullableVarCharHolder holder = new NullableVarCharHolder();
-
- @Override
- void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
- reader.read(holder);
- int varCharSize = holder.end - holder.start;
- byte[] varCharBytes = new byte[varCharSize];
- holder.buffer.getBytes(holder.start, varCharBytes);
- ((BytesColumnVector) columnVector).setVal(rowIndex, varCharBytes, 0, varCharSize);
- }
- };
- break;
- case DATE:
- primitiveReader = new PrimitiveReader() {
- NullableDateDayHolder holder = new NullableDateDayHolder();
-
- @Override
- void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
- reader.read(holder);
- ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
- }
- };
- break;
- case TIMESTAMP:
- primitiveReader = new PrimitiveReader() {
- NullableTimeStampMilliHolder timeStampMilliHolder =
- new NullableTimeStampMilliHolder();
-
- @Override
- void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
- reader.read(timeStampMilliHolder);
- ((TimestampColumnVector) columnVector).set(rowIndex,
- new Timestamp(timeStampMilliHolder.value));
- }
- };
- break;
- case BINARY:
- primitiveReader = new PrimitiveReader() {
- NullableVarBinaryHolder holder = new NullableVarBinaryHolder();
-
- @Override
- void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
- reader.read(holder);
- final int binarySize = holder.end - holder.start;
- final byte[] binaryBytes = new byte[binarySize];
- holder.buffer.getBytes(holder.start, binaryBytes);
- ((BytesColumnVector) columnVector).setVal(rowIndex, binaryBytes, 0, binarySize);
- }
- };
- break;
- case DECIMAL:
- primitiveReader = new PrimitiveReader() {
- @Override
- void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
- ((DecimalColumnVector) columnVector).set(rowIndex,
- HiveDecimal.create(reader.readBigDecimal()));
- }
- };
- break;
- case INTERVAL_YEAR_MONTH:
- primitiveReader = new PrimitiveReader() {
- NullableIntervalYearHolder holder = new NullableIntervalYearHolder();
-
- @Override
- void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
- reader.read(holder);
- ((LongColumnVector) columnVector).vector[rowIndex] = holder.value;
- }
- };
- break;
- case INTERVAL_DAY_TIME:
- primitiveReader = new PrimitiveReader() {
- NullableIntervalDayHolder holder = new NullableIntervalDayHolder();
-
- @Override
- void doRead(FieldReader reader, ColumnVector columnVector, int rowIndex) {
- IntervalDayTimeColumnVector intervalDayTimeVector =
- (IntervalDayTimeColumnVector) columnVector;
- reader.read(holder);
- HiveIntervalDayTime intervalDayTime = new HiveIntervalDayTime(
- holder.days, // days
- holder.milliseconds / MS_PER_HOUR, // hour
- (holder.milliseconds % MS_PER_HOUR) / MS_PER_MINUTE, // minute
- (holder.milliseconds % MS_PER_MINUTE) / MS_PER_SECOND, // second
- (holder.milliseconds % MS_PER_SECOND) * NS_PER_MS); // nanosecond
- intervalDayTimeVector.set(rowIndex, intervalDayTime);
- }
- };
- break;
- default:
- throw new IllegalArgumentException();
- }
- primitiveReader.read(reader, columnVector, rowOffset, rowLength);
- break;
- case LIST:
- final ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
- final TypeInfo elementTypeInfo = listTypeInfo.getListElementTypeInfo();
- final ListColumnVector listVector = (ListColumnVector) columnVector;
- final ColumnVector elementVector = listVector.child;
- final FieldReader elementReader = reader.reader();
-
- int listOffset = 0;
- for (int rowIndex = 0; rowIndex < rowLength; rowIndex++) {
- final int adjustedRowIndex = rowOffset + rowIndex;
- reader.setPosition(adjustedRowIndex);
- final int listLength = reader.size();
- listVector.offsets[adjustedRowIndex] = listOffset;
- listVector.lengths[adjustedRowIndex] = listLength;
- read(elementReader, elementVector, elementTypeInfo, listOffset, listLength);
- listOffset += listLength;
- }
- break;
- case STRUCT:
- final StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
- final List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
- final List<String> fieldNames = structTypeInfo.getAllStructFieldNames();
- final int fieldSize = fieldNames.size();
- final StructColumnVector structVector = (StructColumnVector) columnVector;
- final ColumnVector[] fieldVectors = structVector.fields;
-
- for (int fieldIndex = 0; fieldIndex < fieldSize; fieldIndex++) {
- final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex);
- final FieldReader fieldReader = reader.reader(fieldNames.get(fieldIndex));
- final ColumnVector fieldVector = fieldVectors[fieldIndex];
- read(fieldReader, fieldVector, fieldTypeInfo, rowOffset, rowLength);
- }
- break;
- case UNION:
- final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
- final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
- final UnionColumnVector unionVector = (UnionColumnVector) columnVector;
- final ColumnVector[] objectVectors = unionVector.fields;
- final Map<Types.MinorType, Integer> minorTypeToTagMap = Maps.newHashMap();
- for (int tag = 0; tag < objectTypeInfos.size(); tag++) {
- minorTypeToTagMap.put(toMinorType(objectTypeInfos.get(tag)), tag);
- }
-
- final UnionReader unionReader = (UnionReader) reader;
- for (int rowIndex = 0; rowIndex < rowLength; rowIndex++) {
- final int adjustedRowIndex = rowIndex + rowOffset;
- unionReader.setPosition(adjustedRowIndex);
- final Types.MinorType minorType = unionReader.getMinorType();
- final int tag = minorTypeToTagMap.get(minorType);
- unionVector.tags[adjustedRowIndex] = tag;
- read(unionReader, objectVectors[tag], objectTypeInfos.get(tag), adjustedRowIndex, 1);
- }
- break;
- case MAP:
- final MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
- final ListTypeInfo mapStructListTypeInfo = toStructListTypeInfo(mapTypeInfo);
- final MapColumnVector hiveMapVector = (MapColumnVector) columnVector;
- final ListColumnVector mapStructListVector = toStructListVector(hiveMapVector);
- final StructColumnVector mapStructVector = (StructColumnVector) mapStructListVector.child;
- read(reader, mapStructListVector, mapStructListTypeInfo, rowOffset, rowLength);
-
- hiveMapVector.isRepeating = mapStructListVector.isRepeating;
- hiveMapVector.childCount = mapStructListVector.childCount;
- hiveMapVector.noNulls = mapStructListVector.noNulls;
- System.arraycopy(mapStructListVector.offsets, 0, hiveMapVector.offsets, 0, rowLength);
- System.arraycopy(mapStructListVector.lengths, 0, hiveMapVector.lengths, 0, rowLength);
- hiveMapVector.keys = mapStructVector.fields[0];
- hiveMapVector.values = mapStructVector.fields[1];
- break;
- default:
- throw new IllegalArgumentException();
- }
- }
- }
-
- private static Types.MinorType toMinorType(TypeInfo typeInfo) {
- switch (typeInfo.getCategory()) {
- case PRIMITIVE:
- switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
- case BOOLEAN:
- return Types.MinorType.BIT;
- case BYTE:
- return Types.MinorType.TINYINT;
- case SHORT:
- return Types.MinorType.SMALLINT;
- case INT:
- return Types.MinorType.INT;
- case LONG:
- return Types.MinorType.BIGINT;
- case FLOAT:
- return Types.MinorType.FLOAT4;
- case DOUBLE:
- return Types.MinorType.FLOAT8;
- case STRING:
- case VARCHAR:
- case CHAR:
- return Types.MinorType.VARCHAR;
- case DATE:
- return Types.MinorType.DATEDAY;
- case TIMESTAMP:
- return Types.MinorType.TIMESTAMPMILLI;
- case BINARY:
- return Types.MinorType.VARBINARY;
- case DECIMAL:
- return Types.MinorType.DECIMAL;
- case INTERVAL_YEAR_MONTH:
- return Types.MinorType.INTERVALYEAR;
- case INTERVAL_DAY_TIME:
- return Types.MinorType.INTERVALDAY;
- case VOID:
- case TIMESTAMPLOCALTZ:
- case UNKNOWN:
- default:
- throw new IllegalArgumentException();
- }
- case LIST:
- return Types.MinorType.LIST;
- case STRUCT:
- return Types.MinorType.MAP;
- case UNION:
- return Types.MinorType.UNION;
- case MAP:
- // Apache Arrow doesn't have a map vector, so it's converted to a list vector of a struct
- // vector.
- return Types.MinorType.LIST;
- default:
- throw new IllegalArgumentException();
- }
- }
-
- private static ListTypeInfo toStructListTypeInfo(MapTypeInfo mapTypeInfo) {
- final StructTypeInfo structTypeInfo = new StructTypeInfo();
- structTypeInfo.setAllStructFieldNames(Lists.newArrayList("keys", "values"));
- structTypeInfo.setAllStructFieldTypeInfos(Lists.newArrayList(
- mapTypeInfo.getMapKeyTypeInfo(), mapTypeInfo.getMapValueTypeInfo()));
- final ListTypeInfo structListTypeInfo = new ListTypeInfo();
- structListTypeInfo.setListElementTypeInfo(structTypeInfo);
- return structListTypeInfo;
+ serializer = new Serializer(this);
+ deserializer = new Deserializer(this);
}
private static Field toField(String name, TypeInfo typeInfo) {
@@ -1052,52 +140,50 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe {
final PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
switch (primitiveTypeInfo.getPrimitiveCategory()) {
case BOOLEAN:
- return Field.nullable(name, Types.MinorType.BIT.getType());
+ return Field.nullable(name, MinorType.BIT.getType());
case BYTE:
- return Field.nullable(name, Types.MinorType.TINYINT.getType());
+ return Field.nullable(name, MinorType.TINYINT.getType());
case SHORT:
- return Field.nullable(name, Types.MinorType.SMALLINT.getType());
+ return Field.nullable(name, MinorType.SMALLINT.getType());
case INT:
- return Field.nullable(name, Types.MinorType.INT.getType());
+ return Field.nullable(name, MinorType.INT.getType());
case LONG:
- return Field.nullable(name, Types.MinorType.BIGINT.getType());
+ return Field.nullable(name, MinorType.BIGINT.getType());
case FLOAT:
- return Field.nullable(name, Types.MinorType.FLOAT4.getType());
+ return Field.nullable(name, MinorType.FLOAT4.getType());
case DOUBLE:
- return Field.nullable(name, Types.MinorType.FLOAT8.getType());
+ return Field.nullable(name, MinorType.FLOAT8.getType());
case STRING:
- return Field.nullable(name, Types.MinorType.VARCHAR.getType());
+ case VARCHAR:
+ case CHAR:
+ return Field.nullable(name, MinorType.VARCHAR.getType());
case DATE:
- return Field.nullable(name, Types.MinorType.DATEDAY.getType());
+ return Field.nullable(name, MinorType.DATEDAY.getType());
case TIMESTAMP:
- return Field.nullable(name, Types.MinorType.TIMESTAMPMILLI.getType());
+ return Field.nullable(name, MinorType.TIMESTAMPMILLI.getType());
case TIMESTAMPLOCALTZ:
final TimestampLocalTZTypeInfo timestampLocalTZTypeInfo =
(TimestampLocalTZTypeInfo) typeInfo;
final String timeZone = timestampLocalTZTypeInfo.getTimeZone().toString();
return Field.nullable(name, new ArrowType.Timestamp(TimeUnit.MILLISECOND, timeZone));
case BINARY:
- return Field.nullable(name, Types.MinorType.VARBINARY.getType());
+ return Field.nullable(name, MinorType.VARBINARY.getType());
case DECIMAL:
final DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
final int precision = decimalTypeInfo.precision();
final int scale = decimalTypeInfo.scale();
return Field.nullable(name, new ArrowType.Decimal(precision, scale));
- case VARCHAR:
- return Field.nullable(name, Types.MinorType.VARCHAR.getType());
- case CHAR:
- return Field.nullable(name, Types.MinorType.VARCHAR.getType());
case INTERVAL_YEAR_MONTH:
- return Field.nullable(name, Types.MinorType.INTERVALYEAR.getType());
+ return Field.nullable(name, MinorType.INTERVALYEAR.getType());
case INTERVAL_DAY_TIME:
- return Field.nullable(name, Types.MinorType.INTERVALDAY.getType());
+ return Field.nullable(name, MinorType.INTERVALDAY.getType());
default:
throw new IllegalArgumentException();
}
case LIST:
final ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
final TypeInfo elementTypeInfo = listTypeInfo.getListElementTypeInfo();
- return new Field(name, FieldType.nullable(Types.MinorType.LIST.getType()),
+ return new Field(name, FieldType.nullable(MinorType.LIST.getType()),
Lists.newArrayList(toField(DEFAULT_ARROW_FIELD_NAME, elementTypeInfo)));
case STRUCT:
final StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
@@ -1108,7 +194,7 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe {
for (int i = 0; i < structSize; i++) {
structFields.add(toField(fieldNames.get(i), fieldTypeInfos.get(i)));
}
- return new Field(name, FieldType.nullable(Types.MinorType.MAP.getType()), structFields);
+ return new Field(name, FieldType.nullable(MinorType.MAP.getType()), structFields);
case UNION:
final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
@@ -1117,17 +203,15 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe {
for (int i = 0; i < unionSize; i++) {
unionFields.add(toField(DEFAULT_ARROW_FIELD_NAME, objectTypeInfos.get(i)));
}
- return new Field(name, FieldType.nullable(Types.MinorType.UNION.getType()), unionFields);
+ return new Field(name, FieldType.nullable(MinorType.UNION.getType()), unionFields);
case MAP:
final MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
final TypeInfo keyTypeInfo = mapTypeInfo.getMapKeyTypeInfo();
final TypeInfo valueTypeInfo = mapTypeInfo.getMapValueTypeInfo();
-
final StructTypeInfo mapStructTypeInfo = new StructTypeInfo();
mapStructTypeInfo.setAllStructFieldNames(Lists.newArrayList("keys", "values"));
mapStructTypeInfo.setAllStructFieldTypeInfos(
Lists.newArrayList(keyTypeInfo, valueTypeInfo));
-
final ListTypeInfo mapListStructTypeInfo = new ListTypeInfo();
mapListStructTypeInfo.setListElementTypeInfo(mapStructTypeInfo);
@@ -1137,18 +221,28 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe {
}
}
- private static ListColumnVector toStructListVector(MapColumnVector mapVector) {
+ static ListTypeInfo toStructListTypeInfo(MapTypeInfo mapTypeInfo) {
+ final StructTypeInfo structTypeInfo = new StructTypeInfo();
+ structTypeInfo.setAllStructFieldNames(Lists.newArrayList("keys", "values"));
+ structTypeInfo.setAllStructFieldTypeInfos(Lists.newArrayList(
+ mapTypeInfo.getMapKeyTypeInfo(), mapTypeInfo.getMapValueTypeInfo()));
+ final ListTypeInfo structListTypeInfo = new ListTypeInfo();
+ structListTypeInfo.setListElementTypeInfo(structTypeInfo);
+ return structListTypeInfo;
+ }
+
+ static ListColumnVector toStructListVector(MapColumnVector mapVector) {
final StructColumnVector structVector;
final ListColumnVector structListVector;
structVector = new StructColumnVector();
structVector.fields = new ColumnVector[] {mapVector.keys, mapVector.values};
structListVector = new ListColumnVector();
structListVector.child = structVector;
- System.arraycopy(mapVector.offsets, 0, structListVector.offsets, 0, mapVector.childCount);
- System.arraycopy(mapVector.lengths, 0, structListVector.lengths, 0, mapVector.childCount);
structListVector.childCount = mapVector.childCount;
structListVector.isRepeating = mapVector.isRepeating;
structListVector.noNulls = mapVector.noNulls;
+ System.arraycopy(mapVector.offsets, 0, structListVector.offsets, 0, mapVector.childCount);
+ System.arraycopy(mapVector.lengths, 0, structListVector.lengths, 0, mapVector.childCount);
return structListVector;
}
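
The helpers above encode the convention called out in the code comments: this Arrow version has no map vector, so a Hive MAP column is carried as a list of key/value structs (array<struct<keys:K, values:V>>). The following is a minimal, self-contained sketch of the same TypeInfo conversion, assuming only Hive's public serde2 typeinfo API and Guava; the class name, method name and the map<string,int> sample are illustrative and not part of the commit.

import com.google.common.collect.Lists;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class MapAsStructListSketch {
  // Mirrors toStructListTypeInfo(MapTypeInfo): map<K,V> is modeled as
  // array<struct<keys:K, values:V>> before it is handed to Arrow.
  static ListTypeInfo toStructList(MapTypeInfo mapTypeInfo) {
    final StructTypeInfo entryTypeInfo = new StructTypeInfo();
    entryTypeInfo.setAllStructFieldNames(Lists.newArrayList("keys", "values"));
    entryTypeInfo.setAllStructFieldTypeInfos(Lists.newArrayList(
        mapTypeInfo.getMapKeyTypeInfo(), mapTypeInfo.getMapValueTypeInfo()));
    final ListTypeInfo structListTypeInfo = new ListTypeInfo();
    structListTypeInfo.setListElementTypeInfo(entryTypeInfo);
    return structListTypeInfo;
  }

  public static void main(String[] args) {
    final MapTypeInfo mapTypeInfo = (MapTypeInfo) TypeInfoFactory.getMapTypeInfo(
        TypeInfoFactory.stringTypeInfo, TypeInfoFactory.intTypeInfo);
    // Should print array<struct<keys:string,values:int>>
    System.out.println(toStructList(mapTypeInfo).getTypeName());
  }
}

The element shape printed here is the one the Serializer writes and the Deserializer's readMap() (in the new file below) unwraps back into a MapColumnVector.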
http://git-wip-us.apache.org/repos/asf/hive/blob/2726f302/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java
new file mode 100644
index 0000000..fb5800b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Deserializer.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.arrow;
+
+import io.netty.buffer.ArrowBuf;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.IntervalDayVector;
+import org.apache.arrow.vector.IntervalYearVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampNanoVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.holders.NullableIntervalDayHolder;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.hadoop.io.Writable;
+
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MS_PER_SECOND;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.NS_PER_MS;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.NS_PER_SECOND;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.SECOND_PER_DAY;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.toStructListTypeInfo;
+import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.toStructListVector;
+
+class Deserializer {
+ private final ArrowColumnarBatchSerDe serDe;
+ private final VectorExtractRow vectorExtractRow;
+ private final VectorizedRowBatch vectorizedRowBatch;
+ private Object[][] rows;
+
+ Deserializer(ArrowColumnarBatchSerDe serDe) throws SerDeException {
+ this.serDe = serDe;
+ vectorExtractRow = new VectorExtractRow();
+ final List<TypeInfo> fieldTypeInfoList = serDe.rowTypeInfo.getAllStructFieldTypeInfos();
+ final int fieldCount = fieldTypeInfoList.size();
+ final TypeInfo[] typeInfos = fieldTypeInfoList.toArray(new TypeInfo[fieldCount]);
+ try {
+ vectorExtractRow.init(typeInfos);
+ } catch (HiveException e) {
+ throw new SerDeException(e);
+ }
+
+ vectorizedRowBatch = new VectorizedRowBatch(fieldCount);
+ for (int fieldIndex = 0; fieldIndex < fieldCount; fieldIndex++) {
+ final ColumnVector columnVector = createColumnVector(typeInfos[fieldIndex]);
+ columnVector.init();
+ vectorizedRowBatch.cols[fieldIndex] = columnVector;
+ }
+ }
+
+ public Object deserialize(Writable writable) {
+ final ArrowWrapperWritable arrowWrapperWritable = (ArrowWrapperWritable) writable;
+ final VectorSchemaRoot vectorSchemaRoot = arrowWrapperWritable.getVectorSchemaRoot();
+ final List<FieldVector> fieldVectors = vectorSchemaRoot.getFieldVectors();
+ final int fieldCount = fieldVectors.size();
+ final int rowCount = vectorSchemaRoot.getRowCount();
+ vectorizedRowBatch.ensureSize(rowCount);
+
+ if (rows == null || rows.length < rowCount ) {
+ rows = new Object[rowCount][];
+ for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
+ rows[rowIndex] = new Object[fieldCount];
+ }
+ }
+
+ for (int fieldIndex = 0; fieldIndex < fieldCount; fieldIndex++) {
+ final FieldVector fieldVector = fieldVectors.get(fieldIndex);
+ final int projectedCol = vectorizedRowBatch.projectedColumns[fieldIndex];
+ final ColumnVector columnVector = vectorizedRowBatch.cols[projectedCol];
+ final TypeInfo typeInfo = serDe.rowTypeInfo.getAllStructFieldTypeInfos().get(fieldIndex);
+ read(fieldVector, columnVector, typeInfo);
+ }
+ for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
+ vectorExtractRow.extractRow(vectorizedRowBatch, rowIndex, rows[rowIndex]);
+ }
+ vectorizedRowBatch.reset();
+ return rows;
+ }
+
+ private void read(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo) {
+ switch (typeInfo.getCategory()) {
+ case PRIMITIVE:
+ readPrimitive(arrowVector, hiveVector, typeInfo);
+ break;
+ case LIST:
+ readList(arrowVector, (ListColumnVector) hiveVector, (ListTypeInfo) typeInfo);
+ break;
+ case MAP:
+ readMap(arrowVector, (MapColumnVector) hiveVector, (MapTypeInfo) typeInfo);
+ break;
+ case STRUCT:
+ readStruct(arrowVector, (StructColumnVector) hiveVector, (StructTypeInfo) typeInfo);
+ break;
+ case UNION:
+ readUnion(arrowVector, (UnionColumnVector) hiveVector, (UnionTypeInfo) typeInfo);
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ private void readPrimitive(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo) {
+ final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory =
+ ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
+
+ final int size = arrowVector.getValueCount();
+ hiveVector.ensureSize(size, false);
+
+ switch (primitiveCategory) {
+ case BOOLEAN:
+ {
+ for (int i = 0; i < size; i++) {
+ if (arrowVector.isNull(i)) {
+ VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+ } else {
+ hiveVector.isNull[i] = false;
+ ((LongColumnVector) hiveVector).vector[i] = ((BitVector) arrowVector).get(i);
+ }
+ }
+ }
+ break;
+ case BYTE:
+ {
+ for (int i = 0; i < size; i++) {
+ if (arrowVector.isNull(i)) {
+ VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+ } else {
+ hiveVector.isNull[i] = false;
+ ((LongColumnVector) hiveVector).vector[i] = ((TinyIntVector) arrowVector).get(i);
+ }
+ }
+ }
+ break;
+ case SHORT:
+ {
+ for (int i = 0; i < size; i++) {
+ if (arrowVector.isNull(i)) {
+ VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+ } else {
+ hiveVector.isNull[i] = false;
+ ((LongColumnVector) hiveVector).vector[i] = ((SmallIntVector) arrowVector).get(i);
+ }
+ }
+ }
+ break;
+ case INT:
+ {
+ for (int i = 0; i < size; i++) {
+ if (arrowVector.isNull(i)) {
+ VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+ } else {
+ hiveVector.isNull[i] = false;
+ ((LongColumnVector) hiveVector).vector[i] = ((IntVector) arrowVector).get(i);
+ }
+ }
+ }
+ break;
+ case LONG:
+ {
+ for (int i = 0; i < size; i++) {
+ if (arrowVector.isNull(i)) {
+ VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+ } else {
+ hiveVector.isNull[i] = false;
+ ((LongColumnVector) hiveVector).vector[i] = ((BigIntVector) arrowVector).get(i);
+ }
+ }
+ }
+ break;
+ case FLOAT:
+ {
+ for (int i = 0; i < size; i++) {
+ if (arrowVector.isNull(i)) {
+ VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+ } else {
+ hiveVector.isNull[i] = false;
+ ((DoubleColumnVector) hiveVector).vector[i] = ((Float4Vector) arrowVector).get(i);
+ }
+ }
+ }
+ break;
+ case DOUBLE:
+ {
+ for (int i = 0; i < size; i++) {
+ if (arrowVector.isNull(i)) {
+ VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+ } else {
+ hiveVector.isNull[i] = false;
+ ((DoubleColumnVector) hiveVector).vector[i] = ((Float8Vector) arrowVector).get(i);
+ }
+ }
+ }
+ break;
+ case STRING:
+ case VARCHAR:
+ case CHAR:
+ {
+ for (int i = 0; i < size; i++) {
+ if (arrowVector.isNull(i)) {
+ VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+ } else {
+ hiveVector.isNull[i] = false;
+ ((BytesColumnVector) hiveVector).setVal(i, ((VarCharVector) arrowVector).get(i));
+ }
+ }
+ }
+ break;
+ case DATE:
+ {
+ for (int i = 0; i < size; i++) {
+ if (arrowVector.isNull(i)) {
+ VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+ } else {
+ hiveVector.isNull[i] = false;
+ ((LongColumnVector) hiveVector).vector[i] = ((DateDayVector) arrowVector).get(i);
+ }
+ }
+ }
+ break;
+ case TIMESTAMP:
+ {
+ for (int i = 0; i < size; i++) {
+ if (arrowVector.isNull(i)) {
+ VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+ } else {
+ hiveVector.isNull[i] = false;
+
+ // Time = second + sub-second
+ final long timeInNanos = ((TimeStampNanoVector) arrowVector).get(i);
+ final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector;
+ int subSecondInNanos = (int) (timeInNanos % NS_PER_SECOND);
+ long second = timeInNanos / NS_PER_SECOND;
+
+ // A nanosecond value should not be negative
+ if (subSecondInNanos < 0) {
+
+ // So add one second to the negative nanosecond value to make it positive
+ subSecondInNanos += NS_PER_SECOND;
+
+ // Subtract one second from the second value because we added one second,
+ // then subtract one more second because of the ceiling in the division.
+ second -= 2;
+ }
+ timestampColumnVector.time[i] = second * MS_PER_SECOND;
+ timestampColumnVector.nanos[i] = subSecondInNanos;
+ }
+ }
+ }
+ break;
+ case BINARY:
+ {
+ for (int i = 0; i < size; i++) {
+ if (arrowVector.isNull(i)) {
+ VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+ } else {
+ hiveVector.isNull[i] = false;
+ ((BytesColumnVector) hiveVector).setVal(i, ((VarBinaryVector) arrowVector).get(i));
+ }
+ }
+ }
+ break;
+ case DECIMAL:
+ {
+ for (int i = 0; i < size; i++) {
+ if (arrowVector.isNull(i)) {
+ VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+ } else {
+ hiveVector.isNull[i] = false;
+ ((DecimalColumnVector) hiveVector).set(i,
+ HiveDecimal.create(((DecimalVector) arrowVector).getObject(i)));
+ }
+ }
+ }
+ break;
+ case INTERVAL_YEAR_MONTH:
+ {
+ for (int i = 0; i < size; i++) {
+ if (arrowVector.isNull(i)) {
+ VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+ } else {
+ hiveVector.isNull[i] = false;
+ ((LongColumnVector) hiveVector).vector[i] = ((IntervalYearVector) arrowVector).get(i);
+ }
+ }
+ }
+ break;
+ case INTERVAL_DAY_TIME:
+ {
+ final IntervalDayVector intervalDayVector = (IntervalDayVector) arrowVector;
+ final NullableIntervalDayHolder intervalDayHolder = new NullableIntervalDayHolder();
+ final HiveIntervalDayTime intervalDayTime = new HiveIntervalDayTime();
+ for (int i = 0; i < size; i++) {
+ if (arrowVector.isNull(i)) {
+ VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+ } else {
+ hiveVector.isNull[i] = false;
+ intervalDayVector.get(i, intervalDayHolder);
+ final long seconds = intervalDayHolder.days * SECOND_PER_DAY +
+ intervalDayHolder.milliseconds / MS_PER_SECOND;
+ final int nanos = (intervalDayHolder.milliseconds % 1_000) * NS_PER_MS;
+ intervalDayTime.set(seconds, nanos);
+ ((IntervalDayTimeColumnVector) hiveVector).set(i, intervalDayTime);
+ }
+ }
+ }
+ break;
+ case VOID:
+ case TIMESTAMPLOCALTZ:
+ case UNKNOWN:
+ default:
+ break;
+ }
+ }
+
+ private void readList(FieldVector arrowVector, ListColumnVector hiveVector, ListTypeInfo typeInfo) {
+ final int size = arrowVector.getValueCount();
+ final ArrowBuf offsets = arrowVector.getOffsetBuffer();
+ final int OFFSET_WIDTH = 4;
+
+ read(arrowVector.getChildrenFromFields().get(0),
+ hiveVector.child,
+ typeInfo.getListElementTypeInfo());
+
+ for (int i = 0; i < size; i++) {
+ if (arrowVector.isNull(i)) {
+ VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+ } else {
+ hiveVector.isNull[i] = false;
+ final int offset = offsets.getInt(i * OFFSET_WIDTH);
+ hiveVector.offsets[i] = offset;
+ hiveVector.lengths[i] = offsets.getInt((i + 1) * OFFSET_WIDTH) - offset;
+ }
+ }
+ }
+
+ private void readMap(FieldVector arrowVector, MapColumnVector hiveVector, MapTypeInfo typeInfo) {
+ final int size = arrowVector.getValueCount();
+ final ListTypeInfo mapStructListTypeInfo = toStructListTypeInfo(typeInfo);
+ final ListColumnVector mapStructListVector = toStructListVector(hiveVector);
+ final StructColumnVector mapStructVector = (StructColumnVector) mapStructListVector.child;
+
+ read(arrowVector, mapStructListVector, mapStructListTypeInfo);
+
+ hiveVector.isRepeating = mapStructListVector.isRepeating;
+ hiveVector.childCount = mapStructListVector.childCount;
+ hiveVector.noNulls = mapStructListVector.noNulls;
+ hiveVector.keys = mapStructVector.fields[0];
+ hiveVector.values = mapStructVector.fields[1];
+ System.arraycopy(mapStructListVector.offsets, 0, hiveVector.offsets, 0, size);
+ System.arraycopy(mapStructListVector.lengths, 0, hiveVector.lengths, 0, size);
+ System.arraycopy(mapStructListVector.isNull, 0, hiveVector.isNull, 0, size);
+ }
+
+ private void readStruct(FieldVector arrowVector, StructColumnVector hiveVector, StructTypeInfo typeInfo) {
+ final int size = arrowVector.getValueCount();
+ final List<TypeInfo> fieldTypeInfos = typeInfo.getAllStructFieldTypeInfos();
+ final int fieldSize = arrowVector.getChildrenFromFields().size();
+ for (int i = 0; i < fieldSize; i++) {
+ read(arrowVector.getChildrenFromFields().get(i), hiveVector.fields[i], fieldTypeInfos.get(i));
+ }
+
+ for (int i = 0; i < size; i++) {
+ if (arrowVector.isNull(i)) {
+ VectorizedBatchUtil.setNullColIsNullValue(hiveVector, i);
+ } else {
+ hiveVector.isNull[i] = false;
+ }
+ }
+ }
+
+ private void readUnion(FieldVector arrowVector, UnionColumnVector hiveVector, UnionTypeInfo typeInfo) {
+ }
+}
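
One conversion in the new Deserializer worth spelling out is INTERVAL_DAY_TIME: Arrow's IntervalDayVector hands back a (days, milliseconds) pair, while HiveIntervalDayTime.set() expects total seconds plus a nanosecond remainder. Below is a minimal standalone sketch of that arithmetic using the same constants the SerDe defines; the class name and the sample holder values are illustrative only.

public class IntervalDayArithmeticSketch {
  static final int MS_PER_SECOND = 1_000;
  static final int NS_PER_MS = 1_000_000;
  static final int SECOND_PER_DAY = 24 * 60 * 60;

  public static void main(String[] args) {
    // Hypothetical values as read into a NullableIntervalDayHolder.
    final int days = 2;
    final int milliseconds = 3_500;
    // Same arithmetic as the INTERVAL_DAY_TIME branch of readPrimitive().
    final long seconds = (long) days * SECOND_PER_DAY + milliseconds / MS_PER_SECOND;
    final int nanos = (milliseconds % MS_PER_SECOND) * NS_PER_MS;
    // Prints "2 days + 3500 ms -> 172803 s, 500000000 ns"
    System.out.println(days + " days + " + milliseconds + " ms -> "
        + seconds + " s, " + nanos + " ns");
  }
}

The (seconds, nanos) pair produced this way is what intervalDayTime.set(seconds, nanos) receives before the value is stored into the IntervalDayTimeColumnVector.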
[6/6] hive git commit: HIVE-19308: Provide an Arrow stream reader for external LLAP clients (Eric Wohlstadter, reviewed by Jason Dere)
Posted by vg...@apache.org.
HIVE-19308: Provide an Arrow stream reader for external LLAP clients (Eric Wohlstadter, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2334a0dd
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2334a0dd
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2334a0dd
Branch: refs/heads/branch-3
Commit: 2334a0ddfbd1a96d5fa5891a51be57f6cf408789
Parents: f7f90a0
Author: Jason Dere <jd...@hortonworks.com>
Authored: Mon May 21 13:47:43 2018 -0700
Committer: Vineet Garg <vg...@apache.org>
Committed: Tue May 29 13:58:34 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +-
.../hive/jdbc/AbstractJdbcTriggersTest.java | 4 +-
.../apache/hive/jdbc/BaseJdbcWithMiniLlap.java | 615 +++++++++++++++++++
.../apache/hive/jdbc/TestJdbcWithMiniLlap.java | 603 ------------------
.../hive/jdbc/TestJdbcWithMiniLlapArrow.java | 230 +++++++
.../hive/jdbc/TestJdbcWithMiniLlapRow.java | 45 ++
.../hadoop/hive/llap/LlapBaseRecordReader.java | 101 ++-
.../hadoop/hive/llap/LlapRowRecordReader.java | 26 +-
llap-ext-client/pom.xml | 5 +
.../hive/llap/LlapArrowBatchRecordReader.java | 82 +++
.../hive/llap/LlapArrowRowInputFormat.java | 53 ++
.../hive/llap/LlapArrowRowRecordReader.java | 107 ++++
.../hadoop/hive/llap/LlapBaseInputFormat.java | 27 +-
pom.xml | 1 +
.../hive/ql/io/arrow/ArrowWrapperWritable.java | 18 +-
.../hive/ql/io/arrow/RootAllocatorFactory.java | 9 +
.../hadoop/hive/llap/TestLlapOutputFormat.java | 1 +
17 files changed, 1254 insertions(+), 675 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8780374..8347f7f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4161,7 +4161,7 @@ public class HiveConf extends Configuration {
Constants.LLAP_LOGGER_NAME_RFA,
Constants.LLAP_LOGGER_NAME_CONSOLE),
"logger used for llap-daemons."),
- LLAP_OUTPUT_FORMAT_ARROW("hive.llap.output.format.arrow", false,
+ LLAP_OUTPUT_FORMAT_ARROW("hive.llap.output.format.arrow", true,
"Whether LLapOutputFormatService should output arrow batches"),
HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", "500ms",
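The hunk above flips the default of hive.llap.output.format.arrow from false to true, so LlapOutputFormatService now emits Arrow batches unless a client opts out. Below is a minimal sketch of opting out programmatically; it uses the same HiveConf/ConfVars API the tests later in this commit rely on, and the wrapping class and method names are illustrative only.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class ArrowOutputToggleSketch {
  // Returns a HiveConf that reverts LLAP output to the row-based format instead of Arrow batches.
  public static HiveConf rowBasedConf() {
    HiveConf conf = new HiveConf();
    conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
    return conf;
  }
}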
http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
index 17e44bb..7d5172b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
@@ -90,7 +90,7 @@ public abstract class AbstractJdbcTriggersTest {
@Before
public void setUp() throws Exception {
- hs2Conn = TestJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+ hs2Conn = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
}
@After
@@ -124,7 +124,7 @@ public abstract class AbstractJdbcTriggersTest {
throws Exception {
Connection con = hs2Conn;
- TestJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString());
+ BaseJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString());
createSleepUDF();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
new file mode 100644
index 0000000..11017f6
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -0,0 +1,615 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.LlapRowRecordReader;
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
+import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import org.datanucleus.ClassLoaderResolver;
+import org.datanucleus.NucleusContext;
+import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
+import org.datanucleus.AbstractNucleusContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.hadoop.mapred.InputFormat;
+
+/**
+ * Specialize this base class for different serdes/formats.
+ * {@link #beforeTest(boolean) beforeTest} should be called
+ * by sub-classes in a {@link org.junit.BeforeClass} initializer.
+ */
+public abstract class BaseJdbcWithMiniLlap {
+ private static MiniHS2 miniHS2 = null;
+ private static String dataFileDir;
+ private static Path kvDataFilePath;
+ private static Path dataTypesFilePath;
+
+ private static HiveConf conf = null;
+ private static Connection hs2Conn = null;
+
+ // This method should be called by sub-classes in a @BeforeClass initializer
+ public static void beforeTest(boolean useArrow) throws Exception {
+ Class.forName(MiniHS2.getJdbcDriverName());
+
+ String confDir = "../../data/conf/llap/";
+ if (confDir != null && !confDir.isEmpty()) {
+ HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml"));
+ System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation());
+ }
+
+ conf = new HiveConf();
+ conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+ if(useArrow) {
+ conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
+ } else {
+ conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
+ }
+
+ conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
+ + "/tez-site.xml"));
+
+ miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
+
+ dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+ kvDataFilePath = new Path(dataFileDir, "kv1.txt");
+ dataTypesFilePath = new Path(dataFileDir, "datatypes.txt");
+ Map<String, String> confOverlay = new HashMap<String, String>();
+ miniHS2.start(confOverlay);
+ miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+ }
+
+ public static Connection getConnection(String jdbcURL, String user, String pwd) throws SQLException {
+ Connection conn = DriverManager.getConnection(jdbcURL, user, pwd);
+ conn.createStatement().execute("set hive.support.concurrency = false");
+ return conn;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ LlapBaseInputFormat.closeAll();
+ hs2Conn.close();
+ }
+
+ @AfterClass
+ public static void afterTest() throws Exception {
+ if (miniHS2.isStarted()) {
+ miniHS2.stop();
+ }
+ }
+
+ private void createTestTable(String tableName) throws Exception {
+ createTestTable(hs2Conn, null, tableName, kvDataFilePath.toString());
+ }
+
+ public static void createTestTable(Connection connection, String database, String tableName, String srcFile) throws
+ Exception {
+ Statement stmt = connection.createStatement();
+
+ if (database != null) {
+ stmt.execute("CREATE DATABASE IF NOT EXISTS " + database);
+ stmt.execute("USE " + database);
+ }
+
+ // create table
+ stmt.execute("DROP TABLE IF EXISTS " + tableName);
+ stmt.execute("CREATE TABLE " + tableName
+ + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'");
+
+ // load data
+ stmt.execute("load data local inpath '" + srcFile + "' into table " + tableName);
+
+ ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName);
+ assertTrue(res.next());
+ assertEquals("val_238", res.getString(2));
+ res.close();
+ stmt.close();
+ }
+
+ protected void createDataTypesTable(String tableName) throws Exception {
+ Statement stmt = hs2Conn.createStatement();
+
+ // create table
+ stmt.execute("DROP TABLE IF EXISTS " + tableName);
+ // tables with various types
+ stmt.execute("create table " + tableName
+ + " (c1 int, c2 boolean, c3 double, c4 string,"
+ + " c5 array<int>, c6 map<int,string>, c7 map<string,string>,"
+ + " c8 struct<r:string,s:int,t:double>,"
+ + " c9 tinyint, c10 smallint, c11 float, c12 bigint,"
+ + " c13 array<array<string>>,"
+ + " c14 map<int, map<int,int>>,"
+ + " c15 struct<r:int,s:struct<a:int,b:string>>,"
+ + " c16 array<struct<m:map<string,string>,n:int>>,"
+ + " c17 timestamp, "
+ + " c18 decimal(16,7), "
+ + " c19 binary, "
+ + " c20 date,"
+ + " c21 varchar(20),"
+ + " c22 char(15),"
+ + " c23 binary"
+ + ")");
+ stmt.execute("load data local inpath '"
+ + dataTypesFilePath.toString() + "' into table " + tableName);
+ stmt.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testLlapInputFormatEndToEnd() throws Exception {
+ createTestTable("testtab1");
+
+ int rowCount;
+
+ RowCollector rowCollector = new RowCollector();
+ String query = "select * from testtab1 where under_col = 0";
+ rowCount = processQuery(query, 1, rowCollector);
+ assertEquals(3, rowCount);
+ assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(0));
+ assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(1));
+ assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(2));
+
+ // Try empty rows query
+ rowCollector.rows.clear();
+ query = "select * from testtab1 where true = false";
+ rowCount = processQuery(query, 1, rowCollector);
+ assertEquals(0, rowCount);
+ }
+
+ @Test(timeout = 60000)
+ public void testNonAsciiStrings() throws Exception {
+ createTestTable("testtab_nonascii");
+
+ RowCollector rowCollector = new RowCollector();
+ String nonAscii = "À côté du garçon";
+ String query = "select value, '" + nonAscii + "' from testtab_nonascii where under_col=0";
+ int rowCount = processQuery(query, 1, rowCollector);
+ assertEquals(3, rowCount);
+
+ assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(0));
+ assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(1));
+ assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(2));
+ }
+
+ @Test(timeout = 60000)
+ public void testEscapedStrings() throws Exception {
+ createTestTable("testtab1");
+
+ RowCollector rowCollector = new RowCollector();
+ String expectedVal1 = "'a',\"b\",\\c\\";
+ String expectedVal2 = "multi\nline";
+ String query = "select value, '\\'a\\',\"b\",\\\\c\\\\', 'multi\\nline' from testtab1 where under_col=0";
+ int rowCount = processQuery(query, 1, rowCollector);
+ assertEquals(3, rowCount);
+
+ assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(0));
+ assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(1));
+ assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(2));
+ }
+
+ @Test(timeout = 60000)
+ public void testDataTypes() throws Exception {
+ createDataTypesTable("datatypes");
+ RowCollector2 rowCollector = new RowCollector2();
+ String query = "select * from datatypes";
+ int rowCount = processQuery(query, 1, rowCollector);
+ assertEquals(3, rowCount);
+
+ // Verify schema
+ String[][] colNameTypes = new String[][] {
+ {"datatypes.c1", "int"},
+ {"datatypes.c2", "boolean"},
+ {"datatypes.c3", "double"},
+ {"datatypes.c4", "string"},
+ {"datatypes.c5", "array<int>"},
+ {"datatypes.c6", "map<int,string>"},
+ {"datatypes.c7", "map<string,string>"},
+ {"datatypes.c8", "struct<r:string,s:int,t:double>"},
+ {"datatypes.c9", "tinyint"},
+ {"datatypes.c10", "smallint"},
+ {"datatypes.c11", "float"},
+ {"datatypes.c12", "bigint"},
+ {"datatypes.c13", "array<array<string>>"},
+ {"datatypes.c14", "map<int,map<int,int>>"},
+ {"datatypes.c15", "struct<r:int,s:struct<a:int,b:string>>"},
+ {"datatypes.c16", "array<struct<m:map<string,string>,n:int>>"},
+ {"datatypes.c17", "timestamp"},
+ {"datatypes.c18", "decimal(16,7)"},
+ {"datatypes.c19", "binary"},
+ {"datatypes.c20", "date"},
+ {"datatypes.c21", "varchar(20)"},
+ {"datatypes.c22", "char(15)"},
+ {"datatypes.c23", "binary"},
+ };
+ FieldDesc fieldDesc;
+ assertEquals(23, rowCollector.numColumns);
+ for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
+ fieldDesc = rowCollector.schema.getColumns().get(idx);
+ assertEquals("ColName idx=" + idx, colNameTypes[idx][0], fieldDesc.getName());
+ assertEquals("ColType idx=" + idx, colNameTypes[idx][1], fieldDesc.getTypeInfo().getTypeName());
+ }
+
+ // First row is all nulls
+ Object[] rowValues = rowCollector.rows.get(0);
+ for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
+ assertEquals("idx=" + idx, null, rowValues[idx]);
+ }
+
+ // Second Row
+ rowValues = rowCollector.rows.get(1);
+ assertEquals(Integer.valueOf(-1), rowValues[0]);
+ assertEquals(Boolean.FALSE, rowValues[1]);
+ assertEquals(Double.valueOf(-1.1d), rowValues[2]);
+ assertEquals("", rowValues[3]);
+
+ List<?> c5Value = (List<?>) rowValues[4];
+ assertEquals(0, c5Value.size());
+
+ Map<?,?> c6Value = (Map<?,?>) rowValues[5];
+ assertEquals(0, c6Value.size());
+
+ Map<?,?> c7Value = (Map<?,?>) rowValues[6];
+ assertEquals(0, c7Value.size());
+
+ List<?> c8Value = (List<?>) rowValues[7];
+ assertEquals(null, c8Value.get(0));
+ assertEquals(null, c8Value.get(1));
+ assertEquals(null, c8Value.get(2));
+
+ assertEquals(Byte.valueOf((byte) -1), rowValues[8]);
+ assertEquals(Short.valueOf((short) -1), rowValues[9]);
+ assertEquals(Float.valueOf(-1.0f), rowValues[10]);
+ assertEquals(Long.valueOf(-1l), rowValues[11]);
+
+ List<?> c13Value = (List<?>) rowValues[12];
+ assertEquals(0, c13Value.size());
+
+ Map<?,?> c14Value = (Map<?,?>) rowValues[13];
+ assertEquals(0, c14Value.size());
+
+ List<?> c15Value = (List<?>) rowValues[14];
+ assertEquals(null, c15Value.get(0));
+ assertEquals(null, c15Value.get(1));
+
+ List<?> c16Value = (List<?>) rowValues[15];
+ assertEquals(0, c16Value.size());
+
+ assertEquals(null, rowValues[16]);
+ assertEquals(null, rowValues[17]);
+ assertEquals(null, rowValues[18]);
+ assertEquals(null, rowValues[19]);
+ assertEquals(null, rowValues[20]);
+ assertEquals(null, rowValues[21]);
+ assertEquals(null, rowValues[22]);
+
+ // Third row
+ rowValues = rowCollector.rows.get(2);
+ assertEquals(Integer.valueOf(1), rowValues[0]);
+ assertEquals(Boolean.TRUE, rowValues[1]);
+ assertEquals(Double.valueOf(1.1d), rowValues[2]);
+ assertEquals("1", rowValues[3]);
+
+ c5Value = (List<?>) rowValues[4];
+ assertEquals(2, c5Value.size());
+ assertEquals(Integer.valueOf(1), c5Value.get(0));
+ assertEquals(Integer.valueOf(2), c5Value.get(1));
+
+ c6Value = (Map<?,?>) rowValues[5];
+ assertEquals(2, c6Value.size());
+ assertEquals("x", c6Value.get(Integer.valueOf(1)));
+ assertEquals("y", c6Value.get(Integer.valueOf(2)));
+
+ c7Value = (Map<?,?>) rowValues[6];
+ assertEquals(1, c7Value.size());
+ assertEquals("v", c7Value.get("k"));
+
+ c8Value = (List<?>) rowValues[7];
+ assertEquals("a", c8Value.get(0));
+ assertEquals(Integer.valueOf(9), c8Value.get(1));
+ assertEquals(Double.valueOf(2.2d), c8Value.get(2));
+
+ assertEquals(Byte.valueOf((byte) 1), rowValues[8]);
+ assertEquals(Short.valueOf((short) 1), rowValues[9]);
+ assertEquals(Float.valueOf(1.0f), rowValues[10]);
+ assertEquals(Long.valueOf(1l), rowValues[11]);
+
+ c13Value = (List<?>) rowValues[12];
+ assertEquals(2, c13Value.size());
+ List<?> listVal = (List<?>) c13Value.get(0);
+ assertEquals("a", listVal.get(0));
+ assertEquals("b", listVal.get(1));
+ listVal = (List<?>) c13Value.get(1);
+ assertEquals("c", listVal.get(0));
+ assertEquals("d", listVal.get(1));
+
+ c14Value = (Map<?,?>) rowValues[13];
+ assertEquals(2, c14Value.size());
+ Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
+ assertEquals(2, mapVal.size());
+ assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11)));
+ assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13)));
+ mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(2));
+ assertEquals(1, mapVal.size());
+ assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21)));
+
+ c15Value = (List<?>) rowValues[14];
+ assertEquals(Integer.valueOf(1), c15Value.get(0));
+ listVal = (List<?>) c15Value.get(1);
+ assertEquals(2, listVal.size());
+ assertEquals(Integer.valueOf(2), listVal.get(0));
+ assertEquals("x", listVal.get(1));
+
+ c16Value = (List<?>) rowValues[15];
+ assertEquals(2, c16Value.size());
+ listVal = (List<?>) c16Value.get(0);
+ assertEquals(2, listVal.size());
+ mapVal = (Map<?,?>) listVal.get(0);
+ assertEquals(0, mapVal.size());
+ assertEquals(Integer.valueOf(1), listVal.get(1));
+ listVal = (List<?>) c16Value.get(1);
+ mapVal = (Map<?,?>) listVal.get(0);
+ assertEquals(2, mapVal.size());
+ assertEquals("b", mapVal.get("a"));
+ assertEquals("d", mapVal.get("c"));
+ assertEquals(Integer.valueOf(2), listVal.get(1));
+
+ assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456789"), rowValues[16]);
+ assertEquals(new BigDecimal("123456789.123456"), rowValues[17]);
+ assertArrayEquals("abcd".getBytes("UTF-8"), (byte[]) rowValues[18]);
+ assertEquals(Date.valueOf("2013-01-01"), rowValues[19]);
+ assertEquals("abc123", rowValues[20]);
+ assertEquals("abc123 ", rowValues[21]);
+ assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
+ }
+
+ private interface RowProcessor {
+ void process(Row row);
+ }
+
+ protected static class RowCollector implements RowProcessor {
+ ArrayList<String[]> rows = new ArrayList<String[]>();
+ Schema schema = null;
+ int numColumns = 0;
+
+ public void process(Row row) {
+ if (schema == null) {
+ schema = row.getSchema();
+ numColumns = schema.getColumns().size();
+ }
+
+ String[] arr = new String[numColumns];
+ for (int idx = 0; idx < numColumns; ++idx) {
+ Object val = row.getValue(idx);
+ arr[idx] = (val == null ? null : val.toString());
+ }
+ rows.add(arr);
+ }
+ }
+
+ // Save the actual values from each row as opposed to the String representation.
+ protected static class RowCollector2 implements RowProcessor {
+ ArrayList<Object[]> rows = new ArrayList<Object[]>();
+ Schema schema = null;
+ int numColumns = 0;
+
+ public void process(Row row) {
+ if (schema == null) {
+ schema = row.getSchema();
+ numColumns = schema.getColumns().size();
+ }
+
+ Object[] arr = new Object[numColumns];
+ for (int idx = 0; idx < numColumns; ++idx) {
+ arr[idx] = row.getValue(idx);
+ }
+ rows.add(arr);
+ }
+ }
+
+ protected int processQuery(String query, int numSplits, RowProcessor rowProcessor) throws Exception {
+ return processQuery(null, query, numSplits, rowProcessor);
+ }
+
+ protected abstract InputFormat<NullWritable, Row> getInputFormat();
+
+ private int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor) throws Exception {
+ String url = miniHS2.getJdbcURL();
+ String user = System.getProperty("user.name");
+ String pwd = user;
+ String handleId = UUID.randomUUID().toString();
+
+ InputFormat<NullWritable, Row> inputFormat = getInputFormat();
+
+ // Get splits
+ JobConf job = new JobConf(conf);
+ job.set(LlapBaseInputFormat.URL_KEY, url);
+ job.set(LlapBaseInputFormat.USER_KEY, user);
+ job.set(LlapBaseInputFormat.PWD_KEY, pwd);
+ job.set(LlapBaseInputFormat.QUERY_KEY, query);
+ job.set(LlapBaseInputFormat.HANDLE_ID, handleId);
+ if (currentDatabase != null) {
+ job.set(LlapBaseInputFormat.DB_KEY, currentDatabase);
+ }
+
+ InputSplit[] splits = inputFormat.getSplits(job, numSplits);
+ assertTrue(splits.length > 0);
+
+ // Fetch rows from splits
+ boolean first = true;
+ int rowCount = 0;
+ for (InputSplit split : splits) {
+ System.out.println("Processing split " + split.getLocations());
+
+ int numColumns = 2;
+ RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null);
+ Row row = reader.createValue();
+ while (reader.next(NullWritable.get(), row)) {
+ rowProcessor.process(row);
+ ++rowCount;
+ }
+ reader.close();
+ }
+ LlapBaseInputFormat.close(handleId);
+
+ return rowCount;
+ }
+
+ /**
+ * Test CLI kill command of a query that is running.
+ * We spawn 2 threads - one running the query and
+ * the other attempting to cancel.
+ * We're using a dummy UDF to simulate a query
+ * that runs for a sufficiently long time.
+ * @throws Exception
+ */
+ @Test
+ public void testKillQuery() throws Exception {
+ String tableName = "testtab1";
+ createTestTable(tableName);
+ Connection con = hs2Conn;
+ Connection con2 = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+
+ String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName();
+ Statement stmt1 = con.createStatement();
+ Statement stmt2 = con2.createStatement();
+ stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
+ stmt1.close();
+ final Statement stmt = con.createStatement();
+
+ ExceptionHolder tExecuteHolder = new ExceptionHolder();
+ ExceptionHolder tKillHolder = new ExceptionHolder();
+
+ // Thread executing the query
+ Thread tExecute = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ System.out.println("Executing query: ");
+ // The test table has 500 rows and each row sleeps 100ms, so the query runs long enough to be killed mid-flight
+ stmt.executeQuery("select sleepMsUDF(t1.under_col, 100), t1.under_col, t2.under_col " +
+ "from " + tableName + " t1 join " + tableName + " t2 on t1.under_col = t2.under_col");
+ fail("Expecting SQLException");
+ } catch (SQLException e) {
+ tExecuteHolder.throwable = e;
+ }
+ }
+ });
+ // Thread killing the query
+ Thread tKill = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(2000);
+ String queryId = ((HiveStatement) stmt).getQueryId();
+ System.out.println("Killing query: " + queryId);
+
+ stmt2.execute("kill query '" + queryId + "'");
+ stmt2.close();
+ } catch (Exception e) {
+ tKillHolder.throwable = e;
+ }
+ }
+ });
+
+ tExecute.start();
+ tKill.start();
+ tExecute.join();
+ tKill.join();
+ stmt.close();
+ con2.close();
+
+ assertNotNull("tExecute", tExecuteHolder.throwable);
+ assertNull("tCancel", tKillHolder.throwable);
+ }
+
+ private static class ExceptionHolder {
+ Throwable throwable;
+ }
+}
+
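The processQuery helper above shows the pattern an external LLAP client follows: populate a JobConf with the LlapBaseInputFormat keys, ask the input format for splits, then iterate each split's record reader. A hedged sketch of a standalone client doing the same against the Arrow-backed reader added by this commit follows; the JDBC URL, credentials, query text, and the wrapping class name are placeholders, and the Arrow decoding happens inside LlapArrowRowInputFormat, so the client still consumes ordinary Row objects.

import java.util.UUID;
import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
import org.apache.hadoop.hive.llap.Row;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

public class LlapArrowClientSketch {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    job.set(LlapBaseInputFormat.URL_KEY, "jdbc:hive2://localhost:10000/default"); // placeholder URL
    job.set(LlapBaseInputFormat.USER_KEY, "hive");                                // placeholder user
    job.set(LlapBaseInputFormat.PWD_KEY, "hive");                                 // placeholder password
    job.set(LlapBaseInputFormat.QUERY_KEY, "select * from some_table");           // placeholder query
    String handleId = UUID.randomUUID().toString();
    job.set(LlapBaseInputFormat.HANDLE_ID, handleId);

    // The constructor argument caps the Arrow root allocator; tests use Long.MAX_VALUE,
    // a real client would presumably pick a bounded value.
    LlapArrowRowInputFormat inputFormat = new LlapArrowRowInputFormat(Long.MAX_VALUE);
    InputSplit[] splits = inputFormat.getSplits(job, 1);
    for (InputSplit split : splits) {
      RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null);
      Row row = reader.createValue();
      while (reader.next(NullWritable.get(), row)) {
        System.out.println(row.getValue(0)); // first column of each row
      }
      reader.close();
    }
    LlapBaseInputFormat.close(handleId); // release the query handle
  }
}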
http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
deleted file mode 100644
index 68a8e21..0000000
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
+++ /dev/null
@@ -1,603 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.jdbc;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.lang.reflect.Field;
-import java.math.BigDecimal;
-import java.net.URL;
-import java.sql.Connection;
-import java.sql.Date;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.FieldDesc;
-import org.apache.hadoop.hive.llap.LlapRowRecordReader;
-import org.apache.hadoop.hive.llap.Row;
-import org.apache.hadoop.hive.llap.Schema;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-
-import org.apache.hive.jdbc.miniHS2.MiniHS2;
-import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
-import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
-import org.apache.hadoop.hive.llap.LlapRowInputFormat;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-
-import org.datanucleus.ClassLoaderResolver;
-import org.datanucleus.NucleusContext;
-import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
-import org.datanucleus.AbstractNucleusContext;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestJdbcWithMiniLlap {
- private static MiniHS2 miniHS2 = null;
- private static String dataFileDir;
- private static Path kvDataFilePath;
- private static Path dataTypesFilePath;
-
- private static HiveConf conf = null;
- private Connection hs2Conn = null;
-
- @BeforeClass
- public static void beforeTest() throws Exception {
- Class.forName(MiniHS2.getJdbcDriverName());
-
- String confDir = "../../data/conf/llap/";
- if (confDir != null && !confDir.isEmpty()) {
- HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml"));
- System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation());
- }
-
- conf = new HiveConf();
- conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
- conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
-
- conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
- + "/tez-site.xml"));
-
- miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
-
- dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
- kvDataFilePath = new Path(dataFileDir, "kv1.txt");
- dataTypesFilePath = new Path(dataFileDir, "datatypes.txt");
- Map<String, String> confOverlay = new HashMap<String, String>();
- miniHS2.start(confOverlay);
- miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
- }
-
- @Before
- public void setUp() throws Exception {
- hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
- }
-
- public static Connection getConnection(String jdbcURL, String user, String pwd) throws SQLException {
- Connection conn = DriverManager.getConnection(jdbcURL, user, pwd);
- conn.createStatement().execute("set hive.support.concurrency = false");
- return conn;
- }
-
- @After
- public void tearDown() throws Exception {
- LlapBaseInputFormat.closeAll();
- hs2Conn.close();
- }
-
- @AfterClass
- public static void afterTest() throws Exception {
- if (miniHS2.isStarted()) {
- miniHS2.stop();
- }
- }
-
- private void createTestTable(String tableName) throws Exception {
- createTestTable(hs2Conn, null, tableName, kvDataFilePath.toString());
- }
-
- public static void createTestTable(Connection connection, String database, String tableName, String srcFile) throws
- Exception {
- Statement stmt = connection.createStatement();
-
- if (database != null) {
- stmt.execute("CREATE DATABASE IF NOT EXISTS " + database);
- stmt.execute("USE " + database);
- }
-
- // create table
- stmt.execute("DROP TABLE IF EXISTS " + tableName);
- stmt.execute("CREATE TABLE " + tableName
- + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'");
-
- // load data
- stmt.execute("load data local inpath '" + srcFile + "' into table " + tableName);
-
- ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName);
- assertTrue(res.next());
- assertEquals("val_238", res.getString(2));
- res.close();
- stmt.close();
- }
-
- private void createDataTypesTable(String tableName) throws Exception {
- Statement stmt = hs2Conn.createStatement();
-
- // create table
- stmt.execute("DROP TABLE IF EXISTS " + tableName);
- // tables with various types
- stmt.execute("create table " + tableName
- + " (c1 int, c2 boolean, c3 double, c4 string,"
- + " c5 array<int>, c6 map<int,string>, c7 map<string,string>,"
- + " c8 struct<r:string,s:int,t:double>,"
- + " c9 tinyint, c10 smallint, c11 float, c12 bigint,"
- + " c13 array<array<string>>,"
- + " c14 map<int, map<int,int>>,"
- + " c15 struct<r:int,s:struct<a:int,b:string>>,"
- + " c16 array<struct<m:map<string,string>,n:int>>,"
- + " c17 timestamp, "
- + " c18 decimal(16,7), "
- + " c19 binary, "
- + " c20 date,"
- + " c21 varchar(20),"
- + " c22 char(15),"
- + " c23 binary"
- + ")");
- stmt.execute("load data local inpath '"
- + dataTypesFilePath.toString() + "' into table " + tableName);
- stmt.close();
- }
-
- @Test(timeout = 60000)
- public void testLlapInputFormatEndToEnd() throws Exception {
- createTestTable("testtab1");
-
- int rowCount;
-
- RowCollector rowCollector = new RowCollector();
- String query = "select * from testtab1 where under_col = 0";
- rowCount = processQuery(query, 1, rowCollector);
- assertEquals(3, rowCount);
- assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(0));
- assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(1));
- assertArrayEquals(new String[] {"0", "val_0"}, rowCollector.rows.get(2));
-
- // Try empty rows query
- rowCollector.rows.clear();
- query = "select * from testtab1 where true = false";
- rowCount = processQuery(query, 1, rowCollector);
- assertEquals(0, rowCount);
- }
-
- @Test(timeout = 60000)
- public void testNonAsciiStrings() throws Exception {
- createTestTable(hs2Conn, "nonascii", "testtab_nonascii", kvDataFilePath.toString());
-
- RowCollector rowCollector = new RowCollector();
- String nonAscii = "À côté du garçon";
- String query = "select value, '" + nonAscii + "' from testtab_nonascii where under_col=0";
- int rowCount = processQuery("nonascii", query, 1, rowCollector);
- assertEquals(3, rowCount);
-
- assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(0));
- assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(1));
- assertArrayEquals(new String[] {"val_0", nonAscii}, rowCollector.rows.get(2));
- }
-
- @Test(timeout = 60000)
- public void testEscapedStrings() throws Exception {
- createTestTable("testtab1");
-
- RowCollector rowCollector = new RowCollector();
- String expectedVal1 = "'a',\"b\",\\c\\";
- String expectedVal2 = "multi\nline";
- String query = "select value, '\\'a\\',\"b\",\\\\c\\\\', 'multi\\nline' from testtab1 where under_col=0";
- int rowCount = processQuery(query, 1, rowCollector);
- assertEquals(3, rowCount);
-
- assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(0));
- assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(1));
- assertArrayEquals(new String[] {"val_0", expectedVal1, expectedVal2}, rowCollector.rows.get(2));
- }
-
- @Test(timeout = 60000)
- public void testDataTypes() throws Exception {
- createDataTypesTable("datatypes");
- RowCollector2 rowCollector = new RowCollector2();
- String query = "select * from datatypes";
- int rowCount = processQuery(query, 1, rowCollector);
- assertEquals(3, rowCount);
-
- // Verify schema
- String[][] colNameTypes = new String[][] {
- {"datatypes.c1", "int"},
- {"datatypes.c2", "boolean"},
- {"datatypes.c3", "double"},
- {"datatypes.c4", "string"},
- {"datatypes.c5", "array<int>"},
- {"datatypes.c6", "map<int,string>"},
- {"datatypes.c7", "map<string,string>"},
- {"datatypes.c8", "struct<r:string,s:int,t:double>"},
- {"datatypes.c9", "tinyint"},
- {"datatypes.c10", "smallint"},
- {"datatypes.c11", "float"},
- {"datatypes.c12", "bigint"},
- {"datatypes.c13", "array<array<string>>"},
- {"datatypes.c14", "map<int,map<int,int>>"},
- {"datatypes.c15", "struct<r:int,s:struct<a:int,b:string>>"},
- {"datatypes.c16", "array<struct<m:map<string,string>,n:int>>"},
- {"datatypes.c17", "timestamp"},
- {"datatypes.c18", "decimal(16,7)"},
- {"datatypes.c19", "binary"},
- {"datatypes.c20", "date"},
- {"datatypes.c21", "varchar(20)"},
- {"datatypes.c22", "char(15)"},
- {"datatypes.c23", "binary"},
- };
- FieldDesc fieldDesc;
- assertEquals(23, rowCollector.numColumns);
- for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
- fieldDesc = rowCollector.schema.getColumns().get(idx);
- assertEquals("ColName idx=" + idx, colNameTypes[idx][0], fieldDesc.getName());
- assertEquals("ColType idx=" + idx, colNameTypes[idx][1], fieldDesc.getTypeInfo().getTypeName());
- }
-
- // First row is all nulls
- Object[] rowValues = rowCollector.rows.get(0);
- for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
- assertEquals("idx=" + idx, null, rowValues[idx]);
- }
-
- // Second Row
- rowValues = rowCollector.rows.get(1);
- assertEquals(Integer.valueOf(-1), rowValues[0]);
- assertEquals(Boolean.FALSE, rowValues[1]);
- assertEquals(Double.valueOf(-1.1d), rowValues[2]);
- assertEquals("", rowValues[3]);
-
- List<?> c5Value = (List<?>) rowValues[4];
- assertEquals(0, c5Value.size());
-
- Map<?,?> c6Value = (Map<?,?>) rowValues[5];
- assertEquals(0, c6Value.size());
-
- Map<?,?> c7Value = (Map<?,?>) rowValues[6];
- assertEquals(0, c7Value.size());
-
- List<?> c8Value = (List<?>) rowValues[7];
- assertEquals(null, c8Value.get(0));
- assertEquals(null, c8Value.get(1));
- assertEquals(null, c8Value.get(2));
-
- assertEquals(Byte.valueOf((byte) -1), rowValues[8]);
- assertEquals(Short.valueOf((short) -1), rowValues[9]);
- assertEquals(Float.valueOf(-1.0f), rowValues[10]);
- assertEquals(Long.valueOf(-1l), rowValues[11]);
-
- List<?> c13Value = (List<?>) rowValues[12];
- assertEquals(0, c13Value.size());
-
- Map<?,?> c14Value = (Map<?,?>) rowValues[13];
- assertEquals(0, c14Value.size());
-
- List<?> c15Value = (List<?>) rowValues[14];
- assertEquals(null, c15Value.get(0));
- assertEquals(null, c15Value.get(1));
-
- List<?> c16Value = (List<?>) rowValues[15];
- assertEquals(0, c16Value.size());
-
- assertEquals(null, rowValues[16]);
- assertEquals(null, rowValues[17]);
- assertEquals(null, rowValues[18]);
- assertEquals(null, rowValues[19]);
- assertEquals(null, rowValues[20]);
- assertEquals(null, rowValues[21]);
- assertEquals(null, rowValues[22]);
-
- // Third row
- rowValues = rowCollector.rows.get(2);
- assertEquals(Integer.valueOf(1), rowValues[0]);
- assertEquals(Boolean.TRUE, rowValues[1]);
- assertEquals(Double.valueOf(1.1d), rowValues[2]);
- assertEquals("1", rowValues[3]);
-
- c5Value = (List<?>) rowValues[4];
- assertEquals(2, c5Value.size());
- assertEquals(Integer.valueOf(1), c5Value.get(0));
- assertEquals(Integer.valueOf(2), c5Value.get(1));
-
- c6Value = (Map<?,?>) rowValues[5];
- assertEquals(2, c6Value.size());
- assertEquals("x", c6Value.get(Integer.valueOf(1)));
- assertEquals("y", c6Value.get(Integer.valueOf(2)));
-
- c7Value = (Map<?,?>) rowValues[6];
- assertEquals(1, c7Value.size());
- assertEquals("v", c7Value.get("k"));
-
- c8Value = (List<?>) rowValues[7];
- assertEquals("a", c8Value.get(0));
- assertEquals(Integer.valueOf(9), c8Value.get(1));
- assertEquals(Double.valueOf(2.2d), c8Value.get(2));
-
- assertEquals(Byte.valueOf((byte) 1), rowValues[8]);
- assertEquals(Short.valueOf((short) 1), rowValues[9]);
- assertEquals(Float.valueOf(1.0f), rowValues[10]);
- assertEquals(Long.valueOf(1l), rowValues[11]);
-
- c13Value = (List<?>) rowValues[12];
- assertEquals(2, c13Value.size());
- List<?> listVal = (List<?>) c13Value.get(0);
- assertEquals("a", listVal.get(0));
- assertEquals("b", listVal.get(1));
- listVal = (List<?>) c13Value.get(1);
- assertEquals("c", listVal.get(0));
- assertEquals("d", listVal.get(1));
-
- c14Value = (Map<?,?>) rowValues[13];
- assertEquals(2, c14Value.size());
- Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
- assertEquals(2, mapVal.size());
- assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11)));
- assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13)));
- mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(2));
- assertEquals(1, mapVal.size());
- assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21)));
-
- c15Value = (List<?>) rowValues[14];
- assertEquals(Integer.valueOf(1), c15Value.get(0));
- listVal = (List<?>) c15Value.get(1);
- assertEquals(2, listVal.size());
- assertEquals(Integer.valueOf(2), listVal.get(0));
- assertEquals("x", listVal.get(1));
-
- c16Value = (List<?>) rowValues[15];
- assertEquals(2, c16Value.size());
- listVal = (List<?>) c16Value.get(0);
- assertEquals(2, listVal.size());
- mapVal = (Map<?,?>) listVal.get(0);
- assertEquals(0, mapVal.size());
- assertEquals(Integer.valueOf(1), listVal.get(1));
- listVal = (List<?>) c16Value.get(1);
- mapVal = (Map<?,?>) listVal.get(0);
- assertEquals(2, mapVal.size());
- assertEquals("b", mapVal.get("a"));
- assertEquals("d", mapVal.get("c"));
- assertEquals(Integer.valueOf(2), listVal.get(1));
-
- assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456789"), rowValues[16]);
- assertEquals(new BigDecimal("123456789.123456"), rowValues[17]);
- assertArrayEquals("abcd".getBytes("UTF-8"), (byte[]) rowValues[18]);
- assertEquals(Date.valueOf("2013-01-01"), rowValues[19]);
- assertEquals("abc123", rowValues[20]);
- assertEquals("abc123 ", rowValues[21]);
- assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
- }
-
- private interface RowProcessor {
- void process(Row row);
- }
-
- private static class RowCollector implements RowProcessor {
- ArrayList<String[]> rows = new ArrayList<String[]>();
- Schema schema = null;
- int numColumns = 0;
-
- public void process(Row row) {
- if (schema == null) {
- schema = row.getSchema();
- numColumns = schema.getColumns().size();
- }
-
- String[] arr = new String[numColumns];
- for (int idx = 0; idx < numColumns; ++idx) {
- Object val = row.getValue(idx);
- arr[idx] = (val == null ? null : val.toString());
- }
- rows.add(arr);
- }
- }
-
- // Save the actual values from each row as opposed to the String representation.
- private static class RowCollector2 implements RowProcessor {
- ArrayList<Object[]> rows = new ArrayList<Object[]>();
- Schema schema = null;
- int numColumns = 0;
-
- public void process(Row row) {
- if (schema == null) {
- schema = row.getSchema();
- numColumns = schema.getColumns().size();
- }
-
- Object[] arr = new Object[numColumns];
- for (int idx = 0; idx < numColumns; ++idx) {
- arr[idx] = row.getValue(idx);
- }
- rows.add(arr);
- }
- }
-
- private int processQuery(String query, int numSplits, RowProcessor rowProcessor) throws Exception {
- return processQuery(null, query, numSplits, rowProcessor);
- }
-
- private int processQuery(String currentDatabase, String query, int numSplits, RowProcessor rowProcessor) throws Exception {
- String url = miniHS2.getJdbcURL();
- String user = System.getProperty("user.name");
- String pwd = user;
- String handleId = UUID.randomUUID().toString();
-
- LlapRowInputFormat inputFormat = new LlapRowInputFormat();
-
- // Get splits
- JobConf job = new JobConf(conf);
- job.set(LlapBaseInputFormat.URL_KEY, url);
- job.set(LlapBaseInputFormat.USER_KEY, user);
- job.set(LlapBaseInputFormat.PWD_KEY, pwd);
- job.set(LlapBaseInputFormat.QUERY_KEY, query);
- job.set(LlapBaseInputFormat.HANDLE_ID, handleId);
- if (currentDatabase != null) {
- job.set(LlapBaseInputFormat.DB_KEY, currentDatabase);
- }
-
- InputSplit[] splits = inputFormat.getSplits(job, numSplits);
- assertTrue(splits.length > 0);
-
- // Fetch rows from splits
- boolean first = true;
- int rowCount = 0;
- for (InputSplit split : splits) {
- System.out.println("Processing split " + split.getLocations());
-
- int numColumns = 2;
- RecordReader<NullWritable, Row> reader = inputFormat.getRecordReader(split, job, null);
- Row row = reader.createValue();
- while (reader.next(NullWritable.get(), row)) {
- rowProcessor.process(row);
- ++rowCount;
- }
- reader.close();
- }
- LlapBaseInputFormat.close(handleId);
-
- return rowCount;
- }
-
- /**
- * Test CLI kill command of a query that is running.
- * We spawn 2 threads - one running the query and
- * the other attempting to cancel.
- * We're using a dummy udf to simulate a query,
- * that runs for a sufficiently long time.
- * @throws Exception
- */
- @Test
- public void testKillQuery() throws Exception {
- String tableName = "testtab1";
- createTestTable(tableName);
- Connection con = hs2Conn;
- Connection con2 = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
-
- String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName();
- Statement stmt1 = con.createStatement();
- Statement stmt2 = con2.createStatement();
- stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
- stmt1.close();
- final Statement stmt = con.createStatement();
-
- ExceptionHolder tExecuteHolder = new ExceptionHolder();
- ExceptionHolder tKillHolder = new ExceptionHolder();
-
- // Thread executing the query
- Thread tExecute = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- System.out.println("Executing query: ");
- // The test table has 500 rows, so total query time should be ~ 500*500ms
- stmt.executeQuery("select sleepMsUDF(t1.under_col, 100), t1.under_col, t2.under_col " +
- "from " + tableName + " t1 join " + tableName + " t2 on t1.under_col = t2.under_col");
- fail("Expecting SQLException");
- } catch (SQLException e) {
- tExecuteHolder.throwable = e;
- }
- }
- });
- // Thread killing the query
- Thread tKill = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(2000);
- String queryId = ((HiveStatement) stmt).getQueryId();
- System.out.println("Killing query: " + queryId);
-
- stmt2.execute("kill query '" + queryId + "'");
- stmt2.close();
- } catch (Exception e) {
- tKillHolder.throwable = e;
- }
- }
- });
-
- tExecute.start();
- tKill.start();
- tExecute.join();
- tKill.join();
- stmt.close();
- con2.close();
-
- assertNotNull("tExecute", tExecuteHolder.throwable);
- assertNull("tCancel", tKillHolder.throwable);
- }
-
- private static class ExceptionHolder {
- Throwable throwable;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
new file mode 100644
index 0000000..afb9837
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.List;
+import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.BeforeClass;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
+
+/**
+ * TestJdbcWithMiniLlap for Arrow format
+ */
+public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
+
+
+ @BeforeClass
+ public static void beforeTest() throws Exception {
+ BaseJdbcWithMiniLlap.beforeTest(true);
+ }
+
+ @Override
+ protected InputFormat<NullWritable, Row> getInputFormat() {
+ // For unit testing, there is no harm in hard-coding the allocator ceiling to Long.MAX_VALUE
+ return new LlapArrowRowInputFormat(Long.MAX_VALUE);
+ }
+
+ // MAP type is currently not supported, so the map assertions below are commented out.
+ // Re-enable them when Arrow 1.0 is released. See: SPARK-21187
+ @Override
+ public void testDataTypes() throws Exception {
+ createDataTypesTable("datatypes");
+ RowCollector2 rowCollector = new RowCollector2();
+ String query = "select * from datatypes";
+ int rowCount = processQuery(query, 1, rowCollector);
+ assertEquals(3, rowCount);
+
+ // Verify schema
+ String[][] colNameTypes = new String[][] {
+ {"datatypes.c1", "int"},
+ {"datatypes.c2", "boolean"},
+ {"datatypes.c3", "double"},
+ {"datatypes.c4", "string"},
+ {"datatypes.c5", "array<int>"},
+ {"datatypes.c6", "map<int,string>"},
+ {"datatypes.c7", "map<string,string>"},
+ {"datatypes.c8", "struct<r:string,s:int,t:double>"},
+ {"datatypes.c9", "tinyint"},
+ {"datatypes.c10", "smallint"},
+ {"datatypes.c11", "float"},
+ {"datatypes.c12", "bigint"},
+ {"datatypes.c13", "array<array<string>>"},
+ {"datatypes.c14", "map<int,map<int,int>>"},
+ {"datatypes.c15", "struct<r:int,s:struct<a:int,b:string>>"},
+ {"datatypes.c16", "array<struct<m:map<string,string>,n:int>>"},
+ {"datatypes.c17", "timestamp"},
+ {"datatypes.c18", "decimal(16,7)"},
+ {"datatypes.c19", "binary"},
+ {"datatypes.c20", "date"},
+ {"datatypes.c21", "varchar(20)"},
+ {"datatypes.c22", "char(15)"},
+ {"datatypes.c23", "binary"},
+ };
+ FieldDesc fieldDesc;
+ assertEquals(23, rowCollector.numColumns);
+ for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
+ fieldDesc = rowCollector.schema.getColumns().get(idx);
+ assertEquals("ColName idx=" + idx, colNameTypes[idx][0], fieldDesc.getName());
+ assertEquals("ColType idx=" + idx, colNameTypes[idx][1], fieldDesc.getTypeInfo().getTypeName());
+ }
+
+ // First row is all nulls
+ Object[] rowValues = rowCollector.rows.get(0);
+ for (int idx = 0; idx < rowCollector.numColumns; ++idx) {
+ assertEquals("idx=" + idx, null, rowValues[idx]);
+ }
+
+ // Second Row
+ rowValues = rowCollector.rows.get(1);
+ assertEquals(Integer.valueOf(-1), rowValues[0]);
+ assertEquals(Boolean.FALSE, rowValues[1]);
+ assertEquals(Double.valueOf(-1.1d), rowValues[2]);
+ assertEquals("", rowValues[3]);
+
+ List<?> c5Value = (List<?>) rowValues[4];
+ assertEquals(0, c5Value.size());
+
+ //Map<?,?> c6Value = (Map<?,?>) rowValues[5];
+ //assertEquals(0, c6Value.size());
+
+ //Map<?,?> c7Value = (Map<?,?>) rowValues[6];
+ //assertEquals(0, c7Value.size());
+
+ List<?> c8Value = (List<?>) rowValues[7];
+ assertEquals(null, c8Value.get(0));
+ assertEquals(null, c8Value.get(1));
+ assertEquals(null, c8Value.get(2));
+
+ assertEquals(Byte.valueOf((byte) -1), rowValues[8]);
+ assertEquals(Short.valueOf((short) -1), rowValues[9]);
+ assertEquals(Float.valueOf(-1.0f), rowValues[10]);
+ assertEquals(Long.valueOf(-1l), rowValues[11]);
+
+ List<?> c13Value = (List<?>) rowValues[12];
+ assertEquals(0, c13Value.size());
+
+ //Map<?,?> c14Value = (Map<?,?>) rowValues[13];
+ //assertEquals(0, c14Value.size());
+
+ List<?> c15Value = (List<?>) rowValues[14];
+ assertEquals(null, c15Value.get(0));
+ assertEquals(null, c15Value.get(1));
+
+ //List<?> c16Value = (List<?>) rowValues[15];
+ //assertEquals(0, c16Value.size());
+
+ assertEquals(null, rowValues[16]);
+ assertEquals(null, rowValues[17]);
+ assertEquals(null, rowValues[18]);
+ assertEquals(null, rowValues[19]);
+ assertEquals(null, rowValues[20]);
+ assertEquals(null, rowValues[21]);
+ assertEquals(null, rowValues[22]);
+
+ // Third row
+ rowValues = rowCollector.rows.get(2);
+ assertEquals(Integer.valueOf(1), rowValues[0]);
+ assertEquals(Boolean.TRUE, rowValues[1]);
+ assertEquals(Double.valueOf(1.1d), rowValues[2]);
+ assertEquals("1", rowValues[3]);
+
+ c5Value = (List<?>) rowValues[4];
+ assertEquals(2, c5Value.size());
+ assertEquals(Integer.valueOf(1), c5Value.get(0));
+ assertEquals(Integer.valueOf(2), c5Value.get(1));
+
+ //c6Value = (Map<?,?>) rowValues[5];
+ //assertEquals(2, c6Value.size());
+ //assertEquals("x", c6Value.get(Integer.valueOf(1)));
+ //assertEquals("y", c6Value.get(Integer.valueOf(2)));
+
+ //c7Value = (Map<?,?>) rowValues[6];
+ //assertEquals(1, c7Value.size());
+ //assertEquals("v", c7Value.get("k"));
+
+ c8Value = (List<?>) rowValues[7];
+ assertEquals("a", c8Value.get(0));
+ assertEquals(Integer.valueOf(9), c8Value.get(1));
+ assertEquals(Double.valueOf(2.2d), c8Value.get(2));
+
+ assertEquals(Byte.valueOf((byte) 1), rowValues[8]);
+ assertEquals(Short.valueOf((short) 1), rowValues[9]);
+ assertEquals(Float.valueOf(1.0f), rowValues[10]);
+ assertEquals(Long.valueOf(1l), rowValues[11]);
+
+ c13Value = (List<?>) rowValues[12];
+ assertEquals(2, c13Value.size());
+ List<?> listVal = (List<?>) c13Value.get(0);
+ assertEquals("a", listVal.get(0));
+ assertEquals("b", listVal.get(1));
+ listVal = (List<?>) c13Value.get(1);
+ assertEquals("c", listVal.get(0));
+ assertEquals("d", listVal.get(1));
+
+ //c14Value = (Map<?,?>) rowValues[13];
+ //assertEquals(2, c14Value.size());
+ //Map<?,?> mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(1));
+ //assertEquals(2, mapVal.size());
+ //assertEquals(Integer.valueOf(12), mapVal.get(Integer.valueOf(11)));
+ //assertEquals(Integer.valueOf(14), mapVal.get(Integer.valueOf(13)));
+ //mapVal = (Map<?,?>) c14Value.get(Integer.valueOf(2));
+ //assertEquals(1, mapVal.size());
+ //assertEquals(Integer.valueOf(22), mapVal.get(Integer.valueOf(21)));
+
+ c15Value = (List<?>) rowValues[14];
+ assertEquals(Integer.valueOf(1), c15Value.get(0));
+ listVal = (List<?>) c15Value.get(1);
+ assertEquals(2, listVal.size());
+ assertEquals(Integer.valueOf(2), listVal.get(0));
+ assertEquals("x", listVal.get(1));
+
+ //c16Value = (List<?>) rowValues[15];
+ //assertEquals(2, c16Value.size());
+ //listVal = (List<?>) c16Value.get(0);
+ //assertEquals(2, listVal.size());
+ //mapVal = (Map<?,?>) listVal.get(0);
+ //assertEquals(0, mapVal.size());
+ //assertEquals(Integer.valueOf(1), listVal.get(1));
+ //listVal = (List<?>) c16Value.get(1);
+ //mapVal = (Map<?,?>) listVal.get(0);
+ //assertEquals(2, mapVal.size());
+ //assertEquals("b", mapVal.get("a"));
+ //assertEquals("d", mapVal.get("c"));
+ //assertEquals(Integer.valueOf(2), listVal.get(1));
+
+ assertEquals(Timestamp.valueOf("2012-04-22 09:00:00.123456789"), rowValues[16]);
+ assertEquals(new BigDecimal("123456789.123456"), rowValues[17]);
+ assertArrayEquals("abcd".getBytes("UTF-8"), (byte[]) rowValues[18]);
+ assertEquals(Date.valueOf("2013-01-01"), rowValues[19]);
+ assertEquals("abc123", rowValues[20]);
+ assertEquals("abc123 ", rowValues[21]);
+ assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
new file mode 100644
index 0000000..809068f
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import org.apache.hadoop.hive.llap.Row;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.hive.llap.LlapRowInputFormat;
+import org.junit.BeforeClass;
+import org.junit.Before;
+import org.junit.After;
+import org.apache.hadoop.mapred.InputFormat;
+
+/**
+ * TestJdbcWithMiniLlap for llap Row format.
+ */
+public class TestJdbcWithMiniLlapRow extends BaseJdbcWithMiniLlap {
+
+ @BeforeClass
+ public static void beforeTest() throws Exception {
+ BaseJdbcWithMiniLlap.beforeTest(false);
+ }
+
+ @Override
+ protected InputFormat<NullWritable, Row> getInputFormat() {
+ return new LlapRowInputFormat();
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
index a9ed3d2..5316aa7 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -22,25 +22,15 @@ import com.google.common.base.Preconditions;
import java.io.BufferedInputStream;
import java.io.Closeable;
-import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.DataInputStream;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.Schema;
import org.apache.hadoop.hive.llap.io.ChunkedInputStream;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.JobConf;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -149,52 +139,61 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
throw new IOException("Hit end of input, but did not find expected end of data indicator");
}
- // There should be a reader event available, or coming soon, so okay to be blocking call.
- ReaderEvent event = getReaderEvent();
- switch (event.getEventType()) {
- case DONE:
- break;
- default:
- throw new IOException("Expected reader event with done status, but got "
- + event.getEventType() + " with message " + event.getMessage());
- }
+ processReaderEvent();
return false;
}
} catch (IOException io) {
- try {
- if (Thread.interrupted()) {
- // Either we were interrupted by one of:
- // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue
- // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
- // Either way we should not try to block trying to read the reader events queue.
- if (readerEvents.isEmpty()) {
- // Case 2.
- throw io;
- } else {
- // Case 1. Fail the reader, sending back the error we received from the reader event.
- ReaderEvent event = getReaderEvent();
- switch (event.getEventType()) {
- case ERROR:
- throw new IOException("Received reader event error: " + event.getMessage(), io);
- default:
- throw new IOException("Got reader event type " + event.getEventType()
- + ", expected error event", io);
- }
- }
- } else {
- // If we weren't interrupted, just propagate the error
+ failOnInterruption(io);
+ return false;
+ }
+ }
+
+ protected void processReaderEvent() throws IOException {
+ // There should be a reader event available, or coming soon, so okay to be blocking call.
+ ReaderEvent event = getReaderEvent();
+ switch (event.getEventType()) {
+ case DONE:
+ break;
+ default:
+ throw new IOException("Expected reader event with done status, but got "
+ + event.getEventType() + " with message " + event.getMessage());
+ }
+ }
+
+ protected void failOnInterruption(IOException io) throws IOException {
+ try {
+ if (Thread.interrupted()) {
+ // Either we were interrupted by one of:
+ // 1. handleEvent(), in which case there is a reader (error) event waiting for us in the queue
+ // 2. Some other unrelated cause which interrupted us, in which case there may not be a reader event coming.
+ // Either way we should not try to block trying to read the reader events queue.
+ if (readerEvents.isEmpty()) {
+ // Case 2.
throw io;
+ } else {
+ // Case 1. Fail the reader, sending back the error we received from the reader event.
+ ReaderEvent event = getReaderEvent();
+ switch (event.getEventType()) {
+ case ERROR:
+ throw new IOException("Received reader event error: " + event.getMessage(), io);
+ default:
+ throw new IOException("Got reader event type " + event.getEventType()
+ + ", expected error event", io);
+ }
}
- } finally {
- // The external client handling umbilical responses and the connection to read the incoming
- // data are not coupled. Calling close() here to make sure an error in one will cause the
- // other to be closed as well.
- try {
- close();
- } catch (Exception err) {
- // Don't propagate errors from close() since this will lose the original error above.
- LOG.error("Closing RecordReader due to error and hit another error during close()", err);
- }
+ } else {
+ // If we weren't interrupted, just propagate the error
+ throw io;
+ }
+ } finally {
+ // The external client handling umbilical responses and the connection to read the incoming
+ // data are not coupled. Calling close() here to make sure an error in one will cause the
+ // other to be closed as well.
+ try {
+ close();
+ } catch (Exception err) {
+ // Don't propagate errors from close() since this will lose the original error above.
+ LOG.error("Closing RecordReader due to error and hit another error during close()", err);
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
index 1cfbf3a..6cc1d17 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
@@ -29,7 +29,6 @@ import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -70,20 +69,20 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
private static final Logger LOG = LoggerFactory.getLogger(LlapRowRecordReader.class);
protected final Configuration conf;
- protected final RecordReader<NullWritable, BytesWritable> reader;
+ protected final RecordReader reader;
protected final Schema schema;
protected final AbstractSerDe serde;
- protected final BytesWritable data;
+ protected final Writable data;
public LlapRowRecordReader(Configuration conf, Schema schema,
- RecordReader<NullWritable, BytesWritable> reader) throws IOException {
+ RecordReader<NullWritable, ? extends Writable> reader) throws IOException {
this.conf = conf;
this.schema = schema;
this.reader = reader;
- this.data = new BytesWritable();
+ this.data = reader.createValue();
try {
- serde = initSerDe(conf);
+ this.serde = initSerDe(conf);
} catch (SerDeException err) {
throw new IOException(err);
}
@@ -118,7 +117,7 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
public boolean next(NullWritable key, Row value) throws IOException {
Preconditions.checkArgument(value != null);
- boolean hasNext = reader.next(key, data);
+ boolean hasNext = reader.next(key, data);
if (hasNext) {
// Deserialize data to column values, and populate the row record
Object rowObj;
@@ -216,7 +215,7 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
return convertedVal;
}
- static void setRowFromStruct(Row row, Object structVal, StructObjectInspector soi) {
+ protected static void setRowFromStruct(Row row, Object structVal, StructObjectInspector soi) {
Schema structSchema = row.getSchema();
// Add struct field data to the Row
List<? extends StructField> structFields = soi.getAllStructFieldRefs();
@@ -230,6 +229,11 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
}
}
+ // Factory method for the SerDe
+ protected AbstractSerDe createSerDe() throws SerDeException {
+ return new LazyBinarySerDe();
+ }
+
protected AbstractSerDe initSerDe(Configuration conf) throws SerDeException {
Properties props = new Properties();
StringBuilder columnsBuffer = new StringBuilder();
@@ -249,9 +253,9 @@ public class LlapRowRecordReader implements RecordReader<NullWritable, Row> {
props.put(serdeConstants.LIST_COLUMNS, columns);
props.put(serdeConstants.LIST_COLUMN_TYPES, types);
props.put(serdeConstants.ESCAPE_CHAR, "\\");
- AbstractSerDe serde = new LazyBinarySerDe();
- serde.initialize(conf, props);
+ AbstractSerDe createdSerDe = createSerDe();
+ createdSerDe.initialize(conf, props);
- return serde;
+ return createdSerDe;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/llap-ext-client/pom.xml
----------------------------------------------------------------------
diff --git a/llap-ext-client/pom.xml b/llap-ext-client/pom.xml
index ed4704b..295d3e6 100644
--- a/llap-ext-client/pom.xml
+++ b/llap-ext-client/pom.xml
@@ -41,6 +41,11 @@
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
<artifactId>hive-llap-client</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
new file mode 100644
index 0000000..d9c5666
--- /dev/null
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import com.google.common.base.Preconditions;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+
+/*
+ * Read from Arrow stream batch-by-batch
+ */
+public class LlapArrowBatchRecordReader extends LlapBaseRecordReader<ArrowWrapperWritable> {
+
+ private BufferAllocator allocator;
+ private ArrowStreamReader arrowStreamReader;
+
+ public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class<ArrowWrapperWritable> clazz,
+ JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException {
+ super(in, schema, clazz, job, client, socket);
+ allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit);
+ this.arrowStreamReader = new ArrowStreamReader(socket.getInputStream(), allocator);
+ }
+
+ @Override
+ public boolean next(NullWritable key, ArrowWrapperWritable value) throws IOException {
+ try {
+ // Need to record which thread to interrupt, since this call blocks.
+ setReaderThread(Thread.currentThread());
+
+ boolean hasInput = arrowStreamReader.loadNextBatch();
+ if (hasInput) {
+ VectorSchemaRoot vectorSchemaRoot = arrowStreamReader.getVectorSchemaRoot();
+ //There must be at least one column vector
+ Preconditions.checkState(vectorSchemaRoot.getFieldVectors().size() > 0);
+ if(vectorSchemaRoot.getFieldVectors().get(0).getValueCount() == 0) {
+ //An empty batch will appear at the end of the stream
+ return false;
+ }
+ value.setVectorSchemaRoot(arrowStreamReader.getVectorSchemaRoot());
+ return true;
+ } else {
+ processReaderEvent();
+ return false;
+ }
+ } catch (IOException io) {
+ failOnInterruption(io);
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ arrowStreamReader.close();
+ }
+
+}
+
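The reader above relies on a convention rather than an explicit end-of-data marker: the writer appends a zero-row batch, so a loaded batch whose first vector has a value count of 0 means the stream is done. A minimal sketch of that consumption loop, not part of this patch (the input stream and allocator limit are illustrative):

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;

import java.io.IOException;
import java.io.InputStream;

public class ArrowStreamDrainSketch {
  // Counts rows until the stream ends or a zero-row batch (the writer's
  // end-of-stream signal) is seen, mirroring LlapArrowBatchRecordReader.next().
  static long countRows(InputStream in) throws IOException {
    long rows = 0;
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
         ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
      VectorSchemaRoot root = reader.getVectorSchemaRoot();
      while (reader.loadNextBatch()) {
        int batchRows = root.getFieldVectors().get(0).getValueCount();
        if (batchRows == 0) {
          break; // empty batch: the producer signaled end of stream
        }
        rows += batchRows;
      }
    }
    return rows;
  }
}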
http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
new file mode 100644
index 0000000..fafbdee
--- /dev/null
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import java.io.IOException;
+
+/*
+ * Adapts an Arrow batch reader to a row reader
+ */
+public class LlapArrowRowInputFormat implements InputFormat<NullWritable, Row> {
+
+ private LlapBaseInputFormat baseInputFormat;
+
+ public LlapArrowRowInputFormat(long arrowAllocatorLimit) {
+ baseInputFormat = new LlapBaseInputFormat(true, arrowAllocatorLimit);
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ return baseInputFormat.getSplits(job, numSplits);
+ }
+
+ @Override
+ public RecordReader<NullWritable, Row> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
+ throws IOException {
+ LlapInputSplit llapSplit = (LlapInputSplit) split;
+ LlapArrowBatchRecordReader reader =
+ (LlapArrowBatchRecordReader) baseInputFormat.getRecordReader(llapSplit, job, reporter);
+ return new LlapArrowRowRecordReader(job, reader.getSchema(), reader);
+ }
+}
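LlapArrowRowInputFormat is the piece an external client actually instantiates. A hedged usage sketch follows; it assumes the QUERY_KEY, USER_KEY and PWD_KEY constants exist on LlapBaseInputFormat alongside the URL_KEY shown later in this patch, and the JDBC URL, query and allocator limit are placeholders:

import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
import org.apache.hadoop.hive.llap.Row;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class LlapArrowClientSketch {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    job.set(LlapBaseInputFormat.URL_KEY, "jdbc:hive2://hs2host:10000/default");
    job.set(LlapBaseInputFormat.USER_KEY, "hive");
    job.set(LlapBaseInputFormat.PWD_KEY, "");
    job.set(LlapBaseInputFormat.QUERY_KEY, "select * from some_table");

    // Cap the shared Arrow root allocator at 1 GB (illustrative value).
    LlapArrowRowInputFormat inputFormat = new LlapArrowRowInputFormat(1L << 30);
    for (InputSplit split : inputFormat.getSplits(job, 1)) {
      RecordReader<NullWritable, Row> reader =
          inputFormat.getRecordReader(split, job, Reporter.NULL);
      Row row = reader.createValue();
      while (reader.next(NullWritable.get(), row)) {
        // consume the row, e.g. via row.getValue(i)
      }
      reader.close();
    }
  }
}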
http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java
new file mode 100644
index 0000000..d4179d5
--- /dev/null
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowRecordReader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import com.google.common.base.Preconditions;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Buffers a batch for reading one row at a time.
+ */
+public class LlapArrowRowRecordReader extends LlapRowRecordReader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LlapArrowRowRecordReader.class);
+ private int rowIndex = 0;
+ private int batchSize = 0;
+
+ //Buffer one batch at a time, for row retrieval
+ private Object[][] currentBatch;
+
+ public LlapArrowRowRecordReader(Configuration conf, Schema schema,
+ RecordReader<NullWritable, ? extends Writable> reader) throws IOException {
+ super(conf, schema, reader);
+ }
+
+ @Override
+ public boolean next(NullWritable key, Row value) throws IOException {
+ Preconditions.checkArgument(value != null);
+ boolean hasNext = false;
+ ArrowWrapperWritable batchData = (ArrowWrapperWritable) data;
+ if((batchSize == 0) || (rowIndex == batchSize)) {
+ //This is either the first batch or we've used up the current batch buffer
+ batchSize = 0;
+ rowIndex = 0;
+ hasNext = reader.next(key, data);
+ if(hasNext) {
+ //There is another batch to buffer
+ try {
+ List<FieldVector> vectors = batchData.getVectorSchemaRoot().getFieldVectors();
+ //hasNext implies there is some column in the batch
+ Preconditions.checkState(vectors.size() > 0);
+ //All the vectors have the same length,
+ //we can get the number of rows from the first vector
+ batchSize = vectors.get(0).getValueCount();
+ ArrowWrapperWritable wrapper = new ArrowWrapperWritable(batchData.getVectorSchemaRoot());
+ currentBatch = (Object[][]) serde.deserialize(wrapper);
+ StructObjectInspector rowOI = (StructObjectInspector) serde.getObjectInspector();
+ setRowFromStruct(value, currentBatch[rowIndex], rowOI);
+ } catch (Exception e) {
+ LOG.error("Failed to fetch Arrow batch", e);
+ throw new RuntimeException(e);
+ }
+ }
+ //There were no more batches AND
+ //this is either the first batch or we've used up the current batch buffer.
+ // fall through and return false
+ } else if(rowIndex < batchSize) {
+ //Take a row from the current buffered batch
+ hasNext = true;
+ StructObjectInspector rowOI = null;
+ try {
+ rowOI = (StructObjectInspector) serde.getObjectInspector();
+ } catch (SerDeException e) {
+ throw new RuntimeException(e);
+ }
+ setRowFromStruct(value, currentBatch[rowIndex], rowOI);
+ }
+ //Always inc the batch buffer index
+ //If we return false, it is just a noop
+ rowIndex++;
+ return hasNext;
+ }
+
+ protected AbstractSerDe createSerDe() throws SerDeException {
+ return new ArrowColumnarBatchSerDe();
+ }
+
+}
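Stripped of the Arrow and SerDe details, the reader above is a standard batch-to-row adapter: buffer a batch, hand out its rows one at a time, and refill only when the row index reaches the batch size. A generic sketch of that invariant, with an entirely hypothetical BatchSource interface standing in for the wrapped reader and SerDe:

import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class BatchToRowIterator<R> implements Iterator<R> {
  /** Hypothetical batch producer; returns null once the underlying stream is exhausted. */
  public interface BatchSource<R> {
    R[] nextBatch() throws IOException;
  }

  private final BatchSource<R> source;
  private R[] currentBatch;
  private int rowIndex;

  public BatchToRowIterator(BatchSource<R> source) {
    this.source = source;
  }

  @Override
  public boolean hasNext() {
    // Same invariant as LlapArrowRowRecordReader: refill only when this is the
    // first batch or the buffered batch has been fully consumed.
    if (currentBatch != null && rowIndex < currentBatch.length) {
      return true;
    }
    try {
      currentBatch = source.nextBatch();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    rowIndex = 0;
    return currentBatch != null && currentBatch.length > 0;
  }

  @Override
  public R next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return currentBatch[rowIndex++];
  }
}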
http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index f4c7fa4..ef03be6 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -49,15 +49,15 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrB
import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
-import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
import org.apache.hadoop.hive.registry.ServiceInstanceSet;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
@@ -104,6 +104,8 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
private String user; // "hive",
private String pwd; // ""
private String query;
+ private boolean useArrow;
+ private long arrowAllocatorLimit;
private final Random rand = new Random();
public static final String URL_KEY = "llap.if.hs2.connection";
@@ -123,7 +125,14 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
this.query = query;
}
- public LlapBaseInputFormat() {}
+ public LlapBaseInputFormat(boolean useArrow, long arrowAllocatorLimit) {
+ this.useArrow = useArrow;
+ this.arrowAllocatorLimit = arrowAllocatorLimit;
+ }
+
+ public LlapBaseInputFormat() {
+ this.useArrow = false;
+ }
@SuppressWarnings("unchecked")
@@ -195,8 +204,16 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
LOG.info("Registered id: " + fragmentId);
@SuppressWarnings("rawtypes")
- LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(),
- llapSplit.getSchema(), Text.class, job, llapClient, (java.io.Closeable)socket);
+ LlapBaseRecordReader recordReader;
+ if(useArrow) {
+ recordReader = new LlapArrowBatchRecordReader(
+ socket.getInputStream(), llapSplit.getSchema(),
+ ArrowWrapperWritable.class, job, llapClient, socket,
+ arrowAllocatorLimit);
+ } else {
+ recordReader = new LlapBaseRecordReader(socket.getInputStream(),
+ llapSplit.getSchema(), BytesWritable.class, job, llapClient, (java.io.Closeable)socket);
+ }
umbilicalResponder.setRecordReader(recordReader);
return recordReader;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a963f8d..77074df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,6 +119,7 @@
<antlr.version>3.5.2</antlr.version>
<apache-directory-server.version>1.5.6</apache-directory-server.version>
<apache-directory-clientapi.version>0.1</apache-directory-clientapi.version>
+ <!-- Include arrow for LlapOutputFormatService -->
<arrow.version>0.8.0</arrow.version>
<avatica.version>1.11.0</avatica.version>
<avro.version>1.7.7</avro.version>
[5/6] hive git commit: HIVE-19308: Provide an Arrow stream reader for
external LLAP clients (Eric Wohlstadter, reviewed by Jason Dere)
Posted by vg...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
index df7b53f..dd490b1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
@@ -15,26 +15,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.hive.ql.io.arrow;
import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-public class ArrowWrapperWritable implements Writable {
+public class ArrowWrapperWritable implements WritableComparable {
private VectorSchemaRoot vectorSchemaRoot;
public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot) {
this.vectorSchemaRoot = vectorSchemaRoot;
}
+ public ArrowWrapperWritable() {}
public VectorSchemaRoot getVectorSchemaRoot() {
return vectorSchemaRoot;
}
+ public void setVectorSchemaRoot(VectorSchemaRoot vectorSchemaRoot) {
+ this.vectorSchemaRoot = vectorSchemaRoot;
+ }
+
@Override
public void write(DataOutput dataOutput) throws IOException {
throw new UnsupportedOperationException();
@@ -44,4 +50,12 @@ public class ArrowWrapperWritable implements Writable {
public void readFields(DataInput dataInput) throws IOException {
throw new UnsupportedOperationException();
}
+
+ @Override public int compareTo(Object o) {
+ return 0;
+ }
+
+ @Override public boolean equals(Object o) {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
index 78cc188..7aa732b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/RootAllocatorFactory.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.hive.ql.io.arrow;
import org.apache.arrow.memory.RootAllocator;
@@ -41,4 +42,12 @@ public enum RootAllocatorFactory {
}
return rootAllocator;
}
+
+ //arrowAllocatorLimit is ignored if an allocator was previously created
+ public synchronized RootAllocator getOrCreateRootAllocator(long arrowAllocatorLimit) {
+ if (rootAllocator == null) {
+ rootAllocator = new RootAllocator(arrowAllocatorLimit);
+ }
+ return rootAllocator;
+ }
}
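As the comment notes, only the first caller's limit takes effect; every later caller gets the same cached allocator back. A small illustration of that behavior (values are arbitrary; assumes BufferAllocator#getLimit is available):

import org.apache.arrow.memory.RootAllocator;
import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;

public class RootAllocatorSketch {
  public static void main(String[] args) {
    RootAllocator first  = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(1L << 30);
    RootAllocator second = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(64L << 20);
    System.out.println(first == second);   // true: one allocator per process
    System.out.println(first.getLimit());  // 1 GB; the later 64 MB limit was ignored
  }
}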
http://git-wip-us.apache.org/repos/asf/hive/blob/2334a0dd/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
index 13a3070..f27cdf4 100644
--- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
@@ -54,6 +54,7 @@ public class TestLlapOutputFormat {
Configuration conf = new Configuration();
// Pick random avail port
HiveConf.setIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT, 0);
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
LlapOutputFormatService.initializeAndStart(conf, null);
service = LlapOutputFormatService.get();
LlapProxy.setDaemon(true);
[4/6] hive git commit: HIVE-19307: Support ArrowOutputStream in
LlapOutputFormatService (Eric Wohlstadter, reviewed by Jason Dere)
Posted by vg...@apache.org.
HIVE-19307: Support ArrowOutputStream in LlapOutputFormatService (Eric Wohlstadter, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f7f90a04
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f7f90a04
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f7f90a04
Branch: refs/heads/branch-3
Commit: f7f90a044499739a2bd6a3ea543f70cb59e3f870
Parents: 2726f30
Author: Jason Dere <jd...@hortonworks.com>
Authored: Tue May 15 14:25:40 2018 -0700
Committer: Vineet Garg <vg...@apache.org>
Committed: Tue May 29 13:58:16 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../hadoop/hive/llap/LlapArrowRecordWriter.java | 70 +++++++++++
.../hive/llap/LlapOutputFormatService.java | 11 +-
.../hive/llap/WritableByteChannelAdapter.java | 125 +++++++++++++++++++
.../hadoop/hive/ql/exec/FileSinkOperator.java | 26 ++--
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 28 +++--
.../hadoop/hive/ql/plan/FileSinkDesc.java | 12 +-
7 files changed, 251 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f7f90a04/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 128e892..8780374 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -397,6 +397,7 @@ public class HiveConf extends Configuration {
llapDaemonVarsSetLocal.add(ConfVars.LLAP_VALIDATE_ACLS.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_LOGGER.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_AM_USE_FQDN.varname);
+ llapDaemonVarsSetLocal.add(ConfVars.LLAP_OUTPUT_FORMAT_ARROW.varname);
}
/**
@@ -4160,6 +4161,8 @@ public class HiveConf extends Configuration {
Constants.LLAP_LOGGER_NAME_RFA,
Constants.LLAP_LOGGER_NAME_CONSOLE),
"logger used for llap-daemons."),
+ LLAP_OUTPUT_FORMAT_ARROW("hive.llap.output.format.arrow", false,
+ "Whether LLapOutputFormatService should output arrow batches"),
HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", "500ms",
new TimeValidator(TimeUnit.MILLISECONDS),
http://git-wip-us.apache.org/repos/asf/hive/blob/f7f90a04/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
new file mode 100644
index 0000000..1b3a3eb
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.hadoop.io.Writable;
+import java.nio.channels.WritableByteChannel;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Writes Arrow batches to an {@link org.apache.arrow.vector.ipc.ArrowStreamWriter}.
+ * The byte stream will be formatted according to the Arrow Streaming format.
+ * Because ArrowStreamWriter is bound to a {@link org.apache.arrow.vector.VectorSchemaRoot}
+ * when it is created,
+ * calls to the {@link #write(Writable, Writable)} method only serve as a signal that
+ * a new batch has been loaded to the associated VectorSchemaRoot.
+ * Payload data for writing is indirectly made available by reference:
+ * ArrowStreamWriter -> VectorSchemaRoot -> List<FieldVector>
+ * i.e. both the key and value are ignored once a reference to the VectorSchemaRoot
+ * is obtained.
+ */
+public class LlapArrowRecordWriter<K extends Writable, V extends Writable>
+ implements RecordWriter<K, V> {
+ public static final Logger LOG = LoggerFactory.getLogger(LlapArrowRecordWriter.class);
+
+ ArrowStreamWriter arrowStreamWriter;
+ WritableByteChannel out;
+
+ public LlapArrowRecordWriter(WritableByteChannel out) {
+ this.out = out;
+ }
+
+ @Override
+ public void close(Reporter reporter) throws IOException {
+ arrowStreamWriter.close();
+ }
+
+ @Override
+ public void write(K key, V value) throws IOException {
+ ArrowWrapperWritable arrowWrapperWritable = (ArrowWrapperWritable) value;
+ if (arrowStreamWriter == null) {
+ VectorSchemaRoot vectorSchemaRoot = arrowWrapperWritable.getVectorSchemaRoot();
+ arrowStreamWriter = new ArrowStreamWriter(vectorSchemaRoot, null, out);
+ }
+ arrowStreamWriter.writeBatch();
+ }
+}
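The javadoc above is the key point: the stream writer is bound once to a VectorSchemaRoot, and each later write() only signals that the root has been refilled and should be emitted. A hedged sketch of the underlying ArrowStreamWriter pattern (the root and channel are assumed to come from the caller):

import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;

import java.io.IOException;
import java.nio.channels.WritableByteChannel;

public class ArrowBatchEmitSketch {
  static void emitBatches(VectorSchemaRoot root, WritableByteChannel channel, int batches)
      throws IOException {
    try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, channel)) {
      writer.start();            // emits the stream schema header
      for (int i = 0; i < batches; i++) {
        // ... the producer refills the vectors inside "root" here ...
        writer.writeBatch();     // serializes whatever the root currently holds
      }
      writer.end();              // emits the end-of-stream marker
    }
  }
}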
http://git-wip-us.apache.org/repos/asf/hive/blob/f7f90a04/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index 30d5eb5..c71c637 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -198,11 +198,16 @@ public class LlapOutputFormatService {
LOG.debug("registering socket for: " + id);
int maxPendingWrites = HiveConf.getIntVar(conf,
HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES);
+ boolean useArrow = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW);
@SuppressWarnings("rawtypes")
- LlapRecordWriter writer = new LlapRecordWriter(id,
+ RecordWriter writer = null;
+ if(useArrow) {
+ writer = new LlapArrowRecordWriter(new WritableByteChannelAdapter(ctx, maxPendingWrites, id));
+ } else {
+ writer = new LlapRecordWriter(id,
new ChunkedOutputStream(
- new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites),
- sendBufferSize, id));
+ new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites), sendBufferSize, id));
+ }
boolean isFailed = true;
synchronized (lock) {
if (!writers.containsKey(id)) {
http://git-wip-us.apache.org/repos/asf/hive/blob/f7f90a04/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
new file mode 100644
index 0000000..57da1d9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.Semaphore;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+
+/**
+ * Provides an adapter between {@link java.nio.channels.WritableByteChannel}
+ * and {@link io.netty.channel.ChannelHandlerContext}.
+ * Additionally provides a form of flow-control by limiting the number of
+ * queued async writes.
+ */
+public class WritableByteChannelAdapter implements WritableByteChannel {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WritableByteChannelAdapter.class);
+ private ChannelHandlerContext chc;
+ private final int maxPendingWrites;
+ // This semaphore provides two functions:
+ // 1. Forces a cap on the number of outstanding async writes to the channel
+ // 2. Ensures that the channel isn't closed while there are outstanding async writes
+ private final Semaphore writeResources;
+ private boolean closed = false;
+ private final String id;
+
+ private ChannelFutureListener writeListener = new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ // Async write completed
+ // Release the semaphore permit
+ writeResources.release();
+
+ if (future.isCancelled()) {
+ LOG.error("Write cancelled on ID " + id);
+ } else if (!future.isSuccess()) {
+ LOG.error("Write error on ID " + id, future.cause());
+ }
+ }
+ };
+
+ private ChannelFutureListener closeListener = new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if (future.isCancelled()) {
+ LOG.error("Close cancelled on ID " + id);
+ } else if (!future.isSuccess()) {
+ LOG.error("Close failed on ID " + id, future.cause());
+ }
+ }
+ };
+
+ public WritableByteChannelAdapter(ChannelHandlerContext chc, int maxPendingWrites, String id) {
+ this.chc = chc;
+ this.maxPendingWrites = maxPendingWrites;
+ this.writeResources = new Semaphore(maxPendingWrites);
+ this.id = id;
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ int size = src.remaining();
+ //Down the semaphore or block until available
+ takeWriteResources(1);
+ chc.writeAndFlush(Unpooled.wrappedBuffer(src)).addListener(writeListener);
+ return size;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return chc.channel().isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ throw new IOException("Already closed: " + id);
+ }
+
+ closed = true;
+ //Block until all semaphore resources are released
+ //by outstanding async writes
+ takeWriteResources(maxPendingWrites);
+
+ try {
+ chc.close().addListener(closeListener);
+ } finally {
+ chc = null;
+ closed = true;
+ }
+ }
+
+ private void takeWriteResources(int numResources) throws IOException {
+ try {
+ writeResources.acquire(numResources);
+ } catch (InterruptedException ie) {
+ throw new IOException("Interrupted while waiting for write resources for " + id);
+ }
+ }
+}
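The flow-control scheme described in the javadoc, reduced to its essentials: every async write borrows one of maxPendingWrites permits and returns it on completion, and close() acquires all permits so it can only proceed once nothing is in flight. A generic sketch of that pattern (names are illustrative, not part of the patch):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

public class BoundedAsyncWriter {
  private final Semaphore permits;
  private final int maxPending;

  public BoundedAsyncWriter(int maxPending) {
    this.maxPending = maxPending;
    this.permits = new Semaphore(maxPending);
  }

  public void submit(Supplier<CompletableFuture<Void>> write) throws InterruptedException {
    permits.acquire();                                        // blocks once maxPending writes are queued
    write.get().whenComplete((ok, err) -> permits.release()); // return the permit when the write finishes
  }

  public void close() throws InterruptedException {
    permits.acquire(maxPending);                              // waits for every outstanding write
  }
}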
http://git-wip-us.apache.org/repos/asf/hive/blob/f7f90a04/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 01a5b4c..9c57eff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.io.HivePartitioner;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
import org.apache.hadoop.hive.ql.io.StreamingOutputFormat;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
@@ -1251,16 +1252,25 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
// If serializer is ThriftJDBCBinarySerDe, then it buffers rows to a certain limit (hive.server2.thrift.resultset.max.fetch.size)
// and serializes the whole batch when the buffer is full. The serialize returns null if the buffer is not full
// (the size of buffer is kept track of in the ThriftJDBCBinarySerDe).
- if (conf.isUsingThriftJDBCBinarySerDe()) {
- try {
- recordValue = serializer.serialize(null, inputObjInspectors[0]);
- if ( null != fpaths ) {
- rowOutWriters = fpaths.outWriters;
- rowOutWriters[0].write(recordValue);
+ if (conf.isUsingBatchingSerDe()) {
+ try {
+ recordValue = serializer.serialize(null, inputObjInspectors[0]);
+ if (null != fpaths) {
+ rowOutWriters = fpaths.outWriters;
+ rowOutWriters[0].write(recordValue);
+ } else if(recordValue instanceof ArrowWrapperWritable) {
+ // Because LLAP Arrow output depends on the ThriftJDBCBinarySerDe code path,
+ // this is required for 0-row outputs,
+ // i.e. we need to write a zero-size batch to signal EOS to the consumer
+ for (FSPaths fsPaths : valToPaths.values()) {
+ for(RecordWriter writer : fsPaths.outWriters) {
+ writer.write(recordValue);
+ }
}
- } catch (SerDeException | IOException e) {
- throw new HiveException(e);
}
+ } catch (SerDeException | IOException e) {
+ throw new HiveException(e);
+ }
}
List<Path> commitPaths = new ArrayList<>();
for (FSPaths fsp : valToPaths.values()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/f7f90a04/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index bc62f3c..863ab1b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -110,6 +110,7 @@ import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils.Operation;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -7498,7 +7499,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
Class<? extends Deserializer> serdeClass = LazySimpleSerDe.class;
if (fileFormat.equals(PlanUtils.LLAP_OUTPUT_FORMAT_KEY)) {
- serdeClass = LazyBinarySerDe2.class;
+ boolean useArrow = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW);
+ if(useArrow) {
+ serdeClass = ArrowColumnarBatchSerDe.class;
+ } else {
+ serdeClass = LazyBinarySerDe2.class;
+ }
}
table_desc =
PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
@@ -7579,13 +7585,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
ltd.setInsertOverwrite(true);
}
}
- if (SessionState.get().isHiveServerQuery() &&
- null != table_desc &&
- table_desc.getSerdeClassName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName()) &&
- HiveConf.getBoolVar(conf,HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) {
- fileSinkDesc.setIsUsingThriftJDBCBinarySerDe(true);
+ if (null != table_desc && useBatchingSerializer(table_desc.getSerdeClassName())) {
+ fileSinkDesc.setIsUsingBatchingSerDe(true);
} else {
- fileSinkDesc.setIsUsingThriftJDBCBinarySerDe(false);
+ fileSinkDesc.setIsUsingBatchingSerDe(false);
}
Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
@@ -7620,6 +7623,17 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
return output;
}
+ private boolean useBatchingSerializer(String serdeClassName) {
+ return SessionState.get().isHiveServerQuery() &&
+ hasSetBatchSerializer(serdeClassName);
+ }
+
+ private boolean hasSetBatchSerializer(String serdeClassName) {
+ return (serdeClassName.equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName()) &&
+ HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) ||
+ serdeClassName.equalsIgnoreCase(ArrowColumnarBatchSerDe.class.getName());
+ }
+
private ColsAndTypes deriveFileSinkColTypes(
RowResolver inputRR, List<FieldSchema> field_schemas) throws SemanticException {
ColsAndTypes result = new ColsAndTypes("", "");
http://git-wip-us.apache.org/repos/asf/hive/blob/f7f90a04/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index fcb6de7..1d05468 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -103,9 +103,9 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
/**
* Whether is a HiveServer query, and the destination table is
- * indeed written using ThriftJDBCBinarySerDe
+ * indeed written using a row batching SerDe
*/
- private boolean isUsingThriftJDBCBinarySerDe = false;
+ private boolean isUsingBatchingSerDe = false;
private boolean isInsertOverwrite = false;
@@ -183,12 +183,12 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
this.isHiveServerQuery = isHiveServerQuery;
}
- public boolean isUsingThriftJDBCBinarySerDe() {
- return this.isUsingThriftJDBCBinarySerDe;
+ public boolean isUsingBatchingSerDe() {
+ return this.isUsingBatchingSerDe;
}
- public void setIsUsingThriftJDBCBinarySerDe(boolean isUsingThriftJDBCBinarySerDe) {
- this.isUsingThriftJDBCBinarySerDe = isUsingThriftJDBCBinarySerDe;
+ public void setIsUsingBatchingSerDe(boolean isUsingBatchingSerDe) {
+ this.isUsingBatchingSerDe = isUsingBatchingSerDe;
}
@Explain(displayName = "directory", explainLevels = { Level.EXTENDED })