Posted to commits@hive.apache.org by vi...@apache.org on 2018/02/15 02:42:47 UTC
[3/3] hive git commit: HIVE-18553 : Support schema evolution in Parquet Vectorization reader (Ferdinand Xu, reviewed by Vihang Karajgaonkar)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7ddac02b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7ddac02b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7ddac02b
Branch: refs/heads/master
Commit: 7ddac02b8e0630c38e3c7b739c7d43e130988e05
Parents: 71d6e16
Author: Vihang Karajgaonkar <vi...@cloudera.com>
Authored: Wed Feb 14 18:16:47 2018 -0800
Committer: Vihang Karajgaonkar <vi...@cloudera.com>
Committed: Wed Feb 14 18:16:47 2018 -0800
----------------------------------------------------------------------
.../vector/BaseVectorizedColumnReader.java | 72 +-
.../parquet/vector/ParquetDataColumnReader.java | 170 ++++
.../vector/ParquetDataColumnReaderFactory.java | 908 +++++++++++++++++++
.../vector/VectorizedDummyColumnReader.java | 42 +
.../vector/VectorizedListColumnReader.java | 297 +++---
.../vector/VectorizedParquetRecordReader.java | 45 +-
.../vector/VectorizedPrimitiveColumnReader.java | 271 ++++--
.../hive/ql/io/parquet/vector/package-info.java | 22 +
.../io/parquet/TestVectorizedColumnReader.java | 16 +-
...ectorizedDictionaryEncodingColumnReader.java | 14 +
.../parquet/VectorizedColumnReaderTestBase.java | 696 ++++++++++----
...ema_evol_par_vec_table_dictionary_encoding.q | 94 ++
...evol_par_vec_table_non_dictionary_encoding.q | 94 ++
.../schema_evol_par_vec_table.q.out | 357 ++++++++
...evol_par_vec_table_dictionary_encoding.q.out | 522 +++++++++++
..._par_vec_table_non_dictionary_encoding.q.out | 522 +++++++++++
16 files changed, 3713 insertions(+), 429 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java
index 907a9b8..4a17ee4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java
@@ -1,9 +1,13 @@
/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,10 +18,10 @@
package org.apache.hadoop.hive.ql.io.parquet.vector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.DataPageV1;
@@ -27,6 +31,7 @@ import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,7 +67,7 @@ public abstract class BaseVectorizedColumnReader implements VectorizedColumnRead
/**
* The dictionary, if this column has dictionary encoding.
*/
- protected final Dictionary dictionary;
+ protected final ParquetDataColumnReader dictionary;
/**
* If true, the current page is dictionary encoded.
@@ -82,7 +87,7 @@ public abstract class BaseVectorizedColumnReader implements VectorizedColumnRead
*/
protected IntIterator repetitionLevelColumn;
protected IntIterator definitionLevelColumn;
- protected ValuesReader dataColumn;
+ protected ParquetDataColumnReader dataColumn;
/**
* Total values in the current page.
@@ -92,22 +97,39 @@ public abstract class BaseVectorizedColumnReader implements VectorizedColumnRead
protected final PageReader pageReader;
protected final ColumnDescriptor descriptor;
protected final Type type;
+ protected final TypeInfo hiveType;
+
+ /**
+ * Used for VectorizedDummyColumnReader.
+ */
+ public BaseVectorizedColumnReader(){
+ this.pageReader = null;
+ this.descriptor = null;
+ this.type = null;
+ this.dictionary = null;
+ this.hiveType = null;
+ this.maxDefLevel = -1;
+ }
public BaseVectorizedColumnReader(
ColumnDescriptor descriptor,
PageReader pageReader,
boolean skipTimestampConversion,
- Type type) throws IOException {
+ Type parquetType, TypeInfo hiveType) throws IOException {
this.descriptor = descriptor;
- this.type = type;
+ this.type = parquetType;
this.pageReader = pageReader;
this.maxDefLevel = descriptor.getMaxDefinitionLevel();
this.skipTimestampConversion = skipTimestampConversion;
+ this.hiveType = hiveType;
DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
if (dictionaryPage != null) {
try {
- this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
+ this.dictionary = ParquetDataColumnReaderFactory
+ .getDataColumnReaderByTypeOnDictionary(parquetType.asPrimitiveType(), hiveType,
+ dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage),
+ skipTimestampConversion);
this.isCurrentPageDictionaryEncoded = true;
} catch (IOException e) {
throw new IOException("could not decode the dictionary for " + descriptor, e);
@@ -130,7 +152,7 @@ public abstract class BaseVectorizedColumnReader implements VectorizedColumnRead
if (page == null) {
return;
}
- // TODO: Why is this a visitor?
+
page.accept(new DataPage.Visitor<Void>() {
@Override
public Void visit(DataPageV1 dataPageV1) {
@@ -146,7 +168,8 @@ public abstract class BaseVectorizedColumnReader implements VectorizedColumnRead
});
}
- private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException {
+ private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount)
+ throws IOException {
this.pageValueCount = valueCount;
this.endOfPageValueCount = valuesRead + pageValueCount;
if (dataEncoding.usesDictionary()) {
@@ -156,10 +179,13 @@ public abstract class BaseVectorizedColumnReader implements VectorizedColumnRead
"could not read page in col " + descriptor +
" as the dictionary was missing for encoding " + dataEncoding);
}
- dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary);
+ dataColumn = ParquetDataColumnReaderFactory.getDataColumnReaderByType(type.asPrimitiveType(), hiveType,
+ dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary
+ .getDictionary()), skipTimestampConversion);
this.isCurrentPageDictionaryEncoded = true;
} else {
- dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
+ dataColumn = ParquetDataColumnReaderFactory.getDataColumnReaderByType(type.asPrimitiveType(), hiveType,
+ dataEncoding.getValuesReader(descriptor, VALUES), skipTimestampConversion);
this.isCurrentPageDictionaryEncoded = false;
}
@@ -219,8 +245,20 @@ public abstract class BaseVectorizedColumnReader implements VectorizedColumnRead
}
/**
+ * Check that the underlying Parquet type can be converted to the Hive Decimal type.
+ *
+ * @param type
+ */
+ protected void decimalTypeCheck(Type type) {
+ DecimalMetadata decimalMetadata = type.asPrimitiveType().getDecimalMetadata();
+ if (decimalMetadata == null) {
+ throw new UnsupportedOperationException("The underlying Parquet type cannot be " +
+ "converted to the Hive Decimal type: " + type);
+ }
+ }
+
+ /**
* Utility classes to abstract over different ways to read ints with different encodings.
- * TODO: remove this layer of abstraction?
*/
abstract static class IntIterator {
abstract int nextInt();
@@ -258,6 +296,8 @@ public abstract class BaseVectorizedColumnReader implements VectorizedColumnRead
protected static final class NullIntIterator extends IntIterator {
@Override
- int nextInt() { return 0; }
+ int nextInt() {
+ return 0;
+ }
}
}
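
As a rough sketch of what the wrapping above buys (not part of the commit; "plain" and the surrounding variables are illustrative), the reader no longer hands the raw ValuesReader to callers; values come back converted to the requested Hive type:

    // Hypothetical wiring, mirroring initDataReader above: wrap the raw page
    // reader so, e.g., an int32 page can serve a Hive bigint column.
    ValuesReader plain = dataEncoding.getValuesReader(descriptor, VALUES);
    ParquetDataColumnReader typed = ParquetDataColumnReaderFactory
        .getDataColumnReaderByType(type.asPrimitiveType(), hiveType,
            plain, skipTimestampConversion);
    long v = typed.readLong();  // widens the underlying int32 transparently
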
http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java
new file mode 100644
index 0000000..6bfa95a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import org.apache.parquet.column.Dictionary;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+
+/**
+ * The interface that wraps the underlying Parquet dictionary and non-dictionary encoded page readers.
+ */
+public interface ParquetDataColumnReader {
+
+ /**
+ * Initialize the reader by page data.
+ * @param valueCount value count
+ * @param page page data
+ * @param offset current offset
+ * @throws IOException
+ */
+ void initFromPage(int valueCount, byte[] page, int offset) throws IOException;
+
+ /**
+ * @return the next Dictionary ID from the page
+ */
+ int readValueDictionaryId();
+
+ /**
+ * @return the next Long from the page
+ */
+ long readLong();
+
+ /**
+ * @return the next Integer from the page
+ */
+ int readInteger();
+
+ /**
+ * @return the next Float from the page
+ */
+ float readFloat();
+
+ /**
+ * @return the next Boolean from the page
+ */
+ boolean readBoolean();
+
+ /**
+ * @return the next String from the page
+ */
+ byte[] readString();
+
+ /**
+ * @return the next Varchar from the page
+ */
+ byte[] readVarchar();
+
+ /**
+ * @return the next Char from the page
+ */
+ byte[] readChar();
+
+ /**
+ * @return the next Bytes from the page
+ */
+ byte[] readBytes();
+
+ /**
+ * @return the next Decimal from the page
+ */
+ byte[] readDecimal();
+
+ /**
+ * @return the next Double from the page
+ */
+ double readDouble();
+
+ /**
+ * @return the next Timestamp from the page
+ */
+ Timestamp readTimestamp();
+
+ /**
+ * @return the underlying dictionary if current reader is dictionary encoded
+ */
+ Dictionary getDictionary();
+
+ /**
+ * @param id in dictionary
+ * @return the Bytes from the dictionary by id
+ */
+ byte[] readBytes(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the Float from the dictionary by id
+ */
+ float readFloat(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the Double from the dictionary by id
+ */
+ double readDouble(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the Integer from the dictionary by id
+ */
+ int readInteger(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the Long from the dictionary by id
+ */
+ long readLong(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the Boolean from the dictionary by id
+ */
+ boolean readBoolean(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the Decimal from the dictionary by id
+ */
+ byte[] readDecimal(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the Timestamp from the dictionary by id
+ */
+ Timestamp readTimestamp(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the String from the dictionary by id
+ */
+ byte[] readString(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the Varchar from the dictionary by id
+ */
+ byte[] readVarchar(int id);
+
+ /**
+ * @param id in dictionary
+ * @return the Char from the dictionary by id
+ */
+ byte[] readChar(int id);
+}
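
A hedged illustration of the contract (not from the commit; the reader instance is assumed to come from the factory described below). The same interface serves both page layouts:

    // Plain (non-dictionary) page: values come straight off the page.
    long direct = reader.readLong();

    // Dictionary-encoded page: the page yields ids, the dictionary yields values.
    int id = reader.readValueDictionaryId();
    long decoded = reader.readLong(id);
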
http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
new file mode 100644
index 0000000..898a2c6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import org.apache.hadoop.hive.common.type.HiveBaseChar;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.sql.Timestamp;
+import java.util.Arrays;
+
+/**
+ * A Parquet file has a self-describing schema which may differ from the schema the user
+ * requires (e.g. after schema evolution). This factory is used to retrieve the typed data the
+ * user requires via a corresponding reader which reads the underlying data.
+ */
+public final class ParquetDataColumnReaderFactory {
+
+ private ParquetDataColumnReaderFactory() {
+ }
+
+ /**
+ * The default data column reader for the existing Parquet page reader, which works for both
+ * dictionary and non-dictionary types. It mirrors the dictionary encoding path.
+ */
+ public static class DefaultParquetDataColumnReader implements ParquetDataColumnReader {
+ protected ValuesReader valuesReader;
+ protected Dictionary dict;
+
+ // Varchar or char length
+ protected int length = -1;
+
+ public DefaultParquetDataColumnReader(ValuesReader valuesReader, int length) {
+ this.valuesReader = valuesReader;
+ this.length = length;
+ }
+
+ public DefaultParquetDataColumnReader(Dictionary dict, int length) {
+ this.dict = dict;
+ this.length = length;
+ }
+
+ public void initFromPage(int i, ByteBuffer byteBuffer, int i1) throws IOException {
+ valuesReader.initFromPage(i, byteBuffer, i1);
+ }
+
+ @Override
+ public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
+ this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
+ }
+
+ @Override
+ public boolean readBoolean() {
+ return valuesReader.readBoolean();
+ }
+
+ @Override
+ public boolean readBoolean(int id) {
+ return dict.decodeToBoolean(id);
+ }
+
+ @Override
+ public byte[] readString(int id) {
+ return dict.decodeToBinary(id).getBytesUnsafe();
+ }
+
+ @Override
+ public byte[] readString() {
+ return valuesReader.readBytes().getBytesUnsafe();
+ }
+
+ @Override
+ public byte[] readVarchar() {
+ // we need to enforce the size here even when the types are the same
+ return valuesReader.readBytes().getBytesUnsafe();
+ }
+
+ @Override
+ public byte[] readVarchar(int id) {
+ return dict.decodeToBinary(id).getBytesUnsafe();
+ }
+
+ @Override
+ public byte[] readChar() {
+ return valuesReader.readBytes().getBytesUnsafe();
+ }
+
+ @Override
+ public byte[] readChar(int id) {
+ return dict.decodeToBinary(id).getBytesUnsafe();
+ }
+
+ @Override
+ public byte[] readBytes() {
+ return valuesReader.readBytes().getBytesUnsafe();
+ }
+
+ @Override
+ public byte[] readBytes(int id) {
+ return dict.decodeToBinary(id).getBytesUnsafe();
+ }
+
+ @Override
+ public byte[] readDecimal() {
+ return valuesReader.readBytes().getBytesUnsafe();
+ }
+
+ @Override
+ public byte[] readDecimal(int id) {
+ return dict.decodeToBinary(id).getBytesUnsafe();
+ }
+
+ @Override
+ public float readFloat() {
+ return valuesReader.readFloat();
+ }
+
+ @Override
+ public float readFloat(int id) {
+ return dict.decodeToFloat(id);
+ }
+
+ @Override
+ public double readDouble() {
+ return valuesReader.readDouble();
+ }
+
+ @Override
+ public double readDouble(int id) {
+ return dict.decodeToDouble(id);
+ }
+
+ @Override
+ public Timestamp readTimestamp() {
+ throw new RuntimeException("Unsupported operation");
+ }
+
+ @Override
+ public Timestamp readTimestamp(int id) {
+ throw new RuntimeException("Unsupported operation");
+ }
+
+ @Override
+ public int readInteger() {
+ return valuesReader.readInteger();
+ }
+
+ @Override
+ public int readInteger(int id) {
+ return dict.decodeToInt(id);
+ }
+
+ @Override
+ public long readLong(int id) {
+ return dict.decodeToLong(id);
+ }
+
+ @Override
+ public long readLong() {
+ return valuesReader.readLong();
+ }
+
+ @Override
+ public int readValueDictionaryId() {
+ return valuesReader.readValueDictionaryId();
+ }
+
+ public void skip() {
+ valuesReader.skip();
+ }
+
+ @Override
+ public Dictionary getDictionary() {
+ return dict;
+ }
+
+ /**
+ * Enforce the max length of varchar or char.
+ */
+ protected String enforceMaxLength(String value) {
+ return HiveBaseChar.enforceMaxLength(value, length);
+ }
+
+ /**
+ * Enforce the char length.
+ */
+ protected String getPaddedString(String value) {
+ return HiveBaseChar.getPaddedValue(value, length);
+ }
+
+ /**
+ * Method to convert string to UTF-8 bytes.
+ */
+ protected static byte[] convertToBytes(String value) {
+ try {
+ // encode the string as UTF-8 bytes
+ return value.getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException("Failed to encode string in UTF-8", e);
+ }
+ }
+ }
+
+ /**
+ * The reader that reads from the underlying int32 values. The implementation is consistent with
+ * ETypeConverter.EINT32_CONVERTER.
+ */
+ public static class TypesFromInt32PageReader extends DefaultParquetDataColumnReader {
+
+ public TypesFromInt32PageReader(ValuesReader realReader, int length) {
+ super(realReader, length);
+ }
+
+ public TypesFromInt32PageReader(Dictionary dict, int length) {
+ super(dict, length);
+ }
+
+ @Override
+ public long readLong() {
+ return valuesReader.readInteger();
+ }
+
+ @Override
+ public long readLong(int id) {
+ return dict.decodeToInt(id);
+ }
+
+ @Override
+ public float readFloat() {
+ return valuesReader.readInteger();
+ }
+
+ @Override
+ public float readFloat(int id) {
+ return dict.decodeToInt(id);
+ }
+
+ @Override
+ public double readDouble() {
+ return valuesReader.readInteger();
+ }
+
+ @Override
+ public double readDouble(int id) {
+ return dict.decodeToInt(id);
+ }
+
+ @Override
+ public byte[] readString() {
+ return convertToBytes(valuesReader.readInteger());
+ }
+
+ @Override
+ public byte[] readString(int id) {
+ return convertToBytes(dict.decodeToInt(id));
+ }
+
+ @Override
+ public byte[] readVarchar() {
+ String value = enforceMaxLength(
+ convertToString(valuesReader.readInteger()));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readVarchar(int id) {
+ String value = enforceMaxLength(
+ convertToString(dict.decodeToInt(id)));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readChar() {
+ String value = enforceMaxLength(
+ convertToString(valuesReader.readInteger()));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readChar(int id) {
+ String value = enforceMaxLength(
+ convertToString(dict.decodeToInt(id)));
+ return convertToBytes(value);
+ }
+
+ private static String convertToString(int value) {
+ return Integer.toString(value);
+ }
+
+ private static byte[] convertToBytes(int value) {
+ return convertToBytes(convertToString(value));
+ }
+ }
+
+ /**
+ * The reader that reads from the underlying int64 values. The implementation is consistent with
+ * ETypeConverter.EINT64_CONVERTER.
+ */
+ public static class TypesFromInt64PageReader extends DefaultParquetDataColumnReader {
+
+ public TypesFromInt64PageReader(ValuesReader realReader, int length) {
+ super(realReader, length);
+ }
+
+ public TypesFromInt64PageReader(Dictionary dict, int length) {
+ super(dict, length);
+ }
+
+ @Override
+ public float readFloat() {
+ return valuesReader.readLong();
+ }
+
+ @Override
+ public float readFloat(int id) {
+ return dict.decodeToLong(id);
+ }
+
+ @Override
+ public double readDouble() {
+ return valuesReader.readLong();
+ }
+
+ @Override
+ public double readDouble(int id) {
+ return dict.decodeToLong(id);
+ }
+
+ @Override
+ public byte[] readString() {
+ return convertToBytes(valuesReader.readLong());
+ }
+
+ @Override
+ public byte[] readString(int id) {
+ return convertToBytes(dict.decodeToLong(id));
+ }
+
+ @Override
+ public byte[] readVarchar() {
+ String value = enforceMaxLength(
+ convertToString(valuesReader.readLong()));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readVarchar(int id) {
+ String value = enforceMaxLength(
+ convertToString(dict.decodeToLong(id)));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readChar() {
+ String value = enforceMaxLength(
+ convertToString(valuesReader.readLong()));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readChar(int id) {
+ String value = enforceMaxLength(
+ convertToString(dict.decodeToLong(id)));
+ return convertToBytes(value);
+ }
+
+ private static String convertToString(long value) {
+ return Long.toString(value);
+ }
+
+ private static byte[] convertToBytes(long value) {
+ return convertToBytes(convertToString(value));
+ }
+ }
+
+ /**
+ * The reader that reads from the underlying float values. The implementation is consistent with
+ * ETypeConverter.EFLOAT_CONVERTER.
+ */
+ public static class TypesFromFloatPageReader extends DefaultParquetDataColumnReader {
+
+ public TypesFromFloatPageReader(ValuesReader realReader, int length) {
+ super(realReader, length);
+ }
+
+ public TypesFromFloatPageReader(Dictionary realReader, int length) {
+ super(realReader, length);
+ }
+
+ @Override
+ public double readDouble() {
+ return valuesReader.readFloat();
+ }
+
+ @Override
+ public double readDouble(int id) {
+ return dict.decodeToFloat(id);
+ }
+
+ @Override
+ public byte[] readString() {
+ return convertToBytes(valuesReader.readFloat());
+ }
+
+ @Override
+ public byte[] readString(int id) {
+ return convertToBytes(dict.decodeToFloat(id));
+ }
+
+ @Override
+ public byte[] readVarchar() {
+ String value = enforceMaxLength(
+ convertToString(valuesReader.readFloat()));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readVarchar(int id) {
+ String value = enforceMaxLength(
+ convertToString(dict.decodeToFloat(id)));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readChar() {
+ String value = enforceMaxLength(
+ convertToString(valuesReader.readFloat()));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readChar(int id) {
+ String value = enforceMaxLength(
+ convertToString(dict.decodeToFloat(id)));
+ return convertToBytes(value);
+ }
+
+ private static String convertToString(float value) {
+ return Float.toString(value);
+ }
+
+ private static byte[] convertToBytes(float value) {
+ return convertToBytes(convertToString(value));
+ }
+ }
+
+ /**
+ * The reader that reads from the underlying double values.
+ */
+ public static class TypesFromDoublePageReader extends DefaultParquetDataColumnReader {
+
+ public TypesFromDoublePageReader(ValuesReader realReader, int length) {
+ super(realReader, length);
+ }
+
+ public TypesFromDoublePageReader(Dictionary dict, int length) {
+ super(dict, length);
+ }
+
+ @Override
+ public byte[] readString() {
+ return convertToBytes(valuesReader.readDouble());
+ }
+
+ @Override
+ public byte[] readString(int id) {
+ return convertToBytes(dict.decodeToDouble(id));
+ }
+
+ @Override
+ public byte[] readVarchar() {
+ String value = enforceMaxLength(
+ convertToString(valuesReader.readDouble()));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readVarchar(int id) {
+ String value = enforceMaxLength(
+ convertToString(dict.decodeToDouble(id)));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readChar() {
+ String value = enforceMaxLength(
+ convertToString(valuesReader.readDouble()));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readChar(int id) {
+ String value = enforceMaxLength(
+ convertToString(dict.decodeToDouble(id)));
+ return convertToBytes(value);
+ }
+
+ private static String convertToString(double value) {
+ return Double.toString(value);
+ }
+
+ private static byte[] convertToBytes(double value) {
+ return convertToBytes(convertToString(value));
+ }
+ }
+
+ /**
+ * The reader that reads from the underlying boolean values.
+ */
+ public static class TypesFromBooleanPageReader extends DefaultParquetDataColumnReader {
+
+ public TypesFromBooleanPageReader(ValuesReader valuesReader, int length) {
+ super(valuesReader, length);
+ }
+
+ public TypesFromBooleanPageReader(Dictionary dict, int length) {
+ super(dict, length);
+ }
+
+ @Override
+ public byte[] readString() {
+ return convertToBytes(valuesReader.readBoolean());
+ }
+
+ @Override
+ public byte[] readString(int id) {
+ return convertToBytes(dict.decodeToBoolean(id));
+ }
+
+ @Override
+ public byte[] readVarchar() {
+ String value = enforceMaxLength(
+ convertToString(valuesReader.readBoolean()));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readVarchar(int id) {
+ String value = enforceMaxLength(
+ convertToString(dict.decodeToBoolean(id)));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readChar() {
+ String value = enforceMaxLength(
+ convertToString(valuesReader.readBoolean()));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readChar(int id) {
+ String value = enforceMaxLength(
+ convertToString(dict.decodeToBoolean(id)));
+ return convertToBytes(value);
+ }
+
+ private static String convertToString(boolean value) {
+ return Boolean.toString(value);
+ }
+
+ private static byte[] convertToBytes(boolean value) {
+ return convertToBytes(convertToString(value));
+ }
+ }
+
+ /**
+ * The reader that reads from the underlying int96 (Timestamp) values.
+ */
+ public static class TypesFromInt96PageReader extends DefaultParquetDataColumnReader {
+ private boolean skipTimestampConversion = false;
+
+ public TypesFromInt96PageReader(ValuesReader realReader, int length,
+ boolean skipTimestampConversion) {
+ super(realReader, length);
+ this.skipTimestampConversion = skipTimestampConversion;
+ }
+
+ public TypesFromInt96PageReader(Dictionary dict, int length, boolean skipTimestampConversion) {
+ super(dict, length);
+ this.skipTimestampConversion = skipTimestampConversion;
+ }
+
+ private Timestamp convert(Binary binary) {
+ ByteBuffer buf = binary.toByteBuffer();
+ buf.order(ByteOrder.LITTLE_ENDIAN);
+ long timeOfDayNanos = buf.getLong();
+ int julianDay = buf.getInt();
+ NanoTime nt = new NanoTime(julianDay, timeOfDayNanos);
+ return NanoTimeUtils.getTimestamp(nt, skipTimestampConversion);
+ }
+
+ @Override
+ public Timestamp readTimestamp(int id) {
+ return convert(dict.decodeToBinary(id));
+ }
+
+ @Override
+ public Timestamp readTimestamp() {
+ return convert(valuesReader.readBytes());
+ }
+
+ @Override
+ public byte[] readString() {
+ return convertToBytes(readTimestamp());
+ }
+
+ @Override
+ public byte[] readString(int id) {
+ return convertToBytes(readTimestamp(id));
+ }
+
+ @Override
+ public byte[] readVarchar() {
+ String value = enforceMaxLength(
+ convertToString(readTimestamp()));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readVarchar(int id) {
+ String value = enforceMaxLength(
+ convertToString(readTimestamp(id)));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readChar() {
+ String value = enforceMaxLength(
+ convertToString(readTimestamp()));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readChar(int id) {
+ String value = enforceMaxLength(
+ convertToString(readTimestamp(id)));
+ return convertToBytes(value);
+ }
+
+ private static String convertToString(Timestamp value) {
+ return value.toString();
+ }
+
+ private static byte[] convertToBytes(Timestamp value) {
+ return convertToBytes(convertToString(value));
+ }
+ }
+
+ /**
+ * The reader that reads from the underlying decimal values.
+ */
+ public static class TypesFromDecimalPageReader extends DefaultParquetDataColumnReader {
+ private HiveDecimalWritable tempDecimal = new HiveDecimalWritable();
+ private short scale;
+
+ public TypesFromDecimalPageReader(ValuesReader realReader, int length, short scale) {
+ super(realReader, length);
+ this.scale = scale;
+ }
+
+ public TypesFromDecimalPageReader(Dictionary dict, int length, short scale) {
+ super(dict, length);
+ this.scale = scale;
+ }
+
+ @Override
+ public byte[] readString() {
+ return convertToBytes(valuesReader.readBytes());
+ }
+
+ @Override
+ public byte[] readString(int id) {
+ return convertToBytes(dict.decodeToBinary(id));
+ }
+
+ @Override
+ public byte[] readVarchar() {
+ String value = enforceMaxLength(
+ convertToString(valuesReader.readBytes()));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readVarchar(int id) {
+ String value = enforceMaxLength(
+ convertToString(dict.decodeToBinary(id)));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readChar() {
+ String value = enforceMaxLength(
+ convertToString(valuesReader.readBytes()));
+ return convertToBytes(value);
+ }
+
+ @Override
+ public byte[] readChar(int id) {
+ String value = enforceMaxLength(
+ convertToString(dict.decodeToBinary(id)));
+ return convertToBytes(value);
+ }
+
+ private String convertToString(Binary value) {
+ tempDecimal.set(value.getBytesUnsafe(), scale);
+ return tempDecimal.toString();
+ }
+
+ private byte[] convertToBytes(Binary value) {
+ return convertToBytes(convertToString(value));
+ }
+ }
+
+ /**
+ * The reader that reads from the underlying UTF-8 strings.
+ */
+ public static class TypesFromStringPageReader extends DefaultParquetDataColumnReader {
+
+ public TypesFromStringPageReader(ValuesReader realReader, int length) {
+ super(realReader, length);
+ }
+
+ public TypesFromStringPageReader(Dictionary dict, int length) {
+ super(dict, length);
+ }
+
+ @Override
+ public byte[] readVarchar() {
+ // check the number of characters against the max length
+ final byte[] value = valuesReader.readBytes().getBytesUnsafe();
+ return truncateIfNecessary(value);
+ }
+
+ @Override
+ public byte[] readVarchar(int id) {
+ // check the number of characters against the max length
+ final byte[] value = dict.decodeToBinary(id).getBytesUnsafe();
+ return truncateIfNecessary(value);
+ }
+
+ @Override
+ public byte[] readChar() {
+ // check the number of characters against the max length
+ final byte[] value = valuesReader.readBytes().getBytesUnsafe();
+ return truncateIfNecessary(value);
+ }
+
+ @Override
+ public byte[] readChar(int id) {
+ // check the number of characters against the max length
+ final byte[] value = dict.decodeToBinary(id).getBytesUnsafe();
+ return truncateIfNecessary(value);
+ }
+
+ private byte[] truncateIfNecessary(byte[] bytes) {
+ if (length <= 0 || bytes == null) {
+ return bytes;
+ }
+
+ int len = bytes.length;
+ int truncatedLength = StringExpr.truncate(bytes, 0, len, length);
+ if (truncatedLength >= len) {
+ return bytes;
+ }
+
+ return Arrays.copyOf(bytes, truncatedLength);
+ }
+ }
+
+ private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(boolean isDictionary,
+ PrimitiveType parquetType,
+ TypeInfo hiveType,
+ Dictionary dictionary,
+ ValuesReader valuesReader,
+ boolean skipTimestampConversion)
+ throws IOException {
+ // max length for varchar and char cases
+ int length = getVarcharLength(hiveType);
+
+ switch (parquetType.getPrimitiveTypeName()) {
+ case INT32:
+ return isDictionary ? new TypesFromInt32PageReader(dictionary, length) : new
+ TypesFromInt32PageReader(valuesReader, length);
+ case INT64:
+ return isDictionary ? new TypesFromInt64PageReader(dictionary, length) : new
+ TypesFromInt64PageReader(valuesReader, length);
+ case FLOAT:
+ return isDictionary ? new TypesFromFloatPageReader(dictionary, length) : new
+ TypesFromFloatPageReader(valuesReader, length);
+ case INT96:
+ return isDictionary ? new TypesFromInt96PageReader(dictionary, length,
+ skipTimestampConversion) : new
+ TypesFromInt96PageReader(valuesReader, length, skipTimestampConversion);
+ case BOOLEAN:
+ return isDictionary ? new TypesFromBooleanPageReader(dictionary, length) : new
+ TypesFromBooleanPageReader(valuesReader, length);
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ return getConvertorFromBinary(isDictionary, parquetType, hiveType, valuesReader, dictionary);
+ case DOUBLE:
+ return isDictionary ? new TypesFromDoublePageReader(dictionary, length) : new
+ TypesFromDoublePageReader(valuesReader, length);
+ default:
+ return isDictionary ? new DefaultParquetDataColumnReader(dictionary, length) : new
+ DefaultParquetDataColumnReader(valuesReader, length);
+ }
+ }
+
+ private static ParquetDataColumnReader getConvertorFromBinary(boolean isDict,
+ PrimitiveType parquetType,
+ TypeInfo hiveType,
+ ValuesReader valuesReader,
+ Dictionary dictionary) {
+ OriginalType originalType = parquetType.getOriginalType();
+
+ // max length for varchar and char cases
+ int length = getVarcharLength(hiveType);
+
+ if (originalType == null) {
+ return isDict ? new DefaultParquetDataColumnReader(dictionary, length) : new
+ DefaultParquetDataColumnReader(valuesReader, length);
+ }
+ switch (originalType) {
+ case DECIMAL:
+ final short scale = (short) parquetType.asPrimitiveType().getDecimalMetadata().getScale();
+ return isDict ? new TypesFromDecimalPageReader(dictionary, length, scale) : new
+ TypesFromDecimalPageReader(valuesReader, length, scale);
+ case UTF8:
+ return isDict ? new TypesFromStringPageReader(dictionary, length) : new
+ TypesFromStringPageReader(valuesReader, length);
+ default:
+ return isDict ? new DefaultParquetDataColumnReader(dictionary, length) : new
+ DefaultParquetDataColumnReader(valuesReader, length);
+ }
+ }
+
+ public static ParquetDataColumnReader getDataColumnReaderByTypeOnDictionary(
+ PrimitiveType parquetType,
+ TypeInfo hiveType,
+ Dictionary realReader, boolean skipTimestampConversion)
+ throws IOException {
+ return getDataColumnReaderByTypeHelper(true, parquetType, hiveType, realReader, null,
+ skipTimestampConversion);
+ }
+
+ public static ParquetDataColumnReader getDataColumnReaderByType(PrimitiveType parquetType,
+ TypeInfo hiveType,
+ ValuesReader realReader,
+ boolean skipTimestampConversion)
+ throws IOException {
+ return getDataColumnReaderByTypeHelper(false, parquetType, hiveType, null, realReader,
+ skipTimestampConversion);
+ }
+
+
+ // For Varchar or char type, return the max length of the type
+ private static int getVarcharLength(TypeInfo hiveType) {
+ int length = -1;
+ if (hiveType instanceof PrimitiveTypeInfo) {
+ PrimitiveTypeInfo hivePrimitiveType = (PrimitiveTypeInfo) hiveType;
+ switch (hivePrimitiveType.getPrimitiveCategory()) {
+ case CHAR:
+ length = ((CharTypeInfo) hivePrimitiveType).getLength();
+ break;
+ case VARCHAR:
+ length = ((VarcharTypeInfo) hivePrimitiveType).getLength();
+ break;
+ default:
+ break;
+ }
+ }
+
+ return length;
+ }
+}
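
For example, a minimal sketch (not part of the commit; valuesReader is assumed to be an initialized Parquet page reader) of how a column written as int32 but evolved to a Hive string is served through TypesFromInt32PageReader, which stringifies on read:

    PrimitiveType int32Type =
        Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("c");
    ParquetDataColumnReader r = ParquetDataColumnReaderFactory
        .getDataColumnReaderByType(int32Type, TypeInfoFactory.stringTypeInfo,
            valuesReader, false /* skipTimestampConversion */);
    byte[] utf8 = r.readString();  // the int value 42 comes back as the bytes of "42"
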
http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java
new file mode 100644
index 0000000..ee1d692
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * A dummy vectorized Parquet reader to support schema evolution.
+ */
+public class VectorizedDummyColumnReader extends BaseVectorizedColumnReader {
+
+ public VectorizedDummyColumnReader() {
+ super();
+ }
+
+ @Override
+ public void readBatch(int total, ColumnVector column, TypeInfo columnType) throws IOException {
+ Arrays.fill(column.isNull, true);
+ column.isRepeating = true;
+ column.noNulls = false;
+ }
+}
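
A short usage sketch (not part of the commit; the batch size and TypeInfo are illustrative). For a column that exists in the evolved Hive schema but not in the older Parquet file, the dummy reader marks the whole batch as a single repeating NULL, so no page data is needed at all:

    LongColumnVector newColumn = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
    new VectorizedDummyColumnReader().readBatch(
        VectorizedRowBatch.DEFAULT_SIZE, newColumn, TypeInfoFactory.longTypeInfo);
    // newColumn.noNulls == false and newColumn.isRepeating == true,
    // so every row in the batch reads back as NULL.
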
http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java
index c36640d..cd2c0ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java
@@ -33,6 +33,7 @@ import java.util.List;
/**
* It's a column-level Parquet reader which is used to read a batch of records for a list column.
+ * TODO Currently the List type only supports the non-nested case.
*/
public class VectorizedListColumnReader extends BaseVectorizedColumnReader {
@@ -46,8 +47,9 @@ public class VectorizedListColumnReader extends BaseVectorizedColumnReader {
boolean isFirstRow = true;
public VectorizedListColumnReader(ColumnDescriptor descriptor, PageReader pageReader,
- boolean skipTimestampConversion, Type type) throws IOException {
- super(descriptor, pageReader, skipTimestampConversion, type);
+ boolean skipTimestampConversion, Type type, TypeInfo hiveType)
+ throws IOException {
+ super(descriptor, pageReader, skipTimestampConversion, type, hiveType);
}
@Override
@@ -81,7 +83,7 @@ public class VectorizedListColumnReader extends BaseVectorizedColumnReader {
// Decode the value if necessary
if (isCurrentPageDictionaryEncoded) {
- valueList = decodeDictionaryIds(valueList);
+ valueList = decodeDictionaryIds(category, valueList);
}
// Convert valueList to array for the ListColumnVector.child
convertValueListToListColumnVector(category, lcv, valueList, index);
@@ -142,75 +144,112 @@ public class VectorizedListColumnReader extends BaseVectorizedColumnReader {
lcv.lengths[index] = elements.size() - lcv.offsets[index];
}
+ // Needs to be consistent with VectorizedPrimitiveColumnReader#readBatchHelper.
+ // TODO Reduce the duplicated code
private Object readPrimitiveTypedRow(PrimitiveObjectInspector.PrimitiveCategory category) {
switch (category) {
- case INT:
- case BYTE:
- case SHORT:
- return dataColumn.readInteger();
- case DATE:
- case INTERVAL_YEAR_MONTH:
- case LONG:
- return dataColumn.readLong();
- case BOOLEAN:
- return dataColumn.readBoolean() ? 1 : 0;
- case DOUBLE:
- return dataColumn.readDouble();
- case BINARY:
- case STRING:
- case CHAR:
- case VARCHAR:
- return dataColumn.readBytes().getBytesUnsafe();
- case FLOAT:
- return dataColumn.readFloat();
- case DECIMAL:
- return dataColumn.readBytes().getBytesUnsafe();
- case INTERVAL_DAY_TIME:
- case TIMESTAMP:
- default:
- throw new RuntimeException("Unsupported type in the list: " + type);
+ case INT:
+ case BYTE:
+ case SHORT:
+ return dataColumn.readInteger();
+ case DATE:
+ case INTERVAL_YEAR_MONTH:
+ case LONG:
+ return dataColumn.readLong();
+ case BOOLEAN:
+ return dataColumn.readBoolean() ? 1 : 0;
+ case DOUBLE:
+ return dataColumn.readDouble();
+ case BINARY:
+ return dataColumn.readBytes();
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ return dataColumn.readString();
+ case FLOAT:
+ return dataColumn.readFloat();
+ case DECIMAL:
+ return dataColumn.readDecimal();
+ case TIMESTAMP:
+ return dataColumn.readTimestamp();
+ case INTERVAL_DAY_TIME:
+ default:
+ throw new RuntimeException("Unsupported type in the list: " + type);
}
}
- private List decodeDictionaryIds(List valueList) {
+ private List decodeDictionaryIds(PrimitiveObjectInspector.PrimitiveCategory category, List
+ valueList) {
int total = valueList.size();
List resultList;
List<Integer> intList = (List<Integer>) valueList;
- switch (descriptor.getType()) {
- case INT32:
- resultList = new ArrayList<Integer>(total);
- for (int i = 0; i < total; ++i) {
- resultList.add(dictionary.decodeToInt(intList.get(i)));
- }
- break;
- case INT64:
- resultList = new ArrayList<Long>(total);
- for (int i = 0; i < total; ++i) {
- resultList.add(dictionary.decodeToLong(intList.get(i)));
- }
- break;
- case FLOAT:
- resultList = new ArrayList<Float>(total);
- for (int i = 0; i < total; ++i) {
- resultList.add(dictionary.decodeToFloat(intList.get(i)));
- }
- break;
- case DOUBLE:
- resultList = new ArrayList<Double>(total);
- for (int i = 0; i < total; ++i) {
- resultList.add(dictionary.decodeToDouble(intList.get(i)));
- }
- break;
- case BINARY:
- case FIXED_LEN_BYTE_ARRAY:
- resultList = new ArrayList<byte[]>(total);
- for (int i = 0; i < total; ++i) {
- resultList.add(dictionary.decodeToBinary(intList.get(i)).getBytesUnsafe());
- }
- break;
- default:
- throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
+
+ switch (category) {
+ case INT:
+ case BYTE:
+ case SHORT:
+ resultList = new ArrayList<Integer>(total);
+ for (int i = 0; i < total; ++i) {
+ resultList.add(dictionary.readInteger(intList.get(i)));
+ }
+ break;
+ case DATE:
+ case INTERVAL_YEAR_MONTH:
+ case LONG:
+ resultList = new ArrayList<Long>(total);
+ for (int i = 0; i < total; ++i) {
+ resultList.add(dictionary.readLong(intList.get(i)));
+ }
+ break;
+ case BOOLEAN:
+ resultList = new ArrayList<Integer>(total);
+ for (int i = 0; i < total; ++i) {
+ resultList.add(dictionary.readBoolean(intList.get(i)) ? 1 : 0);
+ }
+ break;
+ case DOUBLE:
+ resultList = new ArrayList<Double>(total);
+ for (int i = 0; i < total; ++i) {
+ resultList.add(dictionary.readDouble(intList.get(i)));
+ }
+ break;
+ case BINARY:
+ resultList = new ArrayList<byte[]>(total);
+ for (int i = 0; i < total; ++i) {
+ resultList.add(dictionary.readBytes(intList.get(i)));
+ }
+ break;
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ resultList = new ArrayList<byte[]>(total);
+ for (int i = 0; i < total; ++i) {
+ resultList.add(dictionary.readString(intList.get(i)));
+ }
+ break;
+ case FLOAT:
+ resultList = new ArrayList<Float>(total);
+ for (int i = 0; i < total; ++i) {
+ resultList.add(dictionary.readFloat(intList.get(i)));
+ }
+ break;
+ case DECIMAL:
+ resultList = new ArrayList<byte[]>(total);
+ for (int i = 0; i < total; ++i) {
+ resultList.add(dictionary.readDecimal(intList.get(i)));
+ }
+ break;
+ case TIMESTAMP:
+ resultList = new ArrayList<Timestamp>(total);
+ for (int i = 0; i < total; ++i) {
+ resultList.add(dictionary.readTimestamp(intList.get(i)));
+ }
+ break;
+ case INTERVAL_DAY_TIME:
+ default:
+ throw new RuntimeException("Unsupported type in the list: " + type);
}
+
return resultList;
}
@@ -228,71 +267,79 @@ public class VectorizedListColumnReader extends BaseVectorizedColumnReader {
lcv.offsets = lcvOffset;
}
- private void fillColumnVector(PrimitiveObjectInspector.PrimitiveCategory category, ListColumnVector lcv,
- List valueList, int elementNum) {
+ private void fillColumnVector(PrimitiveObjectInspector.PrimitiveCategory category,
+ ListColumnVector lcv,
+ List valueList, int elementNum) {
int total = valueList.size();
setChildrenInfo(lcv, total, elementNum);
switch (category) {
- case INT:
- case BYTE:
- case SHORT:
- case BOOLEAN:
- lcv.child = new LongColumnVector(total);
- for (int i = 0; i < valueList.size(); i++) {
- ((LongColumnVector)lcv.child).vector[i] = ((List<Integer>)valueList).get(i);
- }
- break;
- case DATE:
- case INTERVAL_YEAR_MONTH:
- case LONG:
- lcv.child = new LongColumnVector(total);
- for (int i = 0; i < valueList.size(); i++) {
- ((LongColumnVector)lcv.child).vector[i] = ((List<Long>)valueList).get(i);
- }
- break;
- case DOUBLE:
- lcv.child = new DoubleColumnVector(total);
- for (int i = 0; i < valueList.size(); i++) {
- ((DoubleColumnVector)lcv.child).vector[i] = ((List<Double>)valueList).get(i);
- }
- break;
- case BINARY:
- case STRING:
- case CHAR:
- case VARCHAR:
- lcv.child = new BytesColumnVector(total);
- lcv.child.init();
- for (int i = 0; i < valueList.size(); i++) {
- byte[] src = ((List<byte[]>)valueList).get(i);
- ((BytesColumnVector)lcv.child).setRef(i, src, 0, src.length);
- }
- break;
- case FLOAT:
- lcv.child = new DoubleColumnVector(total);
- for (int i = 0; i < valueList.size(); i++) {
- ((DoubleColumnVector)lcv.child).vector[i] = ((List<Float>)valueList).get(i);
- }
- break;
- case DECIMAL:
- int precision = type.asPrimitiveType().getDecimalMetadata().getPrecision();
- int scale = type.asPrimitiveType().getDecimalMetadata().getScale();
- lcv.child = new DecimalColumnVector(total, precision, scale);
- for (int i = 0; i < valueList.size(); i++) {
- ((DecimalColumnVector)lcv.child).vector[i].set(((List<byte[]>)valueList).get(i), scale);
- }
- break;
- case INTERVAL_DAY_TIME:
- case TIMESTAMP:
- default:
- throw new RuntimeException("Unsupported type in the list: " + type);
+ case INT:
+ case BYTE:
+ case SHORT:
+ lcv.child = new LongColumnVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ ((LongColumnVector) lcv.child).vector[i] = ((List<Integer>) valueList).get(i);
+ }
+ break;
+ case BOOLEAN:
+ lcv.child = new LongColumnVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ ((LongColumnVector) lcv.child).vector[i] = ((List<Integer>) valueList).get(i);
+ }
+ break;
+ case DATE:
+ case INTERVAL_YEAR_MONTH:
+ case LONG:
+ lcv.child = new LongColumnVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ ((LongColumnVector) lcv.child).vector[i] = ((List<Long>) valueList).get(i);
+ }
+ break;
+ case DOUBLE:
+ lcv.child = new DoubleColumnVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ ((DoubleColumnVector) lcv.child).vector[i] = ((List<Double>) valueList).get(i);
+ }
+ break;
+ case BINARY:
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ lcv.child = new BytesColumnVector(total);
+ lcv.child.init();
+ for (int i = 0; i < valueList.size(); i++) {
+ byte[] src = ((List<byte[]>) valueList).get(i);
+ ((BytesColumnVector) lcv.child).setRef(i, src, 0, src.length);
+ }
+ break;
+ case FLOAT:
+ lcv.child = new DoubleColumnVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ ((DoubleColumnVector) lcv.child).vector[i] = ((List<Float>) valueList).get(i);
+ }
+ break;
+ case DECIMAL:
+ decimalTypeCheck(type);
+ int precision = type.asPrimitiveType().getDecimalMetadata().getPrecision();
+ int scale = type.asPrimitiveType().getDecimalMetadata().getScale();
+ lcv.child = new DecimalColumnVector(total, precision, scale);
+ for (int i = 0; i < valueList.size(); i++) {
+ ((DecimalColumnVector) lcv.child).vector[i].set(((List<byte[]>) valueList).get(i), scale);
+ }
+ break;
+ case INTERVAL_DAY_TIME:
+ case TIMESTAMP:
+ default:
+ throw new RuntimeException("Unsupported type in the list: " + type);
}
}
/**
* Finish the result ListColumnVector with all collected information.
*/
- private void convertValueListToListColumnVector(PrimitiveObjectInspector.PrimitiveCategory category,
- ListColumnVector lcv, List valueList, int elementNum) {
+ private void convertValueListToListColumnVector(
+ PrimitiveObjectInspector.PrimitiveCategory category, ListColumnVector lcv, List valueList,
+ int elementNum) {
// Fill the child of ListColumnVector with valueList
fillColumnVector(category, lcv, valueList, elementNum);
setIsRepeating(lcv);
@@ -330,9 +377,10 @@ public class VectorizedListColumnReader extends BaseVectorizedColumnReader {
System.arraycopy(((LongColumnVector) lcv.child).vector, start,
((LongColumnVector) resultCV).vector, 0, length);
} catch (Exception e) {
- throw new RuntimeException("colinmjj:index:" + index + ", start:" + start + ",length:" + length
- + ",vec len:" + ((LongColumnVector) lcv.child).vector.length + ", offset len:" + lcv.offsets.length
- + ", len len:" + lcv.lengths.length, e);
+ throw new RuntimeException(
+ "Fail to copy at index:" + index + ", start:" + start + ",length:" + length + ",vec " +
+ "len:" + ((LongColumnVector) lcv.child).vector.length + ", offset len:" + lcv
+ .offsets.length + ", len len:" + lcv.lengths.length, e);
}
}
if (child instanceof DoubleColumnVector) {
@@ -371,8 +419,9 @@ public class VectorizedListColumnReader extends BaseVectorizedColumnReader {
if (cv1 instanceof DecimalColumnVector && cv2 instanceof DecimalColumnVector) {
return compareDecimalColumnVector((DecimalColumnVector) cv1, (DecimalColumnVector) cv2);
}
- throw new RuntimeException("Unsupported ColumnVector comparision between " + cv1.getClass().getName()
- + " and " + cv2.getClass().getName());
+ throw new RuntimeException(
+ "Unsupported ColumnVector comparision between " + cv1.getClass().getName()
+ + " and " + cv2.getClass().getName());
} else {
return false;
}
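
Note that decodeDictionaryIds above now switches on the Hive category instead of the Parquet physical type, which is what lets evolved list columns decode correctly. Roughly (a sketch, with ids standing in for the collected dictionary ids of one batch):

    // A list column written as int32 but declared array<bigint> in Hive:
    // the wrapped dictionary widens each decoded entry to long.
    List<Long> decoded = new ArrayList<>(ids.size());
    for (int dictId : ids) {
      decoded.add(dictionary.readLong(dictId));
    }
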
http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 08ac57b..7b77eee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -1,9 +1,13 @@
/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -11,9 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hive.ql.io.parquet.vector;
-import com.google.common.annotations.VisibleForTesting;
+package org.apache.hadoop.hive.ql.io.parquet.vector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -64,6 +67,7 @@ import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -459,6 +463,19 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
return res;
}
+ // TODO supports only the non-nested case
+ private PrimitiveType getElementType(Type type) {
+ if (type.isPrimitive()) {
+ return type.asPrimitiveType();
+ }
+ if (type.asGroupType().getFields().size() > 1) {
+ throw new RuntimeException(
+ "Current Parquet Vectorization reader doesn't support nested type");
+ }
+ return type.asGroupType().getFields().get(0).asGroupType().getFields().get(0)
+ .asPrimitiveType();
+ }
+
// Build VectorizedParquetColumnReader via Hive typeInfo and Parquet schema
private VectorizedColumnReader buildVectorizedParquetReader(
TypeInfo typeInfo,
@@ -474,9 +491,13 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
if (columnDescriptors == null || columnDescriptors.isEmpty()) {
throw new RuntimeException(
"Failed to find related Parquet column descriptor with type " + type);
- } else {
+ }
+ if (fileSchema.getColumns().contains(descriptors.get(0))) {
return new VectorizedPrimitiveColumnReader(descriptors.get(0),
- pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type);
+ pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type, typeInfo);
+ } else {
+ // Support for schema evolution
+ return new VectorizedDummyColumnReader();
}
case STRUCT:
StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
@@ -502,8 +523,10 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
throw new RuntimeException(
"Failed to find related Parquet column descriptor with type " + type);
}
+
return new VectorizedListColumnReader(descriptors.get(0),
- pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type);
+ pages.getPageReader(descriptors.get(0)), skipTimestampConversion, getElementType(type),
+ typeInfo);
case MAP:
if (columnDescriptors == null || columnDescriptors.isEmpty()) {
throw new RuntimeException(
@@ -535,10 +558,10 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
List<Type> kvTypes = groupType.getFields();
VectorizedListColumnReader keyListColumnReader = new VectorizedListColumnReader(
descriptors.get(0), pages.getPageReader(descriptors.get(0)), skipTimestampConversion,
- kvTypes.get(0));
+ kvTypes.get(0), typeInfo);
VectorizedListColumnReader valueListColumnReader = new VectorizedListColumnReader(
descriptors.get(1), pages.getPageReader(descriptors.get(1)), skipTimestampConversion,
- kvTypes.get(1));
+ kvTypes.get(1), typeInfo);
return new VectorizedMapColumnReader(keyListColumnReader, valueListColumnReader);
case UNION:
default:
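
A note on the two schema-evolution pieces in this file. getElementType() above
relies on the three-level layout Hive uses when writing Parquet lists (an outer
group annotated LIST, a repeated middle group, then the element), which is why
it strips exactly two group levels with the chained getFields().get(0) calls.
A minimal sketch of that unwrap against a hand-parsed schema (the schema string
and class name below are illustrative, not taken from this patch):

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public class ListElementTypeSketch {
  public static void main(String[] args) {
    // Hive-style three-level list: group (LIST) -> repeated group -> element
    MessageType schema = MessageTypeParser.parseMessageType(
        "message hive_schema {\n"
            + "  optional group list_col (LIST) {\n"
            + "    repeated group bag {\n"
            + "      optional int32 array_element;\n"
            + "    }\n"
            + "  }\n"
            + "}");
    Type listType = schema.getFields().get(0);
    // Two group levels to strip, exactly as in getElementType() above.
    PrimitiveType element = listType.asGroupType().getFields().get(0)
        .asGroupType().getFields().get(0).asPrimitiveType();
    System.out.println(element); // optional int32 array_element
  }
}

Separately, the new fileSchema.getColumns().contains(...) check routes any
requested column that is missing from the file (i.e. one added to the table
after the file was written) to the new VectorizedDummyColumnReader, which
presumably fills the output vector with nulls rather than reading any pages.
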
http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
index 39689f1..1442d69 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
@@ -19,17 +19,13 @@ import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.schema.Type;
+
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.sql.Timestamp;
/**
* It's a column-level Parquet reader which is used to read a batch of records for a column,
@@ -38,18 +34,18 @@ import java.sql.Timestamp;
public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader {
public VectorizedPrimitiveColumnReader(
- ColumnDescriptor descriptor,
- PageReader pageReader,
- boolean skipTimestampConversion,
- Type type) throws IOException {
- super(descriptor, pageReader, skipTimestampConversion, type);
+ ColumnDescriptor descriptor,
+ PageReader pageReader,
+ boolean skipTimestampConversion,
+ Type type, TypeInfo hiveType) throws IOException {
+ super(descriptor, pageReader, skipTimestampConversion, type, hiveType);
}
@Override
public void readBatch(
- int total,
- ColumnVector column,
- TypeInfo columnType) throws IOException {
+ int total,
+ ColumnVector column,
+ TypeInfo columnType) throws IOException {
int rowId = 0;
while (total > 0) {
// Compute the number of values we want to read in this page.
@@ -64,7 +60,7 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
LongColumnVector dictionaryIds = new LongColumnVector();
// Read and decode dictionary ids.
readDictionaryIDs(num, dictionaryIds, rowId);
- decodeDictionaryIds(rowId, num, column, dictionaryIds);
+ decodeDictionaryIds(rowId, num, column, columnType, dictionaryIds);
} else {
// assign values in vector
readBatchHelper(num, column, columnType, rowId);
@@ -75,10 +71,10 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
}
private void readBatchHelper(
- int num,
- ColumnVector column,
- TypeInfo columnType,
- int rowId) throws IOException {
+ int num,
+ ColumnVector column,
+ TypeInfo columnType,
+ int rowId) throws IOException {
PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType;
switch (primitiveColumnType.getPrimitiveCategory()) {
@@ -99,10 +95,16 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
readDoubles(num, (DoubleColumnVector) column, rowId);
break;
case BINARY:
+ readBinaries(num, (BytesColumnVector) column, rowId);
+ break;
case STRING:
- case CHAR:
+ readString(num, (BytesColumnVector) column, rowId);
+ break;
case VARCHAR:
- readBinaries(num, (BytesColumnVector) column, rowId);
+ readVarchar(num, (BytesColumnVector) column, rowId);
+ break;
+ case CHAR:
+ readChar(num, (BytesColumnVector) column, rowId);
break;
case FLOAT:
readFloats(num, (DoubleColumnVector) column, rowId);
@@ -120,9 +122,9 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
}
private void readDictionaryIDs(
- int total,
- LongColumnVector c,
- int rowId) throws IOException {
+ int total,
+ LongColumnVector c,
+ int rowId) throws IOException {
int left = total;
while (left > 0) {
readRepetitionAndDefinitionLevels();
@@ -141,9 +143,9 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
}
private void readIntegers(
- int total,
- LongColumnVector c,
- int rowId) throws IOException {
+ int total,
+ LongColumnVector c,
+ int rowId) throws IOException {
int left = total;
while (left > 0) {
readRepetitionAndDefinitionLevels();
@@ -162,9 +164,9 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
}
private void readDoubles(
- int total,
- DoubleColumnVector c,
- int rowId) throws IOException {
+ int total,
+ DoubleColumnVector c,
+ int rowId) throws IOException {
int left = total;
while (left > 0) {
readRepetitionAndDefinitionLevels();
@@ -183,9 +185,9 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
}
private void readBooleans(
- int total,
- LongColumnVector c,
- int rowId) throws IOException {
+ int total,
+ LongColumnVector c,
+ int rowId) throws IOException {
int left = total;
while (left > 0) {
readRepetitionAndDefinitionLevels();
@@ -204,9 +206,9 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
}
private void readLongs(
- int total,
- LongColumnVector c,
- int rowId) throws IOException {
+ int total,
+ LongColumnVector c,
+ int rowId) throws IOException {
int left = total;
while (left > 0) {
readRepetitionAndDefinitionLevels();
@@ -225,9 +227,9 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
}
private void readFloats(
- int total,
- DoubleColumnVector c,
- int rowId) throws IOException {
+ int total,
+ DoubleColumnVector c,
+ int rowId) throws IOException {
int left = total;
while (left > 0) {
readRepetitionAndDefinitionLevels();
@@ -246,16 +248,17 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
}
private void readDecimal(
- int total,
- DecimalColumnVector c,
- int rowId) throws IOException {
+ int total,
+ DecimalColumnVector c,
+ int rowId) throws IOException {
+ decimalTypeCheck(type);
int left = total;
c.precision = (short) type.asPrimitiveType().getDecimalMetadata().getPrecision();
c.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale();
while (left > 0) {
readRepetitionAndDefinitionLevels();
if (definitionLevel >= maxDefLevel) {
- c.vector[rowId].set(dataColumn.readBytes().getBytesUnsafe(), c.scale);
+ c.vector[rowId].set(dataColumn.readDecimal(), c.scale);
c.isNull[rowId] = false;
c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
} else {
@@ -268,15 +271,81 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
}
}
+ private void readString(
+ int total,
+ BytesColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
+ if (definitionLevel >= maxDefLevel) {
+ c.setVal(rowId, dataColumn.readString());
+ c.isNull[rowId] = false;
+ // TODO: figure out a better way to set isRepeating for the Binary type
+ c.isRepeating = false;
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
+ private void readChar(
+ int total,
+ BytesColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
+ if (definitionLevel >= maxDefLevel) {
+ c.setVal(rowId, dataColumn.readChar());
+ c.isNull[rowId] = false;
+ // TODO: figure out a better way to set isRepeating for the Binary type
+ c.isRepeating = false;
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
+ private void readVarchar(
+ int total,
+ BytesColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
+ if (definitionLevel >= maxDefLevel) {
+ c.setVal(rowId, dataColumn.readVarchar());
+ c.isNull[rowId] = false;
+ // TODO: figure out a better way to set isRepeating for the Binary type
+ c.isRepeating = false;
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
private void readBinaries(
- int total,
- BytesColumnVector c,
- int rowId) throws IOException {
+ int total,
+ BytesColumnVector c,
+ int rowId) throws IOException {
int left = total;
while (left > 0) {
readRepetitionAndDefinitionLevels();
if (definitionLevel >= maxDefLevel) {
- c.setVal(rowId, dataColumn.readBytes().getBytesUnsafe());
+ c.setVal(rowId, dataColumn.readBytes());
c.isNull[rowId] = false;
// TODO figure out a better way to set repeat for Binary type
c.isRepeating = false;
@@ -296,11 +365,9 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
readRepetitionAndDefinitionLevels();
if (definitionLevel >= maxDefLevel) {
switch (descriptor.getType()) {
- //INT64 is not yet supported
+ // INT64 is not yet supported
case INT96:
- NanoTime nt = NanoTime.fromBinary(dataColumn.readBytes());
- Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion);
- c.set(rowId, ts);
+ c.set(rowId, dataColumn.readTimestamp());
break;
default:
throw new IOException(
@@ -323,73 +390,99 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
* Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
*/
private void decodeDictionaryIds(
- int rowId,
- int num,
- ColumnVector column,
- LongColumnVector dictionaryIds) {
+ int rowId,
+ int num,
+ ColumnVector column,
+ TypeInfo columnType,
+ LongColumnVector dictionaryIds) {
System.arraycopy(dictionaryIds.isNull, rowId, column.isNull, rowId, num);
if (column.noNulls) {
column.noNulls = dictionaryIds.noNulls;
}
column.isRepeating = column.isRepeating && dictionaryIds.isRepeating;
- switch (descriptor.getType()) {
- case INT32:
+
+ PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType;
+
+ switch (primitiveColumnType.getPrimitiveCategory()) {
+ case INT:
+ case BYTE:
+ case SHORT:
for (int i = rowId; i < rowId + num; ++i) {
((LongColumnVector) column).vector[i] =
- dictionary.decodeToInt((int) dictionaryIds.vector[i]);
+ dictionary.readInteger((int) dictionaryIds.vector[i]);
}
break;
- case INT64:
+ case DATE:
+ case INTERVAL_YEAR_MONTH:
+ case LONG:
for (int i = rowId; i < rowId + num; ++i) {
((LongColumnVector) column).vector[i] =
- dictionary.decodeToLong((int) dictionaryIds.vector[i]);
+ dictionary.readLong((int) dictionaryIds.vector[i]);
}
break;
- case FLOAT:
+ case BOOLEAN:
for (int i = rowId; i < rowId + num; ++i) {
- ((DoubleColumnVector) column).vector[i] =
- dictionary.decodeToFloat((int) dictionaryIds.vector[i]);
+ ((LongColumnVector) column).vector[i] =
+ dictionary.readBoolean((int) dictionaryIds.vector[i]) ? 1 : 0;
}
break;
case DOUBLE:
for (int i = rowId; i < rowId + num; ++i) {
((DoubleColumnVector) column).vector[i] =
- dictionary.decodeToDouble((int) dictionaryIds.vector[i]);
+ dictionary.readDouble((int) dictionaryIds.vector[i]);
}
break;
- case INT96:
+ case BINARY:
for (int i = rowId; i < rowId + num; ++i) {
- ByteBuffer buf = dictionary.decodeToBinary((int) dictionaryIds.vector[i]).toByteBuffer();
- buf.order(ByteOrder.LITTLE_ENDIAN);
- long timeOfDayNanos = buf.getLong();
- int julianDay = buf.getInt();
- NanoTime nt = new NanoTime(julianDay, timeOfDayNanos);
- Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion);
- ((TimestampColumnVector) column).set(i, ts);
+ ((BytesColumnVector) column)
+ .setVal(i, dictionary.readBytes((int) dictionaryIds.vector[i]));
}
break;
- case BINARY:
- case FIXED_LEN_BYTE_ARRAY:
- if (column instanceof BytesColumnVector) {
- for (int i = rowId; i < rowId + num; ++i) {
- ((BytesColumnVector) column)
- .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe());
- }
- } else {
- DecimalColumnVector decimalColumnVector = ((DecimalColumnVector) column);
- decimalColumnVector.precision =
- (short) type.asPrimitiveType().getDecimalMetadata().getPrecision();
- decimalColumnVector.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale();
- for (int i = rowId; i < rowId + num; ++i) {
- decimalColumnVector.vector[i]
- .set(dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe(),
- decimalColumnVector.scale);
- }
+ case STRING:
+ for (int i = rowId; i < rowId + num; ++i) {
+ ((BytesColumnVector) column)
+ .setVal(i, dictionary.readString((int) dictionaryIds.vector[i]));
+ }
+ break;
+ case VARCHAR:
+ for (int i = rowId; i < rowId + num; ++i) {
+ ((BytesColumnVector) column)
+ .setVal(i, dictionary.readVarchar((int) dictionaryIds.vector[i]));
}
break;
+ case CHAR:
+ for (int i = rowId; i < rowId + num; ++i) {
+ ((BytesColumnVector) column)
+ .setVal(i, dictionary.readChar((int) dictionaryIds.vector[i]));
+ }
+ break;
+ case FLOAT:
+ for (int i = rowId; i < rowId + num; ++i) {
+ ((DoubleColumnVector) column).vector[i] =
+ dictionary.readFloat((int) dictionaryIds.vector[i]);
+ }
+ break;
+ case DECIMAL:
+ decimalTypeCheck(type);
+ DecimalColumnVector decimalColumnVector = ((DecimalColumnVector) column);
+ decimalColumnVector.precision = (short) type.asPrimitiveType().getDecimalMetadata().getPrecision();
+ decimalColumnVector.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale();
+ for (int i = rowId; i < rowId + num; ++i) {
+ decimalColumnVector.vector[i]
+ .set(dictionary.readDecimal((int) dictionaryIds.vector[i]),
+ decimalColumnVector.scale);
+ }
+ break;
+ case TIMESTAMP:
+ for (int i = rowId; i < rowId + num; ++i) {
+ ((TimestampColumnVector) column)
+ .set(i, dictionary.readTimestamp((int) dictionaryIds.vector[i]));
+ }
+ break;
+ case INTERVAL_DAY_TIME:
default:
- throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
+ throw new UnsupportedOperationException("Unsupported type: " + type);
}
}
}
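
Both the per-page reads above (dataColumn.readString(), readDecimal(),
readTimestamp(), ...) and the dictionary lookups (dictionary.readInteger(),
readLong(), ...) now go through the ParquetDataColumnReader abstraction added
in this patch, which converts from the Parquet physical type stored in the
file to the Hive type requested by the query; that is what makes it possible
for decodeDictionaryIds() to switch on the Hive PrimitiveCategory instead of
descriptor.getType(). A rough sketch of the idea, assuming a wrapper over
Parquet's ValuesReader (the class name, method set, and conversion bodies here
are illustrative, not the patch's actual ParquetDataColumnReaderFactory code):

import java.nio.charset.StandardCharsets;

import org.apache.parquet.column.values.ValuesReader;

/** Illustrative only: serving wider Hive types from INT32-encoded Parquet data. */
class TypesFromInt32Sketch {
  private final ValuesReader realReader;

  TypesFromInt32Sketch(ValuesReader realReader) {
    this.realReader = realReader;
  }

  // int column read as Hive bigint: widen at read time, no file rewrite needed
  long readLong() {
    return realReader.readInteger();
  }

  // int column read as Hive double
  double readDouble() {
    return realReader.readInteger();
  }

  // int column read as Hive string (the textual conversion here is assumed)
  byte[] readString() {
    return Integer.toString(realReader.readInteger())
        .getBytes(StandardCharsets.UTF_8);
  }
}

The same abstraction would also explain the readBytes()/readString()/
readVarchar()/readChar() split introduced above: char and varchar presumably
need their length semantics enforced during the read, which a plain binary
read does not provide.
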
http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/package-info.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/package-info.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/package-info.java
new file mode 100644
index 0000000..b695974
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes supporting the Hive Parquet vectorized reader.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.vector;
http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
index 9e414dc..52e6045 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
@@ -35,7 +35,6 @@ import org.junit.Test;
import java.io.IOException;
-import static junit.framework.TestCase.assertFalse;
import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA;
public class TestVectorizedColumnReader extends VectorizedColumnReaderTestBase {
@@ -55,26 +54,40 @@ public class TestVectorizedColumnReader extends VectorizedColumnReaderTestBase {
@Test
public void testIntRead() throws Exception {
intRead(isDictionaryEncoding);
+ longReadInt(isDictionaryEncoding);
+ floatReadInt(isDictionaryEncoding);
+ doubleReadInt(isDictionaryEncoding);
}
@Test
public void testLongRead() throws Exception {
longRead(isDictionaryEncoding);
+ floatReadLong(isDictionaryEncoding);
+ doubleReadLong(isDictionaryEncoding);
+ }
+
+ @Test
+ public void testTimestamp() throws Exception {
+ timestampRead(isDictionaryEncoding);
+ stringReadTimestamp(isDictionaryEncoding);
}
@Test
public void testDoubleRead() throws Exception {
doubleRead(isDictionaryEncoding);
+ stringReadDouble(isDictionaryEncoding);
}
@Test
public void testFloatRead() throws Exception {
floatRead(isDictionaryEncoding);
+ doubleReadFloat(isDictionaryEncoding);
}
@Test
public void testBooleanRead() throws Exception {
booleanRead();
+ stringReadBoolean();
}
@Test
@@ -101,6 +114,7 @@ public class TestVectorizedColumnReader extends VectorizedColumnReaderTestBase {
@Test
public void decimalRead() throws Exception {
decimalRead(isDictionaryEncoding);
+ stringReadDecimal(isDictionaryEncoding);
}
private class TestVectorizedParquetRecordReader extends VectorizedParquetRecordReader {
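
Read together, the new test calls encode the conversion matrix this patch
supports: each xReadY method apparently writes a Y-typed Parquet column and
reads it back through an x-typed Hive schema, i.e. int data as bigint, float,
or double; bigint data as float or double; float data as double; and boolean,
double, decimal, and timestamp data as string.
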
http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java
index 3e5d831..32d27d9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java
@@ -41,21 +41,34 @@ public class TestVectorizedDictionaryEncodingColumnReader extends VectorizedColu
@Test
public void testIntRead() throws Exception {
intRead(isDictionaryEncoding);
+ longReadInt(isDictionaryEncoding);
+ floatReadInt(isDictionaryEncoding);
+ doubleReadInt(isDictionaryEncoding);
}
@Test
public void testLongRead() throws Exception {
longRead(isDictionaryEncoding);
+ floatReadLong(isDictionaryEncoding);
+ doubleReadLong(isDictionaryEncoding);
+ }
+
+ @Test
+ public void testTimestamp() throws Exception {
+ timestampRead(isDictionaryEncoding);
+ stringReadTimestamp(isDictionaryEncoding);
}
@Test
public void testDoubleRead() throws Exception {
doubleRead(isDictionaryEncoding);
+ stringReadDouble(isDictionaryEncoding);
}
@Test
public void testFloatRead() throws Exception {
floatRead(isDictionaryEncoding);
+ doubleReadFloat(isDictionaryEncoding);
}
@Test
@@ -81,5 +94,6 @@ public class TestVectorizedDictionaryEncodingColumnReader extends VectorizedColu
@Test
public void decimalRead() throws Exception {
decimalRead(isDictionaryEncoding);
+ stringReadDecimal(isDictionaryEncoding);
}
}
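
The dictionary-encoding variant repeats the same conversion matrix because the
two encodings take different paths in readBatch(): dictionary-encoded pages
are decoded through decodeDictionaryIds(...), which is now keyed on the Hive
type as shown in the hunk above, while plain pages go through
readBatchHelper(...), so both paths need schema-evolution coverage.
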