Posted to commits@hive.apache.org by xu...@apache.org on 2016/12/30 05:30:27 UTC
[1/2] hive git commit: HIVE-15112: Implement Parquet vectorization reader for Struct type (Ferdinand Xu, reviewed by Chao Sun)
Repository: hive
Updated Branches:
refs/heads/master 4486f2a94 -> a241e55a4
http://git-wip-us.apache.org/repos/asf/hive/blob/a241e55a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
new file mode 100644
index 0000000..833cfdb
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java
@@ -0,0 +1,694 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.TestCase.assertFalse;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
+import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.junit.Assert.assertEquals;
+
+public class VectorizedColumnReaderTestBase {
+
+ protected final static int nElements = 2500;
+ protected final static int UNIQUE_NUM = 10;
+ protected final static int NULL_FREQUENCY = 13;
+
+ protected final static Configuration conf = new Configuration();
+ protected final static Path file = new Path("target/test/TestParquetVectorReader/testParquetFile");
+
+ protected static final MessageType schema = parseMessageType(
+ "message hive_schema { "
+ + "required int32 int32_field; "
+ + "required int64 int64_field; "
+ + "required int96 int96_field; "
+ + "required double double_field; "
+ + "required float float_field; "
+ + "required boolean boolean_field; "
+ + "required fixed_len_byte_array(3) flba_field; "
+ + "optional fixed_len_byte_array(1) some_null_field; "
+ + "optional fixed_len_byte_array(1) all_null_field; "
+ + "required binary binary_field; "
+ + "optional binary binary_field_some_null; "
+ + "required binary value (DECIMAL(5,2)); "
+ + "required group struct_field {"
+ + " required int32 a;\n"
+ + " required double b;\n"
+ + "}\n"
+ + "optional group nested_struct_field {"
+ + " optional group nsf {"
+ + " optional int32 c;\n"
+ + " optional int32 d;\n"
+ + " }\n"
+ + " optional double e;\n"
+ + "}\n"
+ + "optional group struct_field_some_null {"
+ + " optional int32 f;\n"
+ + " optional double g;\n"
+ + "}\n"
+ + "optional group map_field (MAP) {\n"
+ + " repeated group map (MAP_KEY_VALUE) {\n"
+ + " required binary key;\n"
+ + " optional binary value;\n"
+ + " }\n"
+ + "}\n"
+ + "optional group array_list (LIST) {\n"
+ + " repeated group bag {\n"
+ + " optional int32 array_element;\n"
+ + " }\n"
+ + "}\n"
+ + "} ");
+
+ protected static void removeFile() throws IOException {
+ FileSystem fs = file.getFileSystem(conf);
+ if (fs.exists(file)) {
+ fs.delete(file, true);
+ }
+ }
+
+ protected static ParquetWriter<Group> initWriterFromFile() throws IOException {
+ GroupWriteSupport.setSchema(schema, conf);
+ return new ParquetWriter<>(
+ file,
+ new GroupWriteSupport(),
+ GZIP, 1024 * 1024, 1024, 1024 * 1024,
+ true, false, PARQUET_1_0, conf);
+ }
+
+ protected static int getIntValue(
+ boolean isDictionaryEncoding,
+ int index) {
+ return isDictionaryEncoding ? index % UNIQUE_NUM : index;
+ }
+
+ protected static double getDoubleValue(
+ boolean isDictionaryEncoding,
+ int index) {
+ return isDictionaryEncoding ? index % UNIQUE_NUM : index;
+ }
+
+ protected static long getLongValue(
+ boolean isDictionaryEncoding,
+ int index) {
+ return isDictionaryEncoding ? (long) 2 * index % UNIQUE_NUM : (long) 2 * index;
+ }
+
+ protected static float getFloatValue(
+ boolean isDictionaryEncoding,
+ int index) {
+ return (float) (isDictionaryEncoding ? index % UNIQUE_NUM * 2.0 : index * 2.0);
+ }
+
+ protected static boolean getBooleanValue(
+ float index) {
+ return (index % 2 == 0);
+ }
+
+ protected static String getTimestampStr(int index) {
+ String s = String.valueOf(index);
+ int l = 4 - s.length();
+ for (int i = 0; i < l; i++) {
+ s = "0" + s;
+ }
+ return "99999999" + s;
+ }
+
+ protected static HiveDecimal getDecimal(
+ boolean isDictionaryEncoding,
+ int index) {
+ int decimalVal = index % 100;
+ String decimalStr = (decimalVal < 10)
+ ? "0" + String.valueOf(decimalVal) : String.valueOf(decimalVal);
+ int intVal = (isDictionaryEncoding) ? index % UNIQUE_NUM : index / 100;
+ String d = String.valueOf(intVal) + decimalStr;
+ BigInteger bi = new BigInteger(d);
+ BigDecimal bd = new BigDecimal(bi);
+ return HiveDecimal.create(bd);
+ }
+
+ protected static Binary getTimestamp(
+ boolean isDictionaryEncoding,
+ int index) {
+ String s = isDictionaryEncoding ? getTimestampStr(index % UNIQUE_NUM) : getTimestampStr(index);
+ return Binary.fromReusedByteArray(s.getBytes());
+ }
+
+ protected static String getStr(
+ boolean isDictionaryEncoding,
+ int index) {
+ int binaryLen = isDictionaryEncoding ? index % UNIQUE_NUM : index;
+ String v = "";
+ while (binaryLen > 0) {
+ char t = (char) ('a' + binaryLen % 26);
+ binaryLen /= 26;
+ v = t + v;
+ }
+ return v;
+ }
+
+ protected static Binary getBinaryValue(
+ boolean isDictionaryEncoding,
+ int index) {
+ return Binary.fromString(getStr(isDictionaryEncoding, index));
+ }
+
+ protected static boolean isNull(int index) {
+ return (index % NULL_FREQUENCY == 0);
+ }
+
+ protected VectorizedParquetRecordReader createParquetReader(String schemaString, Configuration conf)
+ throws IOException, InterruptedException, HiveException {
+ conf.set(PARQUET_READ_SCHEMA, schemaString);
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+ HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
+
+ Job vectorJob = new Job(conf, "read vector");
+ ParquetInputFormat.setInputPaths(vectorJob, file);
+ ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class);
+ InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0);
+ initialVectorizedRowBatchCtx(conf);
+ return new VectorizedParquetRecordReader(split, new JobConf(conf));
+ }
+
+ protected static void writeData(ParquetWriter<Group> writer, boolean isDictionaryEncoding) throws IOException {
+ SimpleGroupFactory f = new SimpleGroupFactory(schema);
+ for (int i = 0; i < nElements; i++) {
+ boolean isNull = isNull(i);
+ int intVal = getIntValue(isDictionaryEncoding, i);
+ long longVal = getLongValue(isDictionaryEncoding, i);
+ Binary timeStamp = getTimestamp(isDictionaryEncoding, i);
+ HiveDecimal decimalVal = getDecimal(isDictionaryEncoding, i).setScale(2);
+ double doubleVal = getDoubleValue(isDictionaryEncoding, i);
+ float floatVal = getFloatValue(isDictionaryEncoding, i);
+ boolean booleanVal = getBooleanValue(i);
+ Binary binary = getBinaryValue(isDictionaryEncoding, i);
+ Group group = f.newGroup()
+ .append("int32_field", intVal)
+ .append("int64_field", longVal)
+ .append("int96_field", timeStamp)
+ .append("double_field", doubleVal)
+ .append("float_field", floatVal)
+ .append("boolean_field", booleanVal)
+ .append("flba_field", "abc");
+
+ if (!isNull) {
+ group.append("some_null_field", "x");
+ }
+
+ group.append("binary_field", binary);
+
+ if (!isNull) {
+ group.append("binary_field_some_null", binary);
+ }
+
+ HiveDecimalWritable w = new HiveDecimalWritable(decimalVal);
+ group.append("value", Binary.fromConstantByteArray(w.getInternalStorage()));
+
+ group.addGroup("struct_field")
+ .append("a", intVal)
+ .append("b", doubleVal);
+
+ Group g = group.addGroup("nested_struct_field");
+
+ g.addGroup("nsf").append("c", intVal).append("d", intVal);
+ g.append("e", doubleVal);
+
+ Group some_null_g = group.addGroup("struct_field_some_null");
+ if (i % 2 != 0) {
+ some_null_g.append("f", intVal);
+ }
+ if (i % 3 != 0) {
+ some_null_g.append("g", doubleVal);
+ }
+
+ Group mapGroup = group.addGroup("map_field");
+ if (i % 13 != 1) {
+ mapGroup.addGroup("map").append("key", binary).append("value", "abc");
+ } else {
+ mapGroup.addGroup("map").append("key", binary);
+ }
+
+ Group arrayGroup = group.addGroup("array_list");
+ for (int j = 0; j < i % 4; j++) {
+ arrayGroup.addGroup("bag").append("array_element", intVal);
+ }
+
+ writer.write(group);
+ }
+ writer.close();
+ }
+
+ private void initialVectorizedRowBatchCtx(Configuration conf) throws HiveException {
+ MapWork mapWork = new MapWork();
+ VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx();
+ rbCtx.init(createStructObjectInspector(conf), new String[0]);
+ mapWork.setVectorMode(true);
+ mapWork.setVectorizedRowBatchCtx(rbCtx);
+ Utilities.setMapWork(conf, mapWork);
+ }
+
+ private StructObjectInspector createStructObjectInspector(Configuration conf) {
+ // Create row related objects
+ String columnNames = conf.get(IOConstants.COLUMNS);
+ List<String> columnNamesList = DataWritableReadSupport.getColumnNames(columnNames);
+ String columnTypes = conf.get(IOConstants.COLUMNS_TYPES);
+ List<TypeInfo> columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
+ TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList);
+ return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);
+ }
+
+ protected void intRead(boolean isDictionaryEncoding) throws InterruptedException, HiveException, IOException {
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS,"int32_field");
+ conf.set(IOConstants.COLUMNS_TYPES,"int");
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ VectorizedParquetRecordReader reader =
+ createParquetReader("message test { required int32 int32_field;}", conf);
+ VectorizedRowBatch previous = reader.createValue();
+ try {
+ int c = 0;
+ while (reader.next(NullWritable.get(), previous)) {
+ LongColumnVector vector = (LongColumnVector) previous.cols[0];
+ assertTrue(vector.noNulls);
+ for (int i = 0; i < vector.vector.length; i++) {
+ if (c == nElements) {
+ break;
+ }
+ assertEquals("Failed at " + c, getIntValue(isDictionaryEncoding, c), vector.vector[i]);
+ assertFalse(vector.isNull[i]);
+ c++;
+ }
+ }
+ assertEquals(nElements, c);
+ } finally {
+ reader.close();
+ }
+ }
+
+ protected void longRead(boolean isDictionaryEncoding) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS, "int64_field");
+ conf.set(IOConstants.COLUMNS_TYPES, "bigint");
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ VectorizedParquetRecordReader reader =
+ createParquetReader("message test { required int64 int64_field;}", conf);
+ VectorizedRowBatch previous = reader.createValue();
+ try {
+ int c = 0;
+ while (reader.next(NullWritable.get(), previous)) {
+ LongColumnVector vector = (LongColumnVector) previous.cols[0];
+ assertTrue(vector.noNulls);
+ for (int i = 0; i < vector.vector.length; i++) {
+ if (c == nElements) {
+ break;
+ }
+ assertEquals("Failed at " + c, getLongValue(isDictionaryEncoding, c), vector.vector[i]);
+ assertFalse(vector.isNull[i]);
+ c++;
+ }
+ }
+ assertEquals(nElements, c);
+ } finally {
+ reader.close();
+ }
+ }
+
+ protected void doubleRead(boolean isDictionaryEncoding) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS, "double_field");
+ conf.set(IOConstants.COLUMNS_TYPES, "double");
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ VectorizedParquetRecordReader reader =
+ createParquetReader("message test { required double double_field;}", conf);
+ VectorizedRowBatch previous = reader.createValue();
+ try {
+ int c = 0;
+ while (reader.next(NullWritable.get(), previous)) {
+ DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0];
+ assertTrue(vector.noNulls);
+ for (int i = 0; i < vector.vector.length; i++) {
+ if (c == nElements) {
+ break;
+ }
+ assertEquals("Failed at " + c, getDoubleValue(isDictionaryEncoding, c), vector.vector[i],
+ 0);
+ assertFalse(vector.isNull[i]);
+ c++;
+ }
+ }
+ assertEquals(nElements, c);
+ } finally {
+ reader.close();
+ }
+ }
+
+ protected void floatRead(boolean isDictionaryEncoding) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS, "float_field");
+ conf.set(IOConstants.COLUMNS_TYPES, "float");
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ VectorizedParquetRecordReader reader =
+ createParquetReader("message test { required float float_field;}", conf);
+ VectorizedRowBatch previous = reader.createValue();
+ try {
+ int c = 0;
+ while (reader.next(NullWritable.get(), previous)) {
+ DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0];
+ assertTrue(vector.noNulls);
+ for (int i = 0; i < vector.vector.length; i++) {
+ if (c == nElements) {
+ break;
+ }
+ assertEquals("Failed at " + c, getFloatValue(isDictionaryEncoding, c), vector.vector[i],
+ 0);
+ assertFalse(vector.isNull[i]);
+ c++;
+ }
+ }
+ assertEquals(nElements, c);
+ } finally {
+ reader.close();
+ }
+ }
+
+ protected void booleanRead() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS, "boolean_field");
+ conf.set(IOConstants.COLUMNS_TYPES, "boolean");
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ VectorizedParquetRecordReader reader =
+ createParquetReader("message test { required boolean boolean_field;}", conf);
+ VectorizedRowBatch previous = reader.createValue();
+ try {
+ int c = 0;
+ while (reader.next(NullWritable.get(), previous)) {
+ LongColumnVector vector = (LongColumnVector) previous.cols[0];
+ assertTrue(vector.noNulls);
+ for (int i = 0; i < vector.vector.length; i++) {
+ if (c == nElements) {
+ break;
+ }
+ assertEquals("Failed at " + c, (getBooleanValue(c) ? 1 : 0), vector.vector[i]);
+ assertFalse(vector.isNull[i]);
+ c++;
+ }
+ }
+ assertEquals(nElements, c);
+ } finally {
+ reader.close();
+ }
+ }
+
+ protected void binaryRead(boolean isDictionaryEncoding) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS, "binary_field_some_null");
+ conf.set(IOConstants.COLUMNS_TYPES, "string");
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ VectorizedParquetRecordReader reader =
+ createParquetReader("message test { required binary binary_field_some_null;}", conf);
+ VectorizedRowBatch previous = reader.createValue();
+ int c = 0;
+ try {
+ while (reader.next(NullWritable.get(), previous)) {
+ BytesColumnVector vector = (BytesColumnVector) previous.cols[0];
+ boolean noNull = true;
+ for (int i = 0; i < vector.vector.length; i++) {
+ if (c == nElements) {
+ break;
+ }
+ String actual;
+ assertEquals("Null assert failed at " + c, isNull(c), vector.isNull[i]);
+ if (!vector.isNull[i]) {
+ actual = new String(ArrayUtils
+ .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i]));
+ assertEquals("failed at " + c, getStr(isDictionaryEncoding, c), actual);
+ } else {
+ noNull = false;
+ }
+ c++;
+ }
+ assertEquals("No Null check failed at " + c, noNull, vector.noNulls);
+ assertFalse(vector.isRepeating);
+ }
+ assertEquals("It doesn't exit at expected position", nElements, c);
+ } finally {
+ reader.close();
+ }
+ }
+
+ protected void structRead(boolean isDictionaryEncoding) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS, "struct_field");
+ conf.set(IOConstants.COLUMNS_TYPES, "struct<a:int,b:double>");
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ String schema = "message hive_schema {\n"
+ + "group struct_field {\n"
+ + " optional int32 a;\n"
+ + " optional double b;\n"
+ + "}\n"
+ + "}\n";
+ VectorizedParquetRecordReader reader = createParquetReader(schema, conf);
+ VectorizedRowBatch previous = reader.createValue();
+ int c = 0;
+ try {
+ while (reader.next(NullWritable.get(), previous)) {
+ StructColumnVector vector = (StructColumnVector) previous.cols[0];
+ LongColumnVector cv = (LongColumnVector) vector.fields[0];
+ DoubleColumnVector dv = (DoubleColumnVector) vector.fields[1];
+
+ for (int i = 0; i < cv.vector.length; i++) {
+ if (c == nElements) {
+ break;
+ }
+ assertEquals(getIntValue(isDictionaryEncoding, c), cv.vector[i]);
+ assertEquals(getDoubleValue(isDictionaryEncoding, c), dv.vector[i], 0);
+ assertFalse(vector.isNull[i]);
+ assertFalse(vector.isRepeating);
+ c++;
+ }
+ }
+ assertEquals("It doesn't exit at expected position", nElements, c);
+ } finally {
+ reader.close();
+ }
+ }
+
+ protected void nestedStructRead0(boolean isDictionaryEncoding) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS, "nested_struct_field");
+ conf.set(IOConstants.COLUMNS_TYPES, "struct<nsf:struct<c:int,d:int>,e:double>");
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ String schema = "message hive_schema {\n"
+ + "group nested_struct_field {\n"
+ + " optional group nsf {\n"
+ + " optional int32 c;\n"
+ + " optional int32 d;\n"
+ + " }"
+ + "optional double e;\n"
+ + "}\n";
+ VectorizedParquetRecordReader reader = createParquetReader(schema, conf);
+ VectorizedRowBatch previous = reader.createValue();
+ int c = 0;
+ try {
+ while (reader.next(NullWritable.get(), previous)) {
+ StructColumnVector vector = (StructColumnVector) previous.cols[0];
+ StructColumnVector sv = (StructColumnVector) vector.fields[0];
+ LongColumnVector cv = (LongColumnVector) sv.fields[0];
+ LongColumnVector dv = (LongColumnVector) sv.fields[1];
+ DoubleColumnVector ev = (DoubleColumnVector) vector.fields[1];
+
+ for (int i = 0; i < cv.vector.length; i++) {
+ if (c == nElements) {
+ break;
+ }
+ assertEquals(getIntValue(isDictionaryEncoding, c), cv.vector[i]);
+ assertEquals(getIntValue(isDictionaryEncoding, c), dv.vector[i]);
+ assertEquals(getDoubleValue(isDictionaryEncoding, c), ev.vector[i], 0);
+ assertFalse(vector.isNull[i]);
+ assertFalse(vector.isRepeating);
+ c++;
+ }
+ }
+ assertEquals("It doesn't exit at expected position", nElements, c);
+ } finally {
+ reader.close();
+ }
+ }
+
+ protected void nestedStructRead1(boolean isDictionaryEncoding) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS, "nested_struct_field");
+ conf.set(IOConstants.COLUMNS_TYPES, "struct<nsf:struct<c:int>>");
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ String schema = "message hive_schema {\n"
+ + "group nested_struct_field {\n"
+ + " optional group nsf {\n"
+ + " optional int32 c;\n"
+ + " }"
+ + "}\n";
+ VectorizedParquetRecordReader reader = createParquetReader(schema, conf);
+ VectorizedRowBatch previous = reader.createValue();
+ int c = 0;
+ try {
+ while (reader.next(NullWritable.get(), previous)) {
+ StructColumnVector vector = (StructColumnVector) previous.cols[0];
+ StructColumnVector sv = (StructColumnVector) vector.fields[0];
+ LongColumnVector cv = (LongColumnVector) sv.fields[0];
+
+ for (int i = 0; i < cv.vector.length; i++) {
+ if (c == nElements) {
+ break;
+ }
+ assertEquals(getIntValue(isDictionaryEncoding, c), cv.vector[i]);
+ assertFalse(vector.isNull[i]);
+ assertFalse(vector.isRepeating);
+ c++;
+ }
+ }
+ assertEquals("It doesn't exit at expected position", nElements, c);
+ } finally {
+ reader.close();
+ }
+ }
+
+ protected void structReadSomeNull(boolean isDictionaryEncoding) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS, "struct_field_some_null");
+ conf.set(IOConstants.COLUMNS_TYPES, "struct<f:int,g:double>");
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ String schema = "message hive_schema {\n"
+ + "group struct_field_some_null {\n"
+ + " optional int32 f;\n"
+ + " optional double g;\n"
+ + "}\n";
+ VectorizedParquetRecordReader reader = createParquetReader(schema, conf);
+ VectorizedRowBatch previous = reader.createValue();
+ int c = 0;
+ try {
+ while (reader.next(NullWritable.get(), previous)) {
+ StructColumnVector sv = (StructColumnVector) previous.cols[0];
+ LongColumnVector fv = (LongColumnVector) sv.fields[0];
+ DoubleColumnVector gv = (DoubleColumnVector) sv.fields[1];
+
+ for (int i = 0; i < fv.vector.length; i++) {
+ if (c == nElements) {
+ break;
+ }
+ assertEquals(c % 2 == 0, fv.isNull[i]);
+ assertEquals(c % 3 == 0, gv.isNull[i]);
+ assertEquals(c % /* 2*3 = */6 == 0, sv.isNull[i]);
+ if (!sv.isNull[i]) {
+ if (!fv.isNull[i]) {
+ assertEquals(getIntValue(isDictionaryEncoding, c), fv.vector[i]);
+ }
+ if (!gv.isNull[i]) {
+ assertEquals(getDoubleValue(isDictionaryEncoding, c), gv.vector[i], 0);
+ }
+ }
+ assertFalse(fv.isRepeating);
+ c++;
+ }
+ }
+ assertEquals("It doesn't exit at expected position", nElements, c);
+ } finally {
+ reader.close();
+ }
+ }
+
+ protected void decimalRead(boolean isDictionaryEncoding) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS, "value");
+ conf.set(IOConstants.COLUMNS_TYPES, "decimal(5,2)");
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ VectorizedParquetRecordReader reader =
+ createParquetReader("message hive_schema { required value (DECIMAL(5,2));}", conf);
+ VectorizedRowBatch previous = reader.createValue();
+ try {
+ int c = 0;
+ while (reader.next(NullWritable.get(), previous)) {
+ DecimalColumnVector vector = (DecimalColumnVector) previous.cols[0];
+ assertTrue(vector.noNulls);
+ for (int i = 0; i < vector.vector.length; i++) {
+ if (c == nElements) {
+ break;
+ }
+ assertEquals("Check failed at pos " + c, getDecimal(isDictionaryEncoding, c),
+ vector.vector[i].getHiveDecimal());
+ assertFalse(vector.isNull[i]);
+ c++;
+ }
+ }
+ assertEquals(nElements, c);
+ } finally {
+ reader.close();
+ }
+ }
+}
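
Note on usage: the diffstat below shows that TestVectorizedColumnReader and the new TestVectorizedDictionaryEncodingColumnReader build on this base class, but their bodies are not included in this message. As a rough, hypothetical sketch of how a concrete test drives the helpers above (class and test names here are illustrative, not the committed ones):

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class ExampleStructReadTest extends VectorizedColumnReaderTestBase {

  @BeforeClass
  public static void setup() throws Exception {
    removeFile();                            // drop any stale test file
    writeData(initWriterFromFile(), false);  // false = plain (non-dictionary) encoding
  }

  @AfterClass
  public static void cleanup() throws Exception {
    removeFile();
  }

  @Test
  public void testStructColumns() throws Exception {
    structRead(false);          // struct<a:int,b:double>
    nestedStructRead0(false);   // struct<nsf:struct<c:int,d:int>,e:double>
    structReadSomeNull(false);  // struct whose fields are null on a 2/3 cadence
  }
}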
[2/2] hive git commit: HIVE-15112: Implement Parquet vectorization reader for Struct type (Ferdinand Xu, reviewed by Chao Sun)
Posted by xu...@apache.org.
HIVE-15112: Implement Parquet vectorization reader for Struct type (Ferdinand Xu, reviewed by Chao Sun)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a241e55a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a241e55a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a241e55a
Branch: refs/heads/master
Commit: a241e55a4cddddb6b88feb7fbf1daf4d4db0371e
Parents: 4486f2a
Author: Ferdinand Xu <ch...@intel.com>
Authored: Fri Dec 30 05:56:10 2016 +0800
Committer: Ferdinand Xu <ch...@intel.com>
Committed: Fri Dec 30 05:56:10 2016 +0800
----------------------------------------------------------------------
.../parquet/vector/VectorizedColumnReader.java | 566 +--------------
.../vector/VectorizedParquetRecordReader.java | 84 ++-
.../vector/VectorizedPrimitiveColumnReader.java | 589 ++++++++++++++++
.../vector/VectorizedStructColumnReader.java | 59 ++
.../io/parquet/TestVectorizedColumnReader.java | 418 ++---------
...ectorizedDictionaryEncodingColumnReader.java | 85 +++
.../parquet/VectorizedColumnReaderTestBase.java | 694 +++++++++++++++++++
7 files changed, 1563 insertions(+), 932 deletions(-)
----------------------------------------------------------------------
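
Before the per-file diffs: the heart of this change is that VectorizedColumnReader shrinks from a 566-line concrete class to a small interface (first diff below), its old body moves to the new VectorizedPrimitiveColumnReader, and struct support arrives as VectorizedStructColumnReader. The struct reader's 59 lines are not included in this message, so the following is only a sketch reconstructing the delegation the design implies: a struct reader holds one child reader per field and fans readBatch out to the field vectors of a StructColumnVector.

import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

import java.io.IOException;
import java.util.List;

// Sketch only; the committed VectorizedStructColumnReader may differ in detail.
public class StructColumnReaderSketch implements VectorizedColumnReader {

  private final List<VectorizedColumnReader> fieldReaders;

  public StructColumnReaderSketch(List<VectorizedColumnReader> fieldReaders) {
    this.fieldReaders = fieldReaders;
  }

  @Override
  public void readBatch(int total, ColumnVector column, TypeInfo columnType)
      throws IOException {
    StructColumnVector structVector = (StructColumnVector) column;
    List<TypeInfo> fieldTypes =
        ((StructTypeInfo) columnType).getAllStructFieldTypeInfos();
    for (int i = 0; i < fieldReaders.size(); i++) {
      // Each child reader (primitive or nested struct) fills its own field vector.
      fieldReaders.get(i).readBatch(total, structVector.fields[i], fieldTypes.get(i));
    }
  }
}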
http://git-wip-us.apache.org/repos/asf/hive/blob/a241e55a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java
index 5a9c7f9..e3be982 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java
@@ -1,9 +1,13 @@
/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -11,561 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.hive.ql.io.parquet.vector;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.bytes.BytesUtils;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.column.page.DataPage;
-import org.apache.parquet.column.page.DataPageV1;
-import org.apache.parquet.column.page.DataPageV2;
-import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.Type;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.sql.Timestamp;
-import java.util.Arrays;
-
-import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
-import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
-import static org.apache.parquet.column.ValuesType.VALUES;
-
-/**
- * It's column level Parquet reader which is used to read a batch of records for a column,
- * part of the code is referred from Apache Spark and Apache Parquet.
- */
-public class VectorizedColumnReader {
-
- private static final Logger LOG = LoggerFactory.getLogger(VectorizedColumnReader.class);
-
- private boolean skipTimestampConversion = false;
-
- /**
- * Total number of values read.
- */
- private long valuesRead;
+public interface VectorizedColumnReader {
/**
- * value that indicates the end of the current page. That is,
- * if valuesRead == endOfPageValueCount, we are at the end of the page.
+ * Reads the given number of records of the specified type into the column vector.
+ *
+ * @param total number of records to read into the column vector
+ * @param column the column vector into which the data is read
+ * @param columnType the Hive type of the column
+ * @throws IOException
*/
- private long endOfPageValueCount;
-
- /**
- * The dictionary, if this column has dictionary encoding.
- */
- private final Dictionary dictionary;
-
- /**
- * If true, the current page is dictionary encoded.
- */
- private boolean isCurrentPageDictionaryEncoded;
-
- /**
- * Maximum definition level for this column.
- */
- private final int maxDefLevel;
-
- private int definitionLevel;
- private int repetitionLevel;
-
- /**
- * Repetition/Definition/Value readers.
- */
- private IntIterator repetitionLevelColumn;
- private IntIterator definitionLevelColumn;
- private ValuesReader dataColumn;
-
- /**
- * Total values in the current page.
- */
- private int pageValueCount;
-
- private final PageReader pageReader;
- private final ColumnDescriptor descriptor;
- private final Type type;
-
- public VectorizedColumnReader(
- ColumnDescriptor descriptor,
- PageReader pageReader,
- boolean skipTimestampConversion,
- Type type) throws IOException {
- this.descriptor = descriptor;
- this.type = type;
- this.pageReader = pageReader;
- this.maxDefLevel = descriptor.getMaxDefinitionLevel();
- this.skipTimestampConversion = skipTimestampConversion;
-
- DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
- if (dictionaryPage != null) {
- try {
- this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
- this.isCurrentPageDictionaryEncoded = true;
- } catch (IOException e) {
- throw new IOException("could not decode the dictionary for " + descriptor, e);
- }
- } else {
- this.dictionary = null;
- this.isCurrentPageDictionaryEncoded = false;
- }
- }
-
void readBatch(
int total,
ColumnVector column,
- TypeInfo columnType) throws IOException {
-
- int rowId = 0;
- while (total > 0) {
- // Compute the number of values we want to read in this page.
- int leftInPage = (int) (endOfPageValueCount - valuesRead);
- if (leftInPage == 0) {
- readPage();
- leftInPage = (int) (endOfPageValueCount - valuesRead);
- }
-
- int num = Math.min(total, leftInPage);
- if (isCurrentPageDictionaryEncoded) {
- LongColumnVector dictionaryIds = new LongColumnVector();
- // Read and decode dictionary ids.
- readDictionaryIDs(num, dictionaryIds, rowId);
- decodeDictionaryIds(rowId, num, column, dictionaryIds);
- } else {
- // assign values in vector
- PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType;
- switch (primitiveColumnType.getPrimitiveCategory()) {
- case INT:
- case BYTE:
- case SHORT:
- readIntegers(num, (LongColumnVector) column, rowId);
- break;
- case DATE:
- case INTERVAL_YEAR_MONTH:
- case LONG:
- readLongs(num, (LongColumnVector) column, rowId);
- break;
- case BOOLEAN:
- readBooleans(num, (LongColumnVector) column, rowId);
- break;
- case DOUBLE:
- readDoubles(num, (DoubleColumnVector) column, rowId);
- break;
- case BINARY:
- case STRING:
- case CHAR:
- case VARCHAR:
- readBinaries(num, (BytesColumnVector) column, rowId);
- break;
- case FLOAT:
- readFloats(num, (DoubleColumnVector) column, rowId);
- break;
- case DECIMAL:
- readDecimal(num, (DecimalColumnVector) column, rowId);
- break;
- case INTERVAL_DAY_TIME:
- case TIMESTAMP:
- default:
- throw new IOException(
- "Unsupported type category: " + primitiveColumnType.getPrimitiveCategory());
- }
- }
- rowId += num;
- total -= num;
- }
- }
-
- private void readDictionaryIDs(
- int total,
- LongColumnVector c,
- int rowId) throws IOException {
- int left = total;
- while (left > 0) {
- readRepetitionAndDefinitionLevels();
- if (definitionLevel >= maxDefLevel) {
- c.vector[rowId] = dataColumn.readValueDictionaryId();
- c.isNull[rowId] = false;
- c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
- } else {
- c.isNull[rowId] = true;
- c.isRepeating = false;
- c.noNulls = false;
- }
- rowId++;
- left--;
- }
- }
-
- private void readIntegers(
- int total,
- LongColumnVector c,
- int rowId) throws IOException {
- int left = total;
- while (left > 0) {
- readRepetitionAndDefinitionLevels();
- if (definitionLevel >= maxDefLevel) {
- c.vector[rowId] = dataColumn.readInteger();
- c.isNull[rowId] = false;
- c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
- } else {
- c.isNull[rowId] = true;
- c.isRepeating = false;
- c.noNulls = false;
- }
- rowId++;
- left--;
- }
- }
-
- private void readDoubles(
- int total,
- DoubleColumnVector c,
- int rowId) throws IOException {
- int left = total;
- while (left > 0) {
- readRepetitionAndDefinitionLevels();
- if (definitionLevel >= maxDefLevel) {
- c.vector[rowId] = dataColumn.readDouble();
- c.isNull[rowId] = false;
- c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
- } else {
- c.isNull[rowId] = true;
- c.isRepeating = false;
- c.noNulls = false;
- }
- rowId++;
- left--;
- }
- }
-
- private void readBooleans(
- int total,
- LongColumnVector c,
- int rowId) throws IOException {
- int left = total;
- while (left > 0) {
- readRepetitionAndDefinitionLevels();
- if (definitionLevel >= maxDefLevel) {
- c.vector[rowId] = dataColumn.readBoolean() ? 1 : 0;
- c.isNull[rowId] = false;
- c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
- } else {
- c.isNull[rowId] = true;
- c.isRepeating = false;
- c.noNulls = false;
- }
- rowId++;
- left--;
- }
- }
-
- private void readLongs(
- int total,
- LongColumnVector c,
- int rowId) throws IOException {
- int left = total;
- while (left > 0) {
- readRepetitionAndDefinitionLevels();
- if (definitionLevel >= maxDefLevel) {
- c.vector[rowId] = dataColumn.readLong();
- c.isNull[rowId] = false;
- c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
- } else {
- c.isNull[rowId] = true;
- c.isRepeating = false;
- c.noNulls = false;
- }
- rowId++;
- left--;
- }
- }
-
- private void readFloats(
- int total,
- DoubleColumnVector c,
- int rowId) throws IOException {
- int left = total;
- while (left > 0) {
- readRepetitionAndDefinitionLevels();
- if (definitionLevel >= maxDefLevel) {
- c.vector[rowId] = dataColumn.readFloat();
- c.isNull[rowId] = false;
- c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
- } else {
- c.isNull[rowId] = true;
- c.isRepeating = false;
- c.noNulls = false;
- }
- rowId++;
- left--;
- }
- }
-
- private void readDecimal(
- int total,
- DecimalColumnVector c,
- int rowId) throws IOException {
- int left = total;
- c.precision = (short) type.asPrimitiveType().getDecimalMetadata().getPrecision();
- c.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale();
- while (left > 0) {
- readRepetitionAndDefinitionLevels();
- if (definitionLevel >= maxDefLevel) {
- c.vector[rowId].set(dataColumn.readBytes().getBytesUnsafe(), c.scale);
- c.isNull[rowId] = false;
- c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
- } else {
- c.isNull[rowId] = true;
- c.isRepeating = false;
- c.noNulls = false;
- }
- rowId++;
- left--;
- }
- }
-
- private void readBinaries(
- int total,
- BytesColumnVector c,
- int rowId) throws IOException {
- int left = total;
- while (left > 0) {
- readRepetitionAndDefinitionLevels();
- if (definitionLevel >= maxDefLevel) {
- c.setVal(rowId, dataColumn.readBytes().getBytesUnsafe());
- c.isNull[rowId] = false;
- // TODO figure out a better way to set repeat for Binary type
- c.isRepeating = false;
- } else {
- c.isNull[rowId] = true;
- c.isRepeating = false;
- c.noNulls = false;
- }
- rowId++;
- left--;
- }
- }
-
- /**
- * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
- */
- private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
- LongColumnVector dictionaryIds) {
- System.arraycopy(dictionaryIds.isNull, rowId, column.isNull, rowId, num);
- if (column.noNulls) {
- column.noNulls = dictionaryIds.noNulls;
- }
- column.isRepeating = column.isRepeating && dictionaryIds.isRepeating;
-
- switch (descriptor.getType()) {
- case INT32:
- for (int i = rowId; i < rowId + num; ++i) {
- ((LongColumnVector) column).vector[i] =
- dictionary.decodeToInt((int) dictionaryIds.vector[i]);
- }
- break;
- case INT64:
- for (int i = rowId; i < rowId + num; ++i) {
- ((LongColumnVector) column).vector[i] =
- dictionary.decodeToLong((int) dictionaryIds.vector[i]);
- }
- break;
- case FLOAT:
- for (int i = rowId; i < rowId + num; ++i) {
- ((DoubleColumnVector) column).vector[i] =
- dictionary.decodeToFloat((int) dictionaryIds.vector[i]);
- }
- break;
- case DOUBLE:
- for (int i = rowId; i < rowId + num; ++i) {
- ((DoubleColumnVector) column).vector[i] =
- dictionary.decodeToDouble((int) dictionaryIds.vector[i]);
- }
- break;
- case INT96:
- for (int i = rowId; i < rowId + num; ++i) {
- ByteBuffer buf = dictionary.decodeToBinary((int) dictionaryIds.vector[i]).toByteBuffer();
- buf.order(ByteOrder.LITTLE_ENDIAN);
- long timeOfDayNanos = buf.getLong();
- int julianDay = buf.getInt();
- NanoTime nt = new NanoTime(julianDay, timeOfDayNanos);
- Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion);
- ((TimestampColumnVector) column).set(i, ts);
- }
- break;
- case BINARY:
- case FIXED_LEN_BYTE_ARRAY:
- for (int i = rowId; i < rowId + num; ++i) {
- ((BytesColumnVector) column)
- .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe());
- }
- break;
- default:
- throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
- }
- }
-
- private void readRepetitionAndDefinitionLevels() {
- repetitionLevel = repetitionLevelColumn.nextInt();
- definitionLevel = definitionLevelColumn.nextInt();
- valuesRead++;
- }
-
- private void readPage() throws IOException {
- DataPage page = pageReader.readPage();
- // TODO: Why is this a visitor?
- page.accept(new DataPage.Visitor<Void>() {
- @Override
- public Void visit(DataPageV1 dataPageV1) {
- readPageV1(dataPageV1);
- return null;
- }
-
- @Override
- public Void visit(DataPageV2 dataPageV2) {
- readPageV2(dataPageV2);
- return null;
- }
- });
- }
-
- private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException {
- this.pageValueCount = valueCount;
- this.endOfPageValueCount = valuesRead + pageValueCount;
- if (dataEncoding.usesDictionary()) {
- this.dataColumn = null;
- if (dictionary == null) {
- throw new IOException(
- "could not read page in col " + descriptor +
- " as the dictionary was missing for encoding " + dataEncoding);
- }
- dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary);
- this.isCurrentPageDictionaryEncoded = true;
- } else {
- if (dataEncoding != Encoding.PLAIN) {
- throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
- }
- dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
- this.isCurrentPageDictionaryEncoded = false;
- }
-
- try {
- dataColumn.initFromPage(pageValueCount, bytes, offset);
- } catch (IOException e) {
- throw new IOException("could not read page in col " + descriptor, e);
- }
- }
-
- private void readPageV1(DataPageV1 page) {
- ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
- ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
- this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
- this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
- try {
- byte[] bytes = page.getBytes().toByteArray();
- LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
- LOG.debug("reading repetition levels at 0");
- rlReader.initFromPage(pageValueCount, bytes, 0);
- int next = rlReader.getNextOffset();
- LOG.debug("reading definition levels at " + next);
- dlReader.initFromPage(pageValueCount, bytes, next);
- next = dlReader.getNextOffset();
- LOG.debug("reading data at " + next);
- initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
- }
- }
-
- private void readPageV2(DataPageV2 page) {
- this.pageValueCount = page.getValueCount();
- this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(),
- page.getRepetitionLevels());
- this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels());
- try {
- LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records");
- initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
- }
- }
-
- private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
- try {
- if (maxLevel == 0) {
- return new NullIntIterator();
- }
- return new RLEIntIterator(
- new RunLengthBitPackingHybridDecoder(
- BytesUtils.getWidthFromMaxInt(maxLevel),
- new ByteArrayInputStream(bytes.toByteArray())));
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read levels in page for col " + descriptor, e);
- }
- }
-
- /**
- * Utility classes to abstract over different way to read ints with different encodings.
- * TODO: remove this layer of abstraction?
- */
- abstract static class IntIterator {
- abstract int nextInt();
- }
-
- protected static final class ValuesReaderIntIterator extends IntIterator {
- ValuesReader delegate;
-
- public ValuesReaderIntIterator(ValuesReader delegate) {
- this.delegate = delegate;
- }
-
- @Override
- int nextInt() {
- return delegate.readInteger();
- }
- }
-
- protected static final class RLEIntIterator extends IntIterator {
- RunLengthBitPackingHybridDecoder delegate;
-
- public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
- this.delegate = delegate;
- }
-
- @Override
- int nextInt() {
- try {
- return delegate.readInt();
- } catch (IOException e) {
- throw new ParquetDecodingException(e);
- }
- }
- }
-
- protected static final class NullIntIterator extends IntIterator {
- @Override
- int nextInt() { return 0; }
- }
+ TypeInfo columnType) throws IOException;
}
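
With readBatch as the only contract, the record reader's batch loop stays uniform across primitive and nested columns. A hypothetical sketch of the call site follows; nextBatch() itself is outside this excerpt, so the counter names here are assumptions based on the rest of the patch:

// Hypothetical sketch inside VectorizedParquetRecordReader; 'rowsReturned'
// is an assumed counter, the other fields appear elsewhere in this patch.
private boolean nextBatchSketch(VectorizedRowBatch batch) throws IOException {
  int num = (int) Math.min(VectorizedRowBatch.DEFAULT_SIZE,
      totalCountLoadedSoFar - rowsReturned);
  for (int i = 0; i < columnReaders.length; i++) {
    // The loop no longer cares whether a reader is primitive or struct; a
    // struct reader recurses into StructColumnVector.fields behind readBatch.
    columnReaders[i].readBatch(num, batch.cols[i], columnTypesList.get(i));
  }
  batch.size = num;
  rowsReturned += num;
  return num > 0;
}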
http://git-wip-us.apache.org/repos/asf/hive/blob/a241e55a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index f94c49a..699de59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -23,11 +23,13 @@ import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.parquet.ParquetRuntimeException;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.filter2.compat.FilterCompat;
@@ -35,6 +37,7 @@ import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetInputSplit;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
@@ -68,6 +71,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
private List<String> columnNamesList;
private List<TypeInfo> columnTypesList;
private VectorizedRowBatchCtx rbCtx;
+ private List<Integer> indexColumnsWanted;
/**
* For each request column, the reader to read this column. This is NULL if this column
@@ -198,7 +202,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
columnTypesList);
}
- List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
+ indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
requestedSchema =
DataWritableReadSupport.getSchemaByIndex(tableSchema, columnNamesList, indexColumnsWanted);
@@ -279,11 +283,81 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
List<ColumnDescriptor> columns = requestedSchema.getColumns();
List<Type> types = requestedSchema.getFields();
columnReaders = new VectorizedColumnReader[columns.size()];
- for (int i = 0; i < columns.size(); ++i) {
- columnReaders[i] =
- new VectorizedColumnReader(columns.get(i), pages.getPageReader(columns.get(i)),
- skipTimestampConversion, types.get(i));
+
+ if (!ColumnProjectionUtils.isReadAllColumns(jobConf) && !indexColumnsWanted.isEmpty()) {
+ for (int i = 0; i < types.size(); ++i) {
+ columnReaders[i] =
+ buildVectorizedParquetReader(columnTypesList.get(indexColumnsWanted.get(i)), types.get(i),
+ pages, requestedSchema.getColumns(), skipTimestampConversion, 0);
+ }
+ } else {
+ for (int i = 0; i < types.size(); ++i) {
+ columnReaders[i] = buildVectorizedParquetReader(columnTypesList.get(i), types.get(i), pages,
+ requestedSchema.getColumns(), skipTimestampConversion, 0);
+ }
}
+
totalCountLoadedSoFar += pages.getRowCount();
}
+
+ private List<ColumnDescriptor> getAllColumnDescriptorByType(
+ int depth,
+ Type type,
+ List<ColumnDescriptor> columns) throws ParquetRuntimeException {
+ List<ColumnDescriptor> res = new ArrayList<>();
+ for (ColumnDescriptor descriptor : columns) {
+ if (depth >= descriptor.getPath().length) {
+ throw new InvalidSchemaException("Corrupted Parquet schema");
+ }
+ if (type.getName().equals(descriptor.getPath()[depth])) {
+ res.add(descriptor);
+ }
+ }
+ return res;
+ }
+
+ // Build a VectorizedColumnReader from the Hive TypeInfo and the Parquet schema type
+ private VectorizedColumnReader buildVectorizedParquetReader(
+ TypeInfo typeInfo,
+ Type type,
+ PageReadStore pages,
+ List<ColumnDescriptor> columnDescriptors,
+ boolean skipTimestampConversion,
+ int depth) throws IOException {
+ List<ColumnDescriptor> descriptors =
+ getAllColumnDescriptorByType(depth, type, columnDescriptors);
+ switch (typeInfo.getCategory()) {
+ case PRIMITIVE:
+ if (descriptors == null || descriptors.isEmpty()) {
+ throw new RuntimeException(
+ "Failed to find related Parquet column descriptor with type " + type);
+ } else {
+ return new VectorizedPrimitiveColumnReader(descriptors.get(0),
+ pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type);
+ }
+ case STRUCT:
+ StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+ List<VectorizedColumnReader> fieldReaders = new ArrayList<>();
+ List<TypeInfo> fieldTypes = structTypeInfo.getAllStructFieldTypeInfos();
+ List<Type> types = type.asGroupType().getFields();
+ for (int i = 0; i < fieldTypes.size(); i++) {
+ VectorizedColumnReader r =
+ buildVectorizedParquetReader(fieldTypes.get(i), types.get(i), pages, descriptors,
+ skipTimestampConversion, depth + 1);
+ if (r != null) {
+ fieldReaders.add(r);
+ } else {
+ throw new RuntimeException(
+ "Failed to build Parquet vectorized reader based on Hive type " + fieldTypes.get(i)
+ .getTypeName() + " and Parquet type " + types.get(i).toString());
+ }
+ }
+ return new VectorizedStructColumnReader(fieldReaders);
+ case LIST:
+ case MAP:
+ case UNION:
+ default:
+ throw new RuntimeException("Unsupported category " + typeInfo.getCategory().name());
+ }
+ }
}
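
To make the recursion above concrete: for the test schema's struct_field (Hive type struct<a:int,b:double>), buildVectorizedParquetReader would resolve roughly as follows. This is a hand trace of the code above, not committed output:

// depth 0, type = group struct_field (Hive category STRUCT)
//   getAllColumnDescriptorByType keeps descriptors whose path[0] == "struct_field"
//   -> recurse on each struct field with depth + 1:
// depth 1, type = int32 a (Hive category PRIMITIVE)
//   -> new VectorizedPrimitiveColumnReader(descriptor for [struct_field, a], ...)
// depth 1, type = double b (Hive category PRIMITIVE)
//   -> new VectorizedPrimitiveColumnReader(descriptor for [struct_field, b], ...)
// result: new VectorizedStructColumnReader(fieldReaders = [aReader, bReader])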
http://git-wip-us.apache.org/repos/asf/hive/blob/a241e55a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
new file mode 100644
index 0000000..3d5c6e6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
@@ -0,0 +1,589 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.sql.Timestamp;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+/**
+ * A column-level Parquet reader used to read a batch of records for a column.
+ * Parts of the code are adapted from Apache Spark and Apache Parquet.
+ */
+public class VectorizedPrimitiveColumnReader implements VectorizedColumnReader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(VectorizedPrimitiveColumnReader.class);
+
+ private boolean skipTimestampConversion = false;
+
+ /**
+ * Total number of values read.
+ */
+ private long valuesRead;
+
+ /**
+ * value that indicates the end of the current page. That is,
+ * if valuesRead == endOfPageValueCount, we are at the end of the page.
+ */
+ private long endOfPageValueCount;
+
+ /**
+ * The dictionary, if this column has dictionary encoding.
+ */
+ private final Dictionary dictionary;
+
+ /**
+ * If true, the current page is dictionary encoded.
+ */
+ private boolean isCurrentPageDictionaryEncoded;
+
+ /**
+ * Maximum definition level for this column.
+ */
+ private final int maxDefLevel;
+
+ private int definitionLevel;
+ private int repetitionLevel;
+
+ /**
+ * Repetition/Definition/Value readers.
+ */
+ private IntIterator repetitionLevelColumn;
+ private IntIterator definitionLevelColumn;
+ private ValuesReader dataColumn;
+
+ /**
+ * Total values in the current page.
+ */
+ private int pageValueCount;
+
+ private final PageReader pageReader;
+ private final ColumnDescriptor descriptor;
+ private final Type type;
+
+ public VectorizedPrimitiveColumnReader(
+ ColumnDescriptor descriptor,
+ PageReader pageReader,
+ boolean skipTimestampConversion,
+ Type type) throws IOException {
+ this.descriptor = descriptor;
+ this.type = type;
+ this.pageReader = pageReader;
+ this.maxDefLevel = descriptor.getMaxDefinitionLevel();
+ this.skipTimestampConversion = skipTimestampConversion;
+
+ DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+ if (dictionaryPage != null) {
+ try {
+ this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
+ this.isCurrentPageDictionaryEncoded = true;
+ } catch (IOException e) {
+ throw new IOException("could not decode the dictionary for " + descriptor, e);
+ }
+ } else {
+ this.dictionary = null;
+ this.isCurrentPageDictionaryEncoded = false;
+ }
+ }
+
+ public void readBatch(
+ int total,
+ ColumnVector column,
+ TypeInfo columnType) throws IOException {
+ int rowId = 0;
+ while (total > 0) {
+ // Compute the number of values we want to read in this page.
+ int leftInPage = (int) (endOfPageValueCount - valuesRead);
+ if (leftInPage == 0) {
+ readPage();
+ leftInPage = (int) (endOfPageValueCount - valuesRead);
+ }
+
+ int num = Math.min(total, leftInPage);
+ if (isCurrentPageDictionaryEncoded) {
+ LongColumnVector dictionaryIds = new LongColumnVector();
+ // Read and decode dictionary ids.
+ readDictionaryIDs(num, dictionaryIds, rowId);
+ decodeDictionaryIds(rowId, num, column, dictionaryIds);
+ } else {
+ // Assign the values directly into the column vector.
+ readBatchHelper(num, column, columnType, rowId);
+ }
+ rowId += num;
+ total -= num;
+ }
+ }
+
+ private void readBatchHelper(
+ int num,
+ ColumnVector column,
+ TypeInfo columnType,
+ int rowId) throws IOException {
+ PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType;
+ switch (primitiveColumnType.getPrimitiveCategory()) {
+ case INT:
+ case BYTE:
+ case SHORT:
+ readIntegers(num, (LongColumnVector) column, rowId);
+ break;
+ case DATE:
+ case INTERVAL_YEAR_MONTH:
+ case LONG:
+ readLongs(num, (LongColumnVector) column, rowId);
+ break;
+ case BOOLEAN:
+ readBooleans(num, (LongColumnVector) column, rowId);
+ break;
+ case DOUBLE:
+ readDoubles(num, (DoubleColumnVector) column, rowId);
+ break;
+ case BINARY:
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ readBinaries(num, (BytesColumnVector) column, rowId);
+ break;
+ case FLOAT:
+ readFloats(num, (DoubleColumnVector) column, rowId);
+ break;
+ case DECIMAL:
+ readDecimal(num, (DecimalColumnVector) column, rowId);
+ break;
+ case INTERVAL_DAY_TIME:
+ case TIMESTAMP:
+ default:
+ throw new IOException("Unsupported type: " + type);
+ }
+ }
+
+ private void readDictionaryIDs(
+ int total,
+ LongColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
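+ // The value is present only when the definition level reaches the column's
+ // maximum; a smaller level means the value is null at this position.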
+ if (definitionLevel >= maxDefLevel) {
+ c.vector[rowId] = dataColumn.readValueDictionaryId();
+ c.isNull[rowId] = false;
+ c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
+ private void readIntegers(
+ int total,
+ LongColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
+ if (definitionLevel >= maxDefLevel) {
+ c.vector[rowId] = dataColumn.readInteger();
+ c.isNull[rowId] = false;
+ c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
+ private void readDoubles(
+ int total,
+ DoubleColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
+ if (definitionLevel >= maxDefLevel) {
+ c.vector[rowId] = dataColumn.readDouble();
+ c.isNull[rowId] = false;
+ c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
+ private void readBooleans(
+ int total,
+ LongColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
+ if (definitionLevel >= maxDefLevel) {
+ c.vector[rowId] = dataColumn.readBoolean() ? 1 : 0;
+ c.isNull[rowId] = false;
+ c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
+ private void readLongs(
+ int total,
+ LongColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
+ if (definitionLevel >= maxDefLevel) {
+ c.vector[rowId] = dataColumn.readLong();
+ c.isNull[rowId] = false;
+ c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
+ private void readFloats(
+ int total,
+ DoubleColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
+ if (definitionLevel >= maxDefLevel) {
+ c.vector[rowId] = dataColumn.readFloat();
+ c.isNull[rowId] = false;
+ c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
+ private void readDecimal(
+ int total,
+ DecimalColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ c.precision = (short) type.asPrimitiveType().getDecimalMetadata().getPrecision();
+ c.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale();
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
+ if (definitionLevel >= maxDefLevel) {
+ c.vector[rowId].set(dataColumn.readBytes().getBytesUnsafe(), c.scale);
+ c.isNull[rowId] = false;
+ // Compare decimal values, not writable references.
+ c.isRepeating = c.isRepeating && (c.vector[0].compareTo(c.vector[rowId]) == 0);
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
+ private void readBinaries(
+ int total,
+ BytesColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
+ if (definitionLevel >= maxDefLevel) {
+ c.setVal(rowId, dataColumn.readBytes().getBytesUnsafe());
+ c.isNull[rowId] = false;
+ // TODO figure out a better way to set repeat for Binary type
+ c.isRepeating = false;
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
+ /**
+ * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
+ */
+ private void decodeDictionaryIds(
+ int rowId,
+ int num,
+ ColumnVector column,
+ LongColumnVector dictionaryIds) {
+ System.arraycopy(dictionaryIds.isNull, rowId, column.isNull, rowId, num);
+ if (column.noNulls) {
+ column.noNulls = dictionaryIds.noNulls;
+ }
+ column.isRepeating = column.isRepeating && dictionaryIds.isRepeating;
+
+ switch (descriptor.getType()) {
+ case INT32:
+ for (int i = rowId; i < rowId + num; ++i) {
+ ((LongColumnVector) column).vector[i] =
+ dictionary.decodeToInt((int) dictionaryIds.vector[i]);
+ }
+ break;
+ case INT64:
+ for (int i = rowId; i < rowId + num; ++i) {
+ ((LongColumnVector) column).vector[i] =
+ dictionary.decodeToLong((int) dictionaryIds.vector[i]);
+ }
+ break;
+ case FLOAT:
+ for (int i = rowId; i < rowId + num; ++i) {
+ ((DoubleColumnVector) column).vector[i] =
+ dictionary.decodeToFloat((int) dictionaryIds.vector[i]);
+ }
+ break;
+ case DOUBLE:
+ for (int i = rowId; i < rowId + num; ++i) {
+ ((DoubleColumnVector) column).vector[i] =
+ dictionary.decodeToDouble((int) dictionaryIds.vector[i]);
+ }
+ break;
+ case INT96:
+ for (int i = rowId; i < rowId + num; ++i) {
+ ByteBuffer buf = dictionary.decodeToBinary((int) dictionaryIds.vector[i]).toByteBuffer();
+ buf.order(ByteOrder.LITTLE_ENDIAN);
+ long timeOfDayNanos = buf.getLong();
+ int julianDay = buf.getInt();
+ NanoTime nt = new NanoTime(julianDay, timeOfDayNanos);
+ Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion);
+ ((TimestampColumnVector) column).set(i, ts);
+ }
+ break;
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
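+ // DECIMAL values are physically stored as BINARY or FIXED_LEN_BYTE_ARRAY,
+ // so the target vector may be a BytesColumnVector or a DecimalColumnVector.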
+ if (column instanceof BytesColumnVector) {
+ for (int i = rowId; i < rowId + num; ++i) {
+ ((BytesColumnVector) column)
+ .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe());
+ }
+ } else {
+ DecimalColumnVector decimalColumnVector = ((DecimalColumnVector) column);
+ decimalColumnVector.precision =
+ (short) type.asPrimitiveType().getDecimalMetadata().getPrecision();
+ decimalColumnVector.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale();
+ for (int i = rowId; i < rowId + num; ++i) {
+ decimalColumnVector.vector[i]
+ .set(dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe(),
+ decimalColumnVector.scale);
+ }
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
+ }
+ }
+
+ private void readRepetitionAndDefinitionLevels() {
+ repetitionLevel = repetitionLevelColumn.nextInt();
+ definitionLevel = definitionLevelColumn.nextInt();
+ valuesRead++;
+ }
+
+ private void readPage() throws IOException {
+ DataPage page = pageReader.readPage();
+ // TODO: Why is this a visitor?
+ page.accept(new DataPage.Visitor<Void>() {
+ @Override
+ public Void visit(DataPageV1 dataPageV1) {
+ readPageV1(dataPageV1);
+ return null;
+ }
+
+ @Override
+ public Void visit(DataPageV2 dataPageV2) {
+ readPageV2(dataPageV2);
+ return null;
+ }
+ });
+ }
+
+ private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException {
+ this.pageValueCount = valueCount;
+ this.endOfPageValueCount = valuesRead + pageValueCount;
+ if (dataEncoding.usesDictionary()) {
+ this.dataColumn = null;
+ if (dictionary == null) {
+ throw new IOException(
+ "could not read page in col " + descriptor +
+ " as the dictionary was missing for encoding " + dataEncoding);
+ }
+ dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary);
+ this.isCurrentPageDictionaryEncoded = true;
+ } else {
+ if (dataEncoding != Encoding.PLAIN) {
+ throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
+ }
+ dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
+ this.isCurrentPageDictionaryEncoded = false;
+ }
+
+ try {
+ dataColumn.initFromPage(pageValueCount, bytes, offset);
+ } catch (IOException e) {
+ throw new IOException("could not read page in col " + descriptor, e);
+ }
+ }
+
+ private void readPageV1(DataPageV1 page) {
+ // Set the page value count before initializing the level readers, so they
+ // see this page's count rather than the previous page's (as readPageV2 does).
+ this.pageValueCount = page.getValueCount();
+ ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
+ ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
+ this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
+ this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
+ try {
+ byte[] bytes = page.getBytes().toByteArray();
+ LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
+ LOG.debug("reading repetition levels at 0");
+ rlReader.initFromPage(pageValueCount, bytes, 0);
+ int next = rlReader.getNextOffset();
+ LOG.debug("reading definition levels at " + next);
+ dlReader.initFromPage(pageValueCount, bytes, next);
+ next = dlReader.getNextOffset();
+ LOG.debug("reading data at " + next);
+ initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
+ }
+ }
+
+ private void readPageV2(DataPageV2 page) {
+ this.pageValueCount = page.getValueCount();
+ this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(),
+ page.getRepetitionLevels());
+ this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels());
+ try {
+ LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records");
+ initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
+ }
+ }
+
+ private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
+ try {
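+ // A max level of 0 means the levels are not stored at all (e.g. a required
+ // top-level column), so every level is implicitly 0.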
+ if (maxLevel == 0) {
+ return new NullIntIterator();
+ }
+ return new RLEIntIterator(
+ new RunLengthBitPackingHybridDecoder(
+ BytesUtils.getWidthFromMaxInt(maxLevel),
+ new ByteArrayInputStream(bytes.toByteArray())));
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read levels in page for col " + descriptor, e);
+ }
+ }
+
+ /**
+ * Utility classes to abstract over different ways to read ints with different encodings.
+ * TODO: remove this layer of abstraction?
+ */
+ abstract static class IntIterator {
+ abstract int nextInt();
+ }
+
+ protected static final class ValuesReaderIntIterator extends IntIterator {
+ ValuesReader delegate;
+
+ public ValuesReaderIntIterator(ValuesReader delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ int nextInt() {
+ return delegate.readInteger();
+ }
+ }
+
+ protected static final class RLEIntIterator extends IntIterator {
+ RunLengthBitPackingHybridDecoder delegate;
+
+ public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ int nextInt() {
+ try {
+ return delegate.readInt();
+ } catch (IOException e) {
+ throw new ParquetDecodingException(e);
+ }
+ }
+ }
+
+ protected static final class NullIntIterator extends IntIterator {
+ @Override
+ int nextInt() { return 0; }
+ }
+}
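
The INT96 branch above decodes a timestamp that Parquet stores as eight
little-endian bytes of nanos-of-day followed by four bytes of Julian day.
A minimal standalone sketch of that decoding, assuming UTC and skipping the
timezone handling that NanoTimeUtils performs (2440588 is the Julian day
number of the Unix epoch, 1970-01-01):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.sql.Timestamp;

    public class Int96Decode {
      private static final int JULIAN_EPOCH_DAY = 2440588; // 1970-01-01

      static Timestamp fromInt96(byte[] int96) {
        ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
        long timeOfDayNanos = buf.getLong(); // first 8 bytes
        int julianDay = buf.getInt();        // last 4 bytes
        long epochDays = julianDay - JULIAN_EPOCH_DAY;
        Timestamp ts = new Timestamp(epochDays * 86400000L + timeOfDayNanos / 1000000L);
        ts.setNanos((int) (timeOfDayNanos % 1000000000L)); // nanos within the second
        return ts;
      }
    }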
http://git-wip-us.apache.org/repos/asf/hive/blob/a241e55a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java
new file mode 100644
index 0000000..cc6cb20
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.io.IOException;
+import java.util.List;
+
+public class VectorizedStructColumnReader implements VectorizedColumnReader {
+
+ private final List<VectorizedColumnReader> fieldReaders;
+
+ public VectorizedStructColumnReader(List<VectorizedColumnReader> fieldReaders) {
+ this.fieldReaders = fieldReaders;
+ }
+
+ @Override
+ public void readBatch(
+ int total,
+ ColumnVector column,
+ TypeInfo columnType) throws IOException {
+ StructColumnVector structColumnVector = (StructColumnVector) column;
+ StructTypeInfo structTypeInfo = (StructTypeInfo) columnType;
+ ColumnVector[] vectors = structColumnVector.fields;
+ for (int i = 0; i < vectors.length; i++) {
+ fieldReaders.get(i)
+ .readBatch(total, vectors[i], structTypeInfo.getAllStructFieldTypeInfos().get(i));
+ structColumnVector.isRepeating = structColumnVector.isRepeating && vectors[i].isRepeating;
+
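+ // Seed the struct's null flags from the first field; after that, a struct
+ // row stays null only while every field read so far is null at that row.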
+ for (int j = 0; j < vectors[i].isNull.length; j++) {
+ structColumnVector.isNull[j] =
+ (i == 0) ? vectors[i].isNull[j] : structColumnVector.isNull[j] && vectors[i].isNull[j];
+ }
+ structColumnVector.noNulls =
+ (i == 0) ? vectors[i].noNulls : structColumnVector.noNulls && vectors[i].noNulls;
+ }
+ }
+}
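
To show how the struct reader composes per-field readers, here is a
hypothetical wiring sketch; the two primitive readers are assumed to have
been built elsewhere from their ColumnDescriptor/PageReader pairs, and the
field names are illustrative:

    List<VectorizedColumnReader> fieldReaders = Arrays.asList(intReader, strReader);
    VectorizedColumnReader structReader = new VectorizedStructColumnReader(fieldReaders);

    StructTypeInfo structType = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(
        Arrays.asList("f1", "f2"),
        Arrays.<TypeInfo>asList(TypeInfoFactory.intTypeInfo, TypeInfoFactory.stringTypeInfo));

    StructColumnVector struct = new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
        new LongColumnVector(), new BytesColumnVector());

    // Fills both child vectors, then folds their null and repeating flags
    // into the struct vector as in readBatch() above.
    structReader.readBatch(VectorizedRowBatch.DEFAULT_SIZE, struct, structType);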
http://git-wip-us.apache.org/repos/asf/hive/blob/a241e55a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
index 276ff19..33567eb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
@@ -1,9 +1,13 @@
/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,416 +18,74 @@
package org.apache.hadoop.hive.ql.io.parquet;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
-import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
-import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
-import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.parquet.example.data.Group;
-import org.apache.parquet.example.data.simple.SimpleGroupFactory;
-import org.apache.parquet.hadoop.ParquetInputFormat;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.example.GroupReadSupport;
-import org.apache.parquet.hadoop.example.GroupWriteSupport;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.MessageType;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.TestCase.assertFalse;
-import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
-import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA;
-import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
-import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
-import static org.junit.Assert.assertEquals;
-
-public class TestVectorizedColumnReader {
-
- private static final int nElements = 2500;
- protected static final Configuration conf = new Configuration();
- protected static final Path file =
- new Path("target/test/TestParquetVectorReader/testParquetFile");
- private static String[] uniqueStrs = new String[nElements];
- private static boolean[] isNulls = new boolean[nElements];
- private static Random random = new Random();
- protected static final MessageType schema = parseMessageType(
- "message test { "
- + "required int32 int32_field; "
- + "required int64 int64_field; "
- + "required int96 int96_field; "
- + "required double double_field; "
- + "required float float_field; "
- + "required boolean boolean_field; "
- + "required fixed_len_byte_array(3) flba_field; "
- + "optional fixed_len_byte_array(1) some_null_field; "
- + "optional fixed_len_byte_array(1) all_null_field; "
- + "optional binary binary_field; "
- + "optional binary binary_field_non_repeating; "
- + "} ");
-
- @AfterClass
- public static void cleanup() throws IOException {
- FileSystem fs = file.getFileSystem(conf);
- if (fs.exists(file)) {
- fs.delete(file, true);
- }
- }
+public class TestVectorizedColumnReader extends VectorizedColumnReaderTestBase {
+ private static final boolean isDictionaryEncoding = false;
@BeforeClass
- public static void prepareFile() throws IOException {
- cleanup();
-
- boolean dictionaryEnabled = true;
- boolean validating = false;
- GroupWriteSupport.setSchema(schema, conf);
- SimpleGroupFactory f = new SimpleGroupFactory(schema);
- ParquetWriter<Group> writer = new ParquetWriter<Group>(
- file,
- new GroupWriteSupport(),
- GZIP, 1024*1024, 1024, 1024*1024,
- dictionaryEnabled, validating, PARQUET_1_0, conf);
- writeData(f, writer);
- }
-
- protected static void writeData(SimpleGroupFactory f, ParquetWriter<Group> writer) throws IOException {
- initialStrings(uniqueStrs);
- for (int i = 0; i < nElements; i++) {
- Group group = f.newGroup()
- .append("int32_field", i)
- .append("int64_field", (long) 2 * i)
- .append("int96_field", Binary.fromReusedByteArray("999999999999".getBytes()))
- .append("double_field", i * 1.0)
- .append("float_field", ((float) (i * 2.0)))
- .append("boolean_field", i % 5 == 0)
- .append("flba_field", "abc");
-
- if (i % 2 == 1) {
- group.append("some_null_field", "x");
- }
-
- if (i % 13 != 1) {
- int binaryLen = i % 10;
- group.append("binary_field",
- Binary.fromString(new String(new char[binaryLen]).replace("\0", "x")));
- }
-
- if (uniqueStrs[i] != null) {
- group.append("binary_field_non_repeating", Binary.fromString(uniqueStrs[i]));
- }
- writer.write(group);
- }
- writer.close();
+ public static void setup() throws IOException {
+ removeFile();
+ writeData(initWriterFromFile(), isDictionaryEncoding);
}
- private static String getRandomStr() {
- int len = random.nextInt(10);
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < len; i++) {
- sb.append((char) ('a' + random.nextInt(25)));
- }
- return sb.toString();
- }
-
- public static void initialStrings(String[] uniqueStrs) {
- for (int i = 0; i < uniqueStrs.length; i++) {
- String str = getRandomStr();
- if (!str.isEmpty()) {
- uniqueStrs[i] = str;
- isNulls[i] = false;
- }else{
- isNulls[i] = true;
- }
- }
+ @AfterClass
+ public static void cleanup() throws IOException {
+ removeFile();
}
- private VectorizedParquetRecordReader createParquetReader(String schemaString, Configuration conf)
- throws IOException, InterruptedException, HiveException {
- conf.set(PARQUET_READ_SCHEMA, schemaString);
- HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
- HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
-
- Job vectorJob = new Job(conf, "read vector");
- ParquetInputFormat.setInputPaths(vectorJob, file);
- ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class);
- InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0);
- initialVectorizedRowBatchCtx(conf);
- return new VectorizedParquetRecordReader(split, new JobConf(conf));
+ @Test
+ public void testIntRead() throws Exception {
+ intRead(isDictionaryEncoding);
}
- private void initialVectorizedRowBatchCtx(Configuration conf) throws HiveException {
- MapWork mapWork = new MapWork();
- VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx();
- rbCtx.init(createStructObjectInspector(conf), new String[0]);
- mapWork.setVectorMode(true);
- mapWork.setVectorizedRowBatchCtx(rbCtx);
- Utilities.setMapWork(conf, mapWork);
+ @Test
+ public void testLongRead() throws Exception {
+ longRead(isDictionaryEncoding);
}
- private StructObjectInspector createStructObjectInspector(Configuration conf) {
- // Create row related objects
- String columnNames = conf.get(IOConstants.COLUMNS);
- List<String> columnNamesList = DataWritableReadSupport.getColumnNames(columnNames);
- String columnTypes = conf.get(IOConstants.COLUMNS_TYPES);
- List<TypeInfo> columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
- TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList);
- return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);
+ @Test
+ public void testDoubleRead() throws Exception {
+ doubleRead(isDictionaryEncoding);
}
@Test
- public void testIntRead() throws Exception {
- Configuration conf = new Configuration();
- conf.set(IOConstants.COLUMNS,"int32_field");
- conf.set(IOConstants.COLUMNS_TYPES,"int");
- conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
- VectorizedParquetRecordReader reader =
- createParquetReader("message test { required int32 int32_field;}", conf);
- VectorizedRowBatch previous = reader.createValue();
- try {
- long c = 0;
- while (reader.next(NullWritable.get(), previous)) {
- LongColumnVector vector = (LongColumnVector) previous.cols[0];
- assertTrue(vector.noNulls);
- for (int i = 0; i < vector.vector.length; i++) {
- if(c == nElements){
- break;
- }
- assertEquals(c, vector.vector[i]);
- assertFalse(vector.isNull[i]);
- c++;
- }
- }
- assertEquals(nElements, c);
- } finally {
- reader.close();
- }
+ public void testFloatRead() throws Exception {
+ floatRead(isDictionaryEncoding);
}
@Test
- public void testLongRead() throws Exception {
- Configuration conf = new Configuration();
- conf.set(IOConstants.COLUMNS,"int64_field");
- conf.set(IOConstants.COLUMNS_TYPES, "bigint");
- conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
- VectorizedParquetRecordReader reader =
- createParquetReader("message test { required int64 int64_field;}", conf);
- VectorizedRowBatch previous = reader.createValue();
- try {
- long c = 0;
- while (reader.next(NullWritable.get(), previous)) {
- LongColumnVector vector = (LongColumnVector) previous.cols[0];
- assertTrue(vector.noNulls);
- for (int i = 0; i < vector.vector.length; i++) {
- if(c == nElements){
- break;
- }
- assertEquals(2 * c, vector.vector[i]);
- assertFalse(vector.isNull[i]);
- c++;
- }
- }
- assertEquals(nElements, c);
- } finally {
- reader.close();
- }
+ public void testBooleanRead() throws Exception {
+ booleanRead();
}
@Test
- public void testDoubleRead() throws Exception {
- Configuration conf = new Configuration();
- conf.set(IOConstants.COLUMNS,"double_field");
- conf.set(IOConstants.COLUMNS_TYPES, "double");
- conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
- VectorizedParquetRecordReader reader =
- createParquetReader("message test { required double double_field;}", conf);
- VectorizedRowBatch previous = reader.createValue();
- try {
- long c = 0;
- while (reader.next(NullWritable.get(), previous)) {
- DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0];
- assertTrue(vector.noNulls);
- for (int i = 0; i < vector.vector.length; i++) {
- if(c == nElements){
- break;
- }
- assertEquals(1.0 * c, vector.vector[i], 0);
- assertFalse(vector.isNull[i]);
- c++;
- }
- }
- assertEquals(nElements, c);
- } finally {
- reader.close();
- }
+ public void testBinaryRead() throws Exception {
+ binaryRead(isDictionaryEncoding);
}
@Test
- public void testFloatRead() throws Exception {
- Configuration conf = new Configuration();
- conf.set(IOConstants.COLUMNS,"float_field");
- conf.set(IOConstants.COLUMNS_TYPES, "float");
- conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
- VectorizedParquetRecordReader reader =
- createParquetReader("message test { required float float_field;}", conf);
- VectorizedRowBatch previous = reader.createValue();
- try {
- long c = 0;
- while (reader.next(NullWritable.get(), previous)) {
- DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0];
- assertTrue(vector.noNulls);
- for (int i = 0; i < vector.vector.length; i++) {
- if(c == nElements){
- break;
- }
- assertEquals((float)2.0 * c, vector.vector[i], 0);
- assertFalse(vector.isNull[i]);
- c++;
- }
- }
- assertEquals(nElements, c);
- } finally {
- reader.close();
- }
+ public void testStructRead() throws Exception {
+ structRead(isDictionaryEncoding);
}
@Test
- public void testBooleanRead() throws Exception {
- Configuration conf = new Configuration();
- conf.set(IOConstants.COLUMNS,"boolean_field");
- conf.set(IOConstants.COLUMNS_TYPES, "boolean");
- conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
- VectorizedParquetRecordReader reader =
- createParquetReader("message test { required boolean boolean_field;}", conf);
- VectorizedRowBatch previous = reader.createValue();
- try {
- long c = 0;
- while (reader.next(NullWritable.get(), previous)) {
- LongColumnVector vector = (LongColumnVector) previous.cols[0];
- assertTrue(vector.noNulls);
- for (int i = 0; i < vector.vector.length; i++) {
- if(c == nElements){
- break;
- }
- int e = (c % 5 == 0) ? 1 : 0;
- assertEquals(e, vector.vector[i]);
- assertFalse(vector.isNull[i]);
- c++;
- }
- }
- assertEquals(nElements, c);
- } finally {
- reader.close();
- }
+ public void testNestedStructRead() throws Exception {
+ nestedStructRead0(isDictionaryEncoding);
+ nestedStructRead1(isDictionaryEncoding);
}
@Test
- public void testBinaryReadDictionaryEncoding() throws Exception {
- Configuration conf = new Configuration();
- conf.set(IOConstants.COLUMNS,"binary_field");
- conf.set(IOConstants.COLUMNS_TYPES, "string");
- conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
- VectorizedParquetRecordReader reader =
- createParquetReader("message test { required binary binary_field;}", conf);
- VectorizedRowBatch previous = reader.createValue();
- int c = 0;
- try {
- while (reader.next(NullWritable.get(), previous)) {
- BytesColumnVector vector = (BytesColumnVector) previous.cols[0];
- boolean noNull = true;
- for (int i = 0; i < vector.vector.length; i++) {
- if(c == nElements){
- break;
- }
- if (c % 13 == 1) {
- assertTrue(vector.isNull[i]);
- } else {
- assertFalse(vector.isNull[i]);
- int binaryLen = c % 10;
- String expected = new String(new char[binaryLen]).replace("\0", "x");
- String actual = new String(ArrayUtils
- .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i]));
- assertEquals("Failed at " + c, expected, actual);
- noNull = false;
- }
- c++;
- }
- assertEquals("No Null check failed at " + c, noNull, vector.noNulls);
- assertFalse(vector.isRepeating);
- }
- assertEquals(nElements, c);
- } finally {
- reader.close();
- }
+ public void testStructReadSomeNull() throws Exception {
+ structReadSomeNull(isDictionaryEncoding);
}
@Test
- public void testBinaryRead() throws Exception {
- Configuration conf = new Configuration();
- conf.set(IOConstants.COLUMNS,"binary_field_non_repeating");
- conf.set(IOConstants.COLUMNS_TYPES, "string");
- conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
- VectorizedParquetRecordReader reader =
- createParquetReader("message test { required binary binary_field_non_repeating;}", conf);
- VectorizedRowBatch previous = reader.createValue();
- int c = 0;
- try {
- while (reader.next(NullWritable.get(), previous)) {
- BytesColumnVector vector = (BytesColumnVector) previous.cols[0];
- boolean noNull = true;
- for (int i = 0; i < vector.vector.length; i++) {
- if(c == nElements){
- break;
- }
- String actual;
- assertEquals("Null assert failed at " + c, isNulls[c], vector.isNull[i]);
- if (!vector.isNull[i]) {
- actual = new String(ArrayUtils
- .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i]));
- assertEquals("failed at " + c, uniqueStrs[c], actual);
- }else{
- noNull = false;
- }
- c++;
- }
- assertEquals("No Null check failed at " + c, noNull, vector.noNulls);
- assertFalse(vector.isRepeating);
- }
- assertEquals("It doesn't exit at expected position", nElements, c);
- } finally {
- reader.close();
- }
+ public void testDecimalRead() throws Exception {
+ decimalRead(isDictionaryEncoding);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a241e55a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java
new file mode 100644
index 0000000..1e60192
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestVectorizedDictionaryEncodingColumnReader extends VectorizedColumnReaderTestBase {
+ private static final boolean isDictionaryEncoding = true;
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ removeFile();
+ writeData(initWriterFromFile(), isDictionaryEncoding);
+ }
+
+ @AfterClass
+ public static void cleanup() throws IOException {
+ removeFile();
+ }
+
+ @Test
+ public void testIntRead() throws Exception {
+ intRead(isDictionaryEncoding);
+ }
+
+ @Test
+ public void testLongRead() throws Exception {
+ longRead(isDictionaryEncoding);
+ }
+
+ @Test
+ public void testDoubleRead() throws Exception {
+ doubleRead(isDictionaryEncoding);
+ }
+
+ @Test
+ public void testFloatRead() throws Exception {
+ floatRead(isDictionaryEncoding);
+ }
+
+ @Test
+ public void testBinaryRead() throws Exception {
+ binaryRead(isDictionaryEncoding);
+ }
+
+ @Test
+ public void testStructRead() throws Exception {
+ structRead(isDictionaryEncoding);
+ }
+
+ @Test
+ public void testNestedStructRead() throws Exception {
+ nestedStructRead0(isDictionaryEncoding);
+ nestedStructRead1(isDictionaryEncoding);
+ }
+
+ @Test
+ public void testStructReadSomeNull() throws Exception {
+ structReadSomeNull(isDictionaryEncoding);
+ }
+
+ @Test
+ public void testDecimalRead() throws Exception {
+ decimalRead(isDictionaryEncoding);
+ }
+}