You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@hive.apache.org by xu...@apache.org on 2016/12/21 08:59:49 UTC
[1/2] hive git commit: Revert "HIVE-15112: Implement Parquet vectorization reader for Struct type (Ferdinand Xu, via Chao Sun)"
Repository: hive
Updated Branches:
refs/heads/master ab9b21920 -> fe3b370eb
http://git-wip-us.apache.org/repos/asf/hive/blob/fe3b370e/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java
deleted file mode 100644
index eecccce..0000000
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java
+++ /dev/null
@@ -1,694 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.parquet;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
-import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
-import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
-import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.parquet.example.data.Group;
-import org.apache.parquet.example.data.simple.SimpleGroupFactory;
-import org.apache.parquet.hadoop.ParquetInputFormat;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.example.GroupReadSupport;
-import org.apache.parquet.hadoop.example.GroupWriteSupport;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.MessageType;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.List;
-
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.TestCase.assertFalse;
-import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
-import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA;
-import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
-import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
-import static org.junit.Assert.assertEquals;
-
-public class TestVectorizedColumnReaderBase {
-
- protected final static int nElements = 2500;
- protected final static int UNIQUE_NUM = 10;
- protected final static int NULL_FREQUENCY = 13;
-
- protected final static Configuration conf = new Configuration();
- protected final static Path file = new Path("target/test/TestParquetVectorReader/testParquetFile");
-
- protected static final MessageType schema = parseMessageType(
- "message hive_schema { "
- + "required int32 int32_field; "
- + "required int64 int64_field; "
- + "required int96 int96_field; "
- + "required double double_field; "
- + "required float float_field; "
- + "required boolean boolean_field; "
- + "required fixed_len_byte_array(3) flba_field; "
- + "optional fixed_len_byte_array(1) some_null_field; "
- + "optional fixed_len_byte_array(1) all_null_field; "
- + "required binary binary_field; "
- + "optional binary binary_field_some_null; "
- + "required binary value (DECIMAL(5,2)); "
- + "required group struct_field {"
- + " required int32 a;\n"
- + " required double b;\n"
- + "}\n"
- + "optional group nested_struct_field {"
- + " optional group nsf {"
- + " optional int32 c;\n"
- + " optional int32 d;\n"
- + " }\n"
- + " optional double e;\n"
- + "}\n"
- + "optional group struct_field_some_null {"
- + " optional int32 f;\n"
- + " optional double g;\n"
- + "}\n"
- + "optional group map_field (MAP) {\n"
- + " repeated group map (MAP_KEY_VALUE) {\n"
- + " required binary key;\n"
- + " optional binary value;\n"
- + " }\n"
- + "}\n"
- + "optional group array_list (LIST) {\n"
- + " repeated group bag {\n"
- + " optional int32 array_element;\n"
- + " }\n"
- + "}\n"
- + "} ");
-
- protected static void removeFile() throws IOException {
- FileSystem fs = file.getFileSystem(conf);
- if (fs.exists(file)) {
- fs.delete(file, true);
- }
- }
-
- protected static ParquetWriter<Group> initWriterFromFile() throws IOException {
- GroupWriteSupport.setSchema(schema, conf);
- return new ParquetWriter<>(
- file,
- new GroupWriteSupport(),
- GZIP, 1024 * 1024, 1024, 1024 * 1024,
- true, false, PARQUET_1_0, conf);
- }
-
- protected static int getIntValue(
- boolean isDictionaryEncoding,
- int index) {
- return isDictionaryEncoding ? index % UNIQUE_NUM : index;
- }
-
- protected static double getDoubleValue(
- boolean isDictionaryEncoding,
- int index) {
- return isDictionaryEncoding ? index % UNIQUE_NUM : index;
- }
-
- protected static long getLongValue(
- boolean isDictionaryEncoding,
- int index) {
- return isDictionaryEncoding ? (long) 2 * index % UNIQUE_NUM : (long) 2 * index;
- }
-
- protected static float getFloatValue(
- boolean isDictionaryEncoding,
- int index) {
- return (float) (isDictionaryEncoding ? index % UNIQUE_NUM * 2.0 : index * 2.0);
- }
-
- protected static boolean getBooleanValue(
- float index) {
- return (index % 2 == 0);
- }
-
- protected static String getTimestampStr(int index) {
- String s = String.valueOf(index);
- int l = 4 - s.length();
- for (int i = 0; i < l; i++) {
- s = "0" + s;
- }
- return "99999999" + s;
- }
-
- protected static HiveDecimal getDecimal(
- boolean isDictionaryEncoding,
- int index) {
- int decimalVal = index % 100;
- String decimalStr = (decimalVal < 10) ? "0" + String.valueOf(decimalVal) : String.valueOf
- (decimalVal);
- int intVal = (isDictionaryEncoding) ? index % UNIQUE_NUM : index / 100;
- String d = String.valueOf(intVal) + decimalStr;
- BigInteger bi = new BigInteger(d);
- BigDecimal bd = new BigDecimal(bi);
- return HiveDecimal.create(bd);
- }
-
- protected static Binary getTimestamp(
- boolean isDictionaryEncoding,
- int index) {
- String s = isDictionaryEncoding ? getTimestampStr(index % UNIQUE_NUM) : getTimestampStr(index);
- return Binary.fromReusedByteArray(s.getBytes());
- }
-
- protected static String getStr(
- boolean isDictionaryEncoding,
- int index) {
- int binaryLen = isDictionaryEncoding ? index % UNIQUE_NUM : index;
- String v = "";
- while (binaryLen > 0) {
- char t = (char) ('a' + binaryLen % 26);
- binaryLen /= 26;
- v = t + v;
- }
- return v;
- }
-
- protected static Binary getBinaryValue(
- boolean isDictionaryEncoding,
- int index) {
- return Binary.fromString(getStr(isDictionaryEncoding, index));
- }
-
- protected static boolean isNull(int index) {
- return (index % NULL_FREQUENCY == 0);
- }
-
- protected VectorizedParquetRecordReader createParquetReader(String schemaString, Configuration conf)
- throws IOException, InterruptedException, HiveException {
- conf.set(PARQUET_READ_SCHEMA, schemaString);
- HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
- HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
-
- Job vectorJob = new Job(conf, "read vector");
- ParquetInputFormat.setInputPaths(vectorJob, file);
- ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class);
- InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0);
- initialVectorizedRowBatchCtx(conf);
- return new VectorizedParquetRecordReader(split, new JobConf(conf));
- }
-
- protected static void writeData(ParquetWriter<Group> writer, boolean isDictionaryEncoding) throws IOException {
- SimpleGroupFactory f = new SimpleGroupFactory(schema);
- for (int i = 0; i < nElements; i++) {
- boolean isNull = isNull(i);
- int intVal = getIntValue(isDictionaryEncoding, i);
- long longVal = getLongValue(isDictionaryEncoding, i);
- Binary timeStamp = getTimestamp(isDictionaryEncoding, i);
- HiveDecimal decimalVal = getDecimal(isDictionaryEncoding, i).setScale(2);
- double doubleVal = getDoubleValue(isDictionaryEncoding, i);
- float floatVal = getFloatValue(isDictionaryEncoding, i);
- boolean booleanVal = getBooleanValue(i);
- Binary binary = getBinaryValue(isDictionaryEncoding, i);
- Group group = f.newGroup()
- .append("int32_field", intVal)
- .append("int64_field", longVal)
- .append("int96_field", timeStamp)
- .append("double_field", doubleVal)
- .append("float_field", floatVal)
- .append("boolean_field", booleanVal)
- .append("flba_field", "abc");
-
- if (!isNull) {
- group.append("some_null_field", "x");
- }
-
- group.append("binary_field", binary);
-
- if (!isNull) {
- group.append("binary_field_some_null", binary);
- }
-
- HiveDecimalWritable w = new HiveDecimalWritable(decimalVal);
- group.append("value", Binary.fromConstantByteArray(w.getInternalStorage()));
-
- group.addGroup("struct_field")
- .append("a", intVal)
- .append("b", doubleVal);
-
- Group g = group.addGroup("nested_struct_field");
-
- g.addGroup("nsf").append("c", intVal).append("d", intVal);
- g.append("e", doubleVal);
-
- Group some_null_g = group.addGroup("struct_field_some_null");
- if (i % 2 != 0) {
- some_null_g.append("f", intVal);
- }
- if (i % 3 != 0) {
- some_null_g.append("g", doubleVal);
- }
-
- Group mapGroup = group.addGroup("map_field");
- if (i % 13 != 1) {
- mapGroup.addGroup("map").append("key", binary).append("value", "abc");
- } else {
- mapGroup.addGroup("map").append("key", binary);
- }
-
- Group arrayGroup = group.addGroup("array_list");
- for (int j = 0; j < i % 4; j++) {
- arrayGroup.addGroup("bag").append("array_element", intVal);
- }
-
- writer.write(group);
- }
- writer.close();
- }
-
- private void initialVectorizedRowBatchCtx(Configuration conf) throws HiveException {
- MapWork mapWork = new MapWork();
- VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx();
- rbCtx.init(createStructObjectInspector(conf), new String[0]);
- mapWork.setVectorMode(true);
- mapWork.setVectorizedRowBatchCtx(rbCtx);
- Utilities.setMapWork(conf, mapWork);
- }
-
- private StructObjectInspector createStructObjectInspector(Configuration conf) {
- // Create row related objects
- String columnNames = conf.get(IOConstants.COLUMNS);
- List<String> columnNamesList = DataWritableReadSupport.getColumnNames(columnNames);
- String columnTypes = conf.get(IOConstants.COLUMNS_TYPES);
- List<TypeInfo> columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
- TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList);
- return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);
- }
-
- protected void intRead(boolean isDictionaryEncoding) throws InterruptedException, HiveException, IOException {
- Configuration conf = new Configuration();
- conf.set(IOConstants.COLUMNS,"int32_field");
- conf.set(IOConstants.COLUMNS_TYPES,"int");
- conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
- VectorizedParquetRecordReader reader =
- createParquetReader("message test { required int32 int32_field;}", conf);
- VectorizedRowBatch previous = reader.createValue();
- try {
- int c = 0;
- while (reader.next(NullWritable.get(), previous)) {
- LongColumnVector vector = (LongColumnVector) previous.cols[0];
- assertTrue(vector.noNulls);
- for (int i = 0; i < vector.vector.length; i++) {
- if(c == nElements){
- break;
- }
- assertEquals("Failed at " + c, getIntValue(isDictionaryEncoding, c), vector.vector[i]);
- assertFalse(vector.isNull[i]);
- c++;
- }
- }
- assertEquals(nElements, c);
- } finally {
- reader.close();
- }
- }
-
- protected void longRead(boolean isDictionaryEncoding) throws Exception {
- Configuration conf = new Configuration();
- conf.set(IOConstants.COLUMNS, "int64_field");
- conf.set(IOConstants.COLUMNS_TYPES, "bigint");
- conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
- VectorizedParquetRecordReader reader =
- createParquetReader("message test { required int64 int64_field;}", conf);
- VectorizedRowBatch previous = reader.createValue();
- try {
- int c = 0;
- while (reader.next(NullWritable.get(), previous)) {
- LongColumnVector vector = (LongColumnVector) previous.cols[0];
- assertTrue(vector.noNulls);
- for (int i = 0; i < vector.vector.length; i++) {
- if (c == nElements) {
- break;
- }
- assertEquals("Failed at " + c, getLongValue(isDictionaryEncoding, c), vector.vector[i]);
- assertFalse(vector.isNull[i]);
- c++;
- }
- }
- assertEquals(nElements, c);
- } finally {
- reader.close();
- }
- }
-
- protected void doubleRead(boolean isDictionaryEncoding) throws Exception {
- Configuration conf = new Configuration();
- conf.set(IOConstants.COLUMNS, "double_field");
- conf.set(IOConstants.COLUMNS_TYPES, "double");
- conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
- VectorizedParquetRecordReader reader =
- createParquetReader("message test { required double double_field;}", conf);
- VectorizedRowBatch previous = reader.createValue();
- try {
- int c = 0;
- while (reader.next(NullWritable.get(), previous)) {
- DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0];
- assertTrue(vector.noNulls);
- for (int i = 0; i < vector.vector.length; i++) {
- if (c == nElements) {
- break;
- }
- assertEquals("Failed at " + c, getDoubleValue(isDictionaryEncoding, c), vector.vector[i],
- 0);
- assertFalse(vector.isNull[i]);
- c++;
- }
- }
- assertEquals(nElements, c);
- } finally {
- reader.close();
- }
- }
-
- protected void floatRead(boolean isDictionaryEncoding) throws Exception {
- Configuration conf = new Configuration();
- conf.set(IOConstants.COLUMNS, "float_field");
- conf.set(IOConstants.COLUMNS_TYPES, "float");
- conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
- VectorizedParquetRecordReader reader =
- createParquetReader("message test { required float float_field;}", conf);
- VectorizedRowBatch previous = reader.createValue();
- try {
- int c = 0;
- while (reader.next(NullWritable.get(), previous)) {
- DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0];
- assertTrue(vector.noNulls);
- for (int i = 0; i < vector.vector.length; i++) {
- if (c == nElements) {
- break;
- }
- assertEquals("Failed at " + c, getFloatValue(isDictionaryEncoding, c), vector.vector[i],
- 0);
- assertFalse(vector.isNull[i]);
- c++;
- }
- }
- assertEquals(nElements, c);
- } finally {
- reader.close();
- }
- }
-
- protected void booleanRead() throws Exception {
- Configuration conf = new Configuration();
- conf.set(IOConstants.COLUMNS, "boolean_field");
- conf.set(IOConstants.COLUMNS_TYPES, "boolean");
- conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
- VectorizedParquetRecordReader reader =
- createParquetReader("message test { required boolean boolean_field;}", conf);
- VectorizedRowBatch previous = reader.createValue();
- try {
- int c = 0;
- while (reader.next(NullWritable.get(), previous)) {
- LongColumnVector vector = (LongColumnVector) previous.cols[0];
- assertTrue(vector.noNulls);
- for (int i = 0; i < vector.vector.length; i++) {
- if (c == nElements) {
- break;
- }
- assertEquals("Failed at " + c, (getBooleanValue(c) ? 1 : 0), vector.vector[i]);
- assertFalse(vector.isNull[i]);
- c++;
- }
- }
- assertEquals(nElements, c);
- } finally {
- reader.close();
- }
- }
-
- protected void binaryRead(boolean isDictionaryEncoding) throws Exception {
- Configuration conf = new Configuration();
- conf.set(IOConstants.COLUMNS, "binary_field_some_null");
- conf.set(IOConstants.COLUMNS_TYPES, "string");
- conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
- VectorizedParquetRecordReader reader =
- createParquetReader("message test { required binary binary_field_some_null;}", conf);
- VectorizedRowBatch previous = reader.createValue();
- int c = 0;
- try {
- while (reader.next(NullWritable.get(), previous)) {
- BytesColumnVector vector = (BytesColumnVector) previous.cols[0];
- boolean noNull = true;
- for (int i = 0; i < vector.vector.length; i++) {
- if (c == nElements) {
- break;
- }
- String actual;
- assertEquals("Null assert failed at " + c, isNull(c), vector.isNull[i]);
- if (!vector.isNull[i]) {
- actual = new String(ArrayUtils
- .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i]));
- assertEquals("failed at " + c, getStr(isDictionaryEncoding, c), actual);
- } else {
- noNull = false;
- }
- c++;
- }
- assertEquals("No Null check failed at " + c, noNull, vector.noNulls);
- assertFalse(vector.isRepeating);
- }
- assertEquals("It doesn't exit at expected position", nElements, c);
- } finally {
- reader.close();
- }
- }
-
- protected void structRead(boolean isDictionaryEncoding) throws Exception {
- Configuration conf = new Configuration();
- conf.set(IOConstants.COLUMNS, "struct_field");
- conf.set(IOConstants.COLUMNS_TYPES, "struct<a:int,b:double>");
- conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
- String schema = "message hive_schema {\n"
- + "group struct_field {\n"
- + " optional int32 a;\n"
- + " optional double b;\n"
- + "}\n"
- + "}\n";
- VectorizedParquetRecordReader reader = createParquetReader(schema, conf);
- VectorizedRowBatch previous = reader.createValue();
- int c = 0;
- try {
- while (reader.next(NullWritable.get(), previous)) {
- StructColumnVector vector = (StructColumnVector) previous.cols[0];
- LongColumnVector cv = (LongColumnVector) vector.fields[0];
- DoubleColumnVector dv = (DoubleColumnVector) vector.fields[1];
-
- for (int i = 0; i < cv.vector.length; i++) {
- if (c == nElements) {
- break;
- }
- assertEquals(getIntValue(isDictionaryEncoding, c), cv.vector[i]);
- assertEquals(getDoubleValue(isDictionaryEncoding, c), dv.vector[i], 0);
- assertFalse(vector.isNull[i]);
- assertFalse(vector.isRepeating);
- c++;
- }
- }
- assertEquals("It doesn't exit at expected position", nElements, c);
- } finally {
- reader.close();
- }
- }
-
- protected void nestedStructRead0(boolean isDictionaryEncoding) throws Exception {
- Configuration conf = new Configuration();
- conf.set(IOConstants.COLUMNS, "nested_struct_field");
- conf.set(IOConstants.COLUMNS_TYPES, "struct<nsf:struct<c:int,d:int>,e:double>");
- conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
- String schema = "message hive_schema {\n"
- + "group nested_struct_field {\n"
- + " optional group nsf {\n"
- + " optional int32 c;\n"
- + " optional int32 d;\n"
- + " }"
- + "optional double e;\n"
- + "}\n";
- VectorizedParquetRecordReader reader = createParquetReader(schema, conf);
- VectorizedRowBatch previous = reader.createValue();
- int c = 0;
- try {
- while (reader.next(NullWritable.get(), previous)) {
- StructColumnVector vector = (StructColumnVector) previous.cols[0];
- StructColumnVector sv = (StructColumnVector) vector.fields[0];
- LongColumnVector cv = (LongColumnVector) sv.fields[0];
- LongColumnVector dv = (LongColumnVector) sv.fields[1];
- DoubleColumnVector ev = (DoubleColumnVector) vector.fields[1];
-
- for (int i = 0; i < cv.vector.length; i++) {
- if (c == nElements) {
- break;
- }
- assertEquals(getIntValue(isDictionaryEncoding, c), cv.vector[i]);
- assertEquals(getIntValue(isDictionaryEncoding, c), dv.vector[i]);
- assertEquals(getDoubleValue(isDictionaryEncoding, c), ev.vector[i], 0);
- assertFalse(vector.isNull[i]);
- assertFalse(vector.isRepeating);
- c++;
- }
- }
- assertEquals("It doesn't exit at expected position", nElements, c);
- } finally {
- reader.close();
- }
- }
-
- protected void nestedStructRead1(boolean isDictionaryEncoding) throws Exception {
- Configuration conf = new Configuration();
- conf.set(IOConstants.COLUMNS, "nested_struct_field");
- conf.set(IOConstants.COLUMNS_TYPES, "struct<nsf:struct<c:int>>");
- conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
- String schema = "message hive_schema {\n"
- + "group nested_struct_field {\n"
- + " optional group nsf {\n"
- + " optional int32 c;\n"
- + " }"
- + "}\n";
- VectorizedParquetRecordReader reader = createParquetReader(schema, conf);
- VectorizedRowBatch previous = reader.createValue();
- int c = 0;
- try {
- while (reader.next(NullWritable.get(), previous)) {
- StructColumnVector vector = (StructColumnVector) previous.cols[0];
- StructColumnVector sv = (StructColumnVector) vector.fields[0];
- LongColumnVector cv = (LongColumnVector) sv.fields[0];
-
- for (int i = 0; i < cv.vector.length; i++) {
- if (c == nElements) {
- break;
- }
- assertEquals(getIntValue(isDictionaryEncoding, c), cv.vector[i]);
- assertFalse(vector.isNull[i]);
- assertFalse(vector.isRepeating);
- c++;
- }
- }
- assertEquals("It doesn't exit at expected position", nElements, c);
- } finally {
- reader.close();
- }
- }
-
- protected void structReadSomeNull(boolean isDictionaryEncoding) throws Exception {
- Configuration conf = new Configuration();
- conf.set(IOConstants.COLUMNS, "struct_field_some_null");
- conf.set(IOConstants.COLUMNS_TYPES, "struct<f:int,g:double>");
- conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
- String schema = "message hive_schema {\n"
- + "group struct_field_some_null {\n"
- + " optional int32 f;\n"
- + " optional double g;\n"
- + "}\n";
- VectorizedParquetRecordReader reader = createParquetReader(schema, conf);
- VectorizedRowBatch previous = reader.createValue();
- int c = 0;
- try {
- while (reader.next(NullWritable.get(), previous)) {
- StructColumnVector sv = (StructColumnVector) previous.cols[0];
- LongColumnVector fv = (LongColumnVector) sv.fields[0];
- DoubleColumnVector gv = (DoubleColumnVector) sv.fields[1];
-
- for (int i = 0; i < fv.vector.length; i++) {
- if (c == nElements) {
- break;
- }
- assertEquals(c % 2 == 0, fv.isNull[i]);
- assertEquals(c % 3 == 0, gv.isNull[i]);
- assertEquals(c % /* 2*3 = */6 == 0, sv.isNull[i]);
- if (!sv.isNull[i]) {
- if (!fv.isNull[i]) {
- assertEquals(getIntValue(isDictionaryEncoding, c), fv.vector[i]);
- }
- if (!gv.isNull[i]) {
- assertEquals(getDoubleValue(isDictionaryEncoding, c), gv.vector[i], 0);
- }
- }
- assertFalse(fv.isRepeating);
- c++;
- }
- }
- assertEquals("It doesn't exit at expected position", nElements, c);
- } finally {
- reader.close();
- }
- }
-
- protected void decimalRead(boolean isDictionaryEncoding) throws Exception {
- Configuration conf = new Configuration();
- conf.set(IOConstants.COLUMNS, "value");
- conf.set(IOConstants.COLUMNS_TYPES, "decimal(5,2)");
- conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
- conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
- VectorizedParquetRecordReader reader =
- createParquetReader("message hive_schema { required value (DECIMAL(5,2));}", conf);
- VectorizedRowBatch previous = reader.createValue();
- try {
- int c = 0;
- while (reader.next(NullWritable.get(), previous)) {
- DecimalColumnVector vector = (DecimalColumnVector) previous.cols[0];
- assertTrue(vector.noNulls);
- for (int i = 0; i < vector.vector.length; i++) {
- if (c == nElements) {
- break;
- }
- assertEquals("Check failed at pos " + c, getDecimal(isDictionaryEncoding, c),
- vector.vector[i].getHiveDecimal());
- assertFalse(vector.isNull[i]);
- c++;
- }
- }
- assertEquals(nElements, c);
- } finally {
- reader.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/fe3b370e/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java
deleted file mode 100644
index c6677cc..0000000
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.parquet;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-
-public class TestVectorizedDictionaryEncodingColumnReader extends TestVectorizedColumnReaderBase {
- static boolean isDictionaryEncoding = true;
-
- @BeforeClass
- public static void setup() throws IOException {
- removeFile();
- writeData(initWriterFromFile(), isDictionaryEncoding);
- }
-
- @AfterClass
- public static void cleanup() throws IOException {
- removeFile();
- }
-
- @Test
- public void testIntRead() throws Exception {
- intRead(isDictionaryEncoding);
- }
-
- @Test
- public void testLongRead() throws Exception {
- longRead(isDictionaryEncoding);
- }
-
- @Test
- public void testDoubleRead() throws Exception {
- doubleRead(isDictionaryEncoding);
- }
-
- @Test
- public void testFloatRead() throws Exception {
- floatRead(isDictionaryEncoding);
- }
-
- @Test
- public void testBinaryRead() throws Exception {
- binaryRead(isDictionaryEncoding);
- }
-
- @Test
- public void testStructRead() throws Exception {
- structRead(isDictionaryEncoding);
- }
-
- @Test
- public void testNestedStructRead() throws Exception {
- structRead(isDictionaryEncoding);
- }
-
- @Test
- public void structReadSomeNull() throws Exception {
- structReadSomeNull(isDictionaryEncoding);
- }
-
- @Test
- public void decimalRead() throws Exception {
- decimalRead(isDictionaryEncoding);
- }
-}
[2/2] hive git commit: Revert "HIVE-15112: Implement Parquet vectorization reader for Struct type (Ferdinand Xu, via Chao Sun)"
Posted by xu...@apache.org.
Revert "HIVE-15112: Implement Parquet vectorization reader for Struct type (Ferdinand Xu, via Chao Sun)"
This reverts commit 9a524ada2eafda2f4853a12d469f7ca48e57f38c.
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fe3b370e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fe3b370e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fe3b370e
Branch: refs/heads/master
Commit: fe3b370eb44a4864e5087cc80ee413931e89770d
Parents: ab9b219
Author: Ferdinand Xu <ch...@intel.com>
Authored: Wed Dec 21 09:22:49 2016 +0800
Committer: Ferdinand Xu <ch...@intel.com>
Committed: Wed Dec 21 09:22:49 2016 +0800
----------------------------------------------------------------------
.../parquet/vector/VectorizedColumnReader.java | 566 ++++++++++++++-
.../vector/VectorizedParquetRecordReader.java | 84 +--
.../vector/VectorizedPrimitiveColumnReader.java | 589 ----------------
.../vector/VectorizedStructColumnReader.java | 59 --
.../io/parquet/TestVectorizedColumnReader.java | 418 +++++++++--
.../parquet/TestVectorizedColumnReaderBase.java | 694 -------------------
...ectorizedDictionaryEncodingColumnReader.java | 85 ---
7 files changed, 932 insertions(+), 1563 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/fe3b370e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java
index e3be982..5a9c7f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java
@@ -1,13 +1,9 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,25 +11,561 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hive.ql.io.parquet.vector;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.sql.Timestamp;
+import java.util.Arrays;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+/**
+ * It's column level Parquet reader which is used to read a batch of records for a column,
+ * part of the code is referred from Apache Spark and Apache Parquet.
+ */
+public class VectorizedColumnReader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(VectorizedColumnReader.class);
+
+ private boolean skipTimestampConversion = false;
+
+ /**
+ * Total number of values read.
+ */
+ private long valuesRead;
-public interface VectorizedColumnReader {
/**
- * read records with specified size and type into the columnVector
- *
- * @param total number of records to read into the column vector
- * @param column column vector where the reader will read data into
- * @param columnType the type of column vector
- * @throws IOException
+ * value that indicates the end of the current page. That is,
+ * if valuesRead == endOfPageValueCount, we are at the end of the page.
*/
+ private long endOfPageValueCount;
+
+ /**
+ * The dictionary, if this column has dictionary encoding.
+ */
+ private final Dictionary dictionary;
+
+ /**
+ * If true, the current page is dictionary encoded.
+ */
+ private boolean isCurrentPageDictionaryEncoded;
+
+ /**
+ * Maximum definition level for this column.
+ */
+ private final int maxDefLevel;
+
+ private int definitionLevel;
+ private int repetitionLevel;
+
+ /**
+ * Repetition/Definition/Value readers.
+ */
+ private IntIterator repetitionLevelColumn;
+ private IntIterator definitionLevelColumn;
+ private ValuesReader dataColumn;
+
+ /**
+ * Total values in the current page.
+ */
+ private int pageValueCount;
+
+ private final PageReader pageReader;
+ private final ColumnDescriptor descriptor;
+ private final Type type;
+
+ public VectorizedColumnReader(
+ ColumnDescriptor descriptor,
+ PageReader pageReader,
+ boolean skipTimestampConversion,
+ Type type) throws IOException {
+ this.descriptor = descriptor;
+ this.type = type;
+ this.pageReader = pageReader;
+ this.maxDefLevel = descriptor.getMaxDefinitionLevel();
+ this.skipTimestampConversion = skipTimestampConversion;
+
+ DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+ if (dictionaryPage != null) {
+ try {
+ this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
+ this.isCurrentPageDictionaryEncoded = true;
+ } catch (IOException e) {
+ throw new IOException("could not decode the dictionary for " + descriptor, e);
+ }
+ } else {
+ this.dictionary = null;
+ this.isCurrentPageDictionaryEncoded = false;
+ }
+ }
+
void readBatch(
int total,
ColumnVector column,
- TypeInfo columnType) throws IOException;
+ TypeInfo columnType) throws IOException {
+
+ int rowId = 0;
+ while (total > 0) {
+ // Compute the number of values we want to read in this page.
+ int leftInPage = (int) (endOfPageValueCount - valuesRead);
+ if (leftInPage == 0) {
+ readPage();
+ leftInPage = (int) (endOfPageValueCount - valuesRead);
+ }
+
+ int num = Math.min(total, leftInPage);
+ if (isCurrentPageDictionaryEncoded) {
+ LongColumnVector dictionaryIds = new LongColumnVector();
+ // Read and decode dictionary ids.
+ readDictionaryIDs(num, dictionaryIds, rowId);
+ decodeDictionaryIds(rowId, num, column, dictionaryIds);
+ } else {
+ // assign values in vector
+ PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType;
+ switch (primitiveColumnType.getPrimitiveCategory()) {
+ case INT:
+ case BYTE:
+ case SHORT:
+ readIntegers(num, (LongColumnVector) column, rowId);
+ break;
+ case DATE:
+ case INTERVAL_YEAR_MONTH:
+ case LONG:
+ readLongs(num, (LongColumnVector) column, rowId);
+ break;
+ case BOOLEAN:
+ readBooleans(num, (LongColumnVector) column, rowId);
+ break;
+ case DOUBLE:
+ readDoubles(num, (DoubleColumnVector) column, rowId);
+ break;
+ case BINARY:
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ readBinaries(num, (BytesColumnVector) column, rowId);
+ break;
+ case FLOAT:
+ readFloats(num, (DoubleColumnVector) column, rowId);
+ break;
+ case DECIMAL:
+ readDecimal(num, (DecimalColumnVector) column, rowId);
+ break;
+ case INTERVAL_DAY_TIME:
+ case TIMESTAMP:
+ default:
+ throw new IOException(
+ "Unsupported type category: " + primitiveColumnType.getPrimitiveCategory());
+ }
+ }
+ rowId += num;
+ total -= num;
+ }
+ }
+
+ private void readDictionaryIDs(
+ int total,
+ LongColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
+ if (definitionLevel >= maxDefLevel) {
+ c.vector[rowId] = dataColumn.readValueDictionaryId();
+ c.isNull[rowId] = false;
+ c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
+ private void readIntegers(
+ int total,
+ LongColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
+ if (definitionLevel >= maxDefLevel) {
+ c.vector[rowId] = dataColumn.readInteger();
+ c.isNull[rowId] = false;
+ c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
+ private void readDoubles(
+ int total,
+ DoubleColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
+ if (definitionLevel >= maxDefLevel) {
+ c.vector[rowId] = dataColumn.readDouble();
+ c.isNull[rowId] = false;
+ c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
+ private void readBooleans(
+ int total,
+ LongColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
+ if (definitionLevel >= maxDefLevel) {
+ c.vector[rowId] = dataColumn.readBoolean() ? 1 : 0;
+ c.isNull[rowId] = false;
+ c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
+ private void readLongs(
+ int total,
+ LongColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
+ if (definitionLevel >= maxDefLevel) {
+ c.vector[rowId] = dataColumn.readLong();
+ c.isNull[rowId] = false;
+ c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
+ private void readFloats(
+ int total,
+ DoubleColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
+ if (definitionLevel >= maxDefLevel) {
+ c.vector[rowId] = dataColumn.readFloat();
+ c.isNull[rowId] = false;
+ c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
+ private void readDecimal(
+ int total,
+ DecimalColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ c.precision = (short) type.asPrimitiveType().getDecimalMetadata().getPrecision();
+ c.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale();
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
+ if (definitionLevel >= maxDefLevel) {
+ c.vector[rowId].set(dataColumn.readBytes().getBytesUnsafe(), c.scale);
+ c.isNull[rowId] = false;
+ c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
+ private void readBinaries(
+ int total,
+ BytesColumnVector c,
+ int rowId) throws IOException {
+ int left = total;
+ while (left > 0) {
+ readRepetitionAndDefinitionLevels();
+ if (definitionLevel >= maxDefLevel) {
+ c.setVal(rowId, dataColumn.readBytes().getBytesUnsafe());
+ c.isNull[rowId] = false;
+ // TODO figure out a better way to set repeat for Binary type
+ c.isRepeating = false;
+ } else {
+ c.isNull[rowId] = true;
+ c.isRepeating = false;
+ c.noNulls = false;
+ }
+ rowId++;
+ left--;
+ }
+ }
+
+ /**
+ * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
+ */
+ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
+ LongColumnVector dictionaryIds) {
+ System.arraycopy(dictionaryIds.isNull, rowId, column.isNull, rowId, num);
+ if (column.noNulls) {
+ column.noNulls = dictionaryIds.noNulls;
+ }
+ column.isRepeating = column.isRepeating && dictionaryIds.isRepeating;
+
+ switch (descriptor.getType()) {
+ case INT32:
+ for (int i = rowId; i < rowId + num; ++i) {
+ ((LongColumnVector) column).vector[i] =
+ dictionary.decodeToInt((int) dictionaryIds.vector[i]);
+ }
+ break;
+ case INT64:
+ for (int i = rowId; i < rowId + num; ++i) {
+ ((LongColumnVector) column).vector[i] =
+ dictionary.decodeToLong((int) dictionaryIds.vector[i]);
+ }
+ break;
+ case FLOAT:
+ for (int i = rowId; i < rowId + num; ++i) {
+ ((DoubleColumnVector) column).vector[i] =
+ dictionary.decodeToFloat((int) dictionaryIds.vector[i]);
+ }
+ break;
+ case DOUBLE:
+ for (int i = rowId; i < rowId + num; ++i) {
+ ((DoubleColumnVector) column).vector[i] =
+ dictionary.decodeToDouble((int) dictionaryIds.vector[i]);
+ }
+ break;
+ case INT96:
+ for (int i = rowId; i < rowId + num; ++i) {
+ ByteBuffer buf = dictionary.decodeToBinary((int) dictionaryIds.vector[i]).toByteBuffer();
+ buf.order(ByteOrder.LITTLE_ENDIAN);
+ long timeOfDayNanos = buf.getLong();
+ int julianDay = buf.getInt();
+ NanoTime nt = new NanoTime(julianDay, timeOfDayNanos);
+ Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion);
+ ((TimestampColumnVector) column).set(i, ts);
+ }
+ break;
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ for (int i = rowId; i < rowId + num; ++i) {
+ ((BytesColumnVector) column)
+ .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe());
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
+ }
+ }
+
+ private void readRepetitionAndDefinitionLevels() {
+ repetitionLevel = repetitionLevelColumn.nextInt();
+ definitionLevel = definitionLevelColumn.nextInt();
+ valuesRead++;
+ }
+
+ private void readPage() throws IOException {
+ DataPage page = pageReader.readPage();
+ // TODO: Why is this a visitor?
+ page.accept(new DataPage.Visitor<Void>() {
+ @Override
+ public Void visit(DataPageV1 dataPageV1) {
+ readPageV1(dataPageV1);
+ return null;
+ }
+
+ @Override
+ public Void visit(DataPageV2 dataPageV2) {
+ readPageV2(dataPageV2);
+ return null;
+ }
+ });
+ }
+
+ private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException {
+ this.pageValueCount = valueCount;
+ this.endOfPageValueCount = valuesRead + pageValueCount;
+ if (dataEncoding.usesDictionary()) {
+ this.dataColumn = null;
+ if (dictionary == null) {
+ throw new IOException(
+ "could not read page in col " + descriptor +
+ " as the dictionary was missing for encoding " + dataEncoding);
+ }
+ dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary);
+ this.isCurrentPageDictionaryEncoded = true;
+ } else {
+ if (dataEncoding != Encoding.PLAIN) {
+ throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
+ }
+ dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
+ this.isCurrentPageDictionaryEncoded = false;
+ }
+
+ try {
+ dataColumn.initFromPage(pageValueCount, bytes, offset);
+ } catch (IOException e) {
+ throw new IOException("could not read page in col " + descriptor, e);
+ }
+ }
+
+ private void readPageV1(DataPageV1 page) {
+ ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
+ ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
+ this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
+ this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
+ try {
+ byte[] bytes = page.getBytes().toByteArray();
+ LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
+ LOG.debug("reading repetition levels at 0");
+ rlReader.initFromPage(pageValueCount, bytes, 0);
+ int next = rlReader.getNextOffset();
+ LOG.debug("reading definition levels at " + next);
+ dlReader.initFromPage(pageValueCount, bytes, next);
+ next = dlReader.getNextOffset();
+ LOG.debug("reading data at " + next);
+ initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
+ }
+ }
+
+ private void readPageV2(DataPageV2 page) {
+ this.pageValueCount = page.getValueCount();
+ this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(),
+ page.getRepetitionLevels());
+ this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels());
+ try {
+ LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records");
+ initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
+ }
+ }
+
+ private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
+ try {
+ if (maxLevel == 0) {
+ return new NullIntIterator();
+ }
+ return new RLEIntIterator(
+ new RunLengthBitPackingHybridDecoder(
+ BytesUtils.getWidthFromMaxInt(maxLevel),
+ new ByteArrayInputStream(bytes.toByteArray())));
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not read levels in page for col " + descriptor, e);
+ }
+ }
+
+ /**
+ * Utility classes to abstract over different way to read ints with different encodings.
+ * TODO: remove this layer of abstraction?
+ */
+ abstract static class IntIterator {
+ abstract int nextInt();
+ }
+
+ protected static final class ValuesReaderIntIterator extends IntIterator {
+ ValuesReader delegate;
+
+ public ValuesReaderIntIterator(ValuesReader delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ int nextInt() {
+ return delegate.readInteger();
+ }
+ }
+
+ protected static final class RLEIntIterator extends IntIterator {
+ RunLengthBitPackingHybridDecoder delegate;
+
+ public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ int nextInt() {
+ try {
+ return delegate.readInt();
+ } catch (IOException e) {
+ throw new ParquetDecodingException(e);
+ }
+ }
+ }
+
+ protected static final class NullIntIterator extends IntIterator {
+ @Override
+ int nextInt() { return 0; }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/fe3b370e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 699de59..f94c49a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -23,13 +23,11 @@ import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.parquet.ParquetRuntimeException;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.filter2.compat.FilterCompat;
@@ -37,7 +35,6 @@ import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetInputSplit;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
@@ -71,7 +68,6 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
private List<String> columnNamesList;
private List<TypeInfo> columnTypesList;
private VectorizedRowBatchCtx rbCtx;
- private List<Integer> indexColumnsWanted;
/**
* For each request column, the reader to read this column. This is NULL if this column
@@ -202,7 +198,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
columnTypesList);
}
- indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
+ List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
requestedSchema =
DataWritableReadSupport.getSchemaByIndex(tableSchema, columnNamesList, indexColumnsWanted);
@@ -283,81 +279,11 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
List<ColumnDescriptor> columns = requestedSchema.getColumns();
List<Type> types = requestedSchema.getFields();
columnReaders = new VectorizedColumnReader[columns.size()];
-
- if (!ColumnProjectionUtils.isReadAllColumns(jobConf) && !indexColumnsWanted.isEmpty()) {
- for (int i = 0; i < types.size(); ++i) {
- columnReaders[i] =
- buildVectorizedParquetReader(columnTypesList.get(indexColumnsWanted.get(i)), types.get(i),
- pages, requestedSchema.getColumns(), skipTimestampConversion, 0);
- }
- } else {
- for (int i = 0; i < types.size(); ++i) {
- columnReaders[i] = buildVectorizedParquetReader(columnTypesList.get(i), types.get(i), pages,
- requestedSchema.getColumns(), skipTimestampConversion, 0);
- }
+ for (int i = 0; i < columns.size(); ++i) {
+ columnReaders[i] =
+ new VectorizedColumnReader(columns.get(i), pages.getPageReader(columns.get(i)),
+ skipTimestampConversion, types.get(i));
}
-
totalCountLoadedSoFar += pages.getRowCount();
}
-
- private List<ColumnDescriptor> getAllColumnDescriptorByType(
- int depth,
- Type type,
- List<ColumnDescriptor> columns) throws ParquetRuntimeException {
- List<ColumnDescriptor> res = new ArrayList<>();
- for (ColumnDescriptor descriptor : columns) {
- if (depth >= descriptor.getPath().length) {
- throw new InvalidSchemaException("Corrupted Parquet schema");
- }
- if (type.getName().equals(descriptor.getPath()[depth])) {
- res.add(descriptor);
- }
- }
- return res;
- }
-
- // Build VectorizedParquetColumnReader via Hive typeInfo and Parquet schema
- private VectorizedColumnReader buildVectorizedParquetReader(
- TypeInfo typeInfo,
- Type type,
- PageReadStore pages,
- List<ColumnDescriptor> columnDescriptors,
- boolean skipTimestampConversion,
- int depth) throws IOException {
- List<ColumnDescriptor> descriptors =
- getAllColumnDescriptorByType(depth, type, columnDescriptors);
- switch (typeInfo.getCategory()) {
- case PRIMITIVE:
- if (columnDescriptors == null || columnDescriptors.isEmpty()) {
- throw new RuntimeException(
- "Failed to find related Parquet column descriptor with type " + type);
- } else {
- return new VectorizedPrimitiveColumnReader(descriptors.get(0),
- pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type);
- }
- case STRUCT:
- StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
- List<VectorizedColumnReader> fieldReaders = new ArrayList<>();
- List<TypeInfo> fieldTypes = structTypeInfo.getAllStructFieldTypeInfos();
- List<Type> types = type.asGroupType().getFields();
- for (int i = 0; i < fieldTypes.size(); i++) {
- VectorizedColumnReader r =
- buildVectorizedParquetReader(fieldTypes.get(i), types.get(i), pages, descriptors,
- skipTimestampConversion, depth + 1);
- if (r != null) {
- fieldReaders.add(r);
- } else {
- throw new RuntimeException(
- "Fail to build Parquet vectorized reader based on Hive type " + fieldTypes.get(i)
- .getTypeName() + " and Parquet type" + types.get(i).toString());
- }
- }
- return new VectorizedStructColumnReader(fieldReaders);
- case LIST:
- case MAP:
- case UNION:
- default:
- throw new RuntimeException("Unsupported category " + typeInfo.getCategory().name());
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/fe3b370e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
deleted file mode 100644
index 3d5c6e6..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
+++ /dev/null
@@ -1,589 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.parquet.vector;
-
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.bytes.BytesUtils;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.column.page.DataPage;
-import org.apache.parquet.column.page.DataPageV1;
-import org.apache.parquet.column.page.DataPageV2;
-import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.schema.Type;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.sql.Timestamp;
-
-import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
-import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
-import static org.apache.parquet.column.ValuesType.VALUES;
-
-/**
- * It's column level Parquet reader which is used to read a batch of records for a column,
- * part of the code is referred from Apache Spark and Apache Parquet.
- */
-public class VectorizedPrimitiveColumnReader implements VectorizedColumnReader {
-
- private static final Logger LOG = LoggerFactory.getLogger(VectorizedPrimitiveColumnReader.class);
-
- private boolean skipTimestampConversion = false;
-
- /**
- * Total number of values read.
- */
- private long valuesRead;
-
- /**
- * value that indicates the end of the current page. That is,
- * if valuesRead == endOfPageValueCount, we are at the end of the page.
- */
- private long endOfPageValueCount;
-
- /**
- * The dictionary, if this column has dictionary encoding.
- */
- private final Dictionary dictionary;
-
- /**
- * If true, the current page is dictionary encoded.
- */
- private boolean isCurrentPageDictionaryEncoded;
-
- /**
- * Maximum definition level for this column.
- */
- private final int maxDefLevel;
-
- private int definitionLevel;
- private int repetitionLevel;
-
- /**
- * Repetition/Definition/Value readers.
- */
- private IntIterator repetitionLevelColumn;
- private IntIterator definitionLevelColumn;
- private ValuesReader dataColumn;
-
- /**
- * Total values in the current page.
- */
- private int pageValueCount;
-
- private final PageReader pageReader;
- private final ColumnDescriptor descriptor;
- private final Type type;
-
- public VectorizedPrimitiveColumnReader(
- ColumnDescriptor descriptor,
- PageReader pageReader,
- boolean skipTimestampConversion,
- Type type) throws IOException {
- this.descriptor = descriptor;
- this.type = type;
- this.pageReader = pageReader;
- this.maxDefLevel = descriptor.getMaxDefinitionLevel();
- this.skipTimestampConversion = skipTimestampConversion;
-
- DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
- if (dictionaryPage != null) {
- try {
- this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
- this.isCurrentPageDictionaryEncoded = true;
- } catch (IOException e) {
- throw new IOException("could not decode the dictionary for " + descriptor, e);
- }
- } else {
- this.dictionary = null;
- this.isCurrentPageDictionaryEncoded = false;
- }
- }
-
- public void readBatch(
- int total,
- ColumnVector column,
- TypeInfo columnType) throws IOException {
- int rowId = 0;
- while (total > 0) {
- // Compute the number of values we want to read in this page.
- int leftInPage = (int) (endOfPageValueCount - valuesRead);
- if (leftInPage == 0) {
- readPage();
- leftInPage = (int) (endOfPageValueCount - valuesRead);
- }
-
- int num = Math.min(total, leftInPage);
- if (isCurrentPageDictionaryEncoded) {
- LongColumnVector dictionaryIds = new LongColumnVector();
- // Read and decode dictionary ids.
- readDictionaryIDs(num, dictionaryIds, rowId);
- decodeDictionaryIds(rowId, num, column, dictionaryIds);
- } else {
- // assign values in vector
- readBatchHelper(num, column, columnType, rowId);
- }
- rowId += num;
- total -= num;
- }
- }
-
- private void readBatchHelper(
- int num,
- ColumnVector column,
- TypeInfo columnType,
- int rowId) throws IOException {
- PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType;
- switch (primitiveColumnType.getPrimitiveCategory()) {
- case INT:
- case BYTE:
- case SHORT:
- readIntegers(num, (LongColumnVector) column, rowId);
- break;
- case DATE:
- case INTERVAL_YEAR_MONTH:
- case LONG:
- readLongs(num, (LongColumnVector) column, rowId);
- break;
- case BOOLEAN:
- readBooleans(num, (LongColumnVector) column, rowId);
- break;
- case DOUBLE:
- readDoubles(num, (DoubleColumnVector) column, rowId);
- break;
- case BINARY:
- case STRING:
- case CHAR:
- case VARCHAR:
- readBinaries(num, (BytesColumnVector) column, rowId);
- break;
- case FLOAT:
- readFloats(num, (DoubleColumnVector) column, rowId);
- break;
- case DECIMAL:
- readDecimal(num, (DecimalColumnVector) column, rowId);
- break;
- case INTERVAL_DAY_TIME:
- case TIMESTAMP:
- default:
- throw new IOException("Unsupported type: " + type);
- }
- }
-
- private void readDictionaryIDs(
- int total,
- LongColumnVector c,
- int rowId) throws IOException {
- int left = total;
- while (left > 0) {
- readRepetitionAndDefinitionLevels();
- if (definitionLevel >= maxDefLevel) {
- c.vector[rowId] = dataColumn.readValueDictionaryId();
- c.isNull[rowId] = false;
- c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
- } else {
- c.isNull[rowId] = true;
- c.isRepeating = false;
- c.noNulls = false;
- }
- rowId++;
- left--;
- }
- }
-
- private void readIntegers(
- int total,
- LongColumnVector c,
- int rowId) throws IOException {
- int left = total;
- while (left > 0) {
- readRepetitionAndDefinitionLevels();
- if (definitionLevel >= maxDefLevel) {
- c.vector[rowId] = dataColumn.readInteger();
- c.isNull[rowId] = false;
- c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
- } else {
- c.isNull[rowId] = true;
- c.isRepeating = false;
- c.noNulls = false;
- }
- rowId++;
- left--;
- }
- }
-
- private void readDoubles(
- int total,
- DoubleColumnVector c,
- int rowId) throws IOException {
- int left = total;
- while (left > 0) {
- readRepetitionAndDefinitionLevels();
- if (definitionLevel >= maxDefLevel) {
- c.vector[rowId] = dataColumn.readDouble();
- c.isNull[rowId] = false;
- c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
- } else {
- c.isNull[rowId] = true;
- c.isRepeating = false;
- c.noNulls = false;
- }
- rowId++;
- left--;
- }
- }
-
- private void readBooleans(
- int total,
- LongColumnVector c,
- int rowId) throws IOException {
- int left = total;
- while (left > 0) {
- readRepetitionAndDefinitionLevels();
- if (definitionLevel >= maxDefLevel) {
- c.vector[rowId] = dataColumn.readBoolean() ? 1 : 0;
- c.isNull[rowId] = false;
- c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
- } else {
- c.isNull[rowId] = true;
- c.isRepeating = false;
- c.noNulls = false;
- }
- rowId++;
- left--;
- }
- }
-
- private void readLongs(
- int total,
- LongColumnVector c,
- int rowId) throws IOException {
- int left = total;
- while (left > 0) {
- readRepetitionAndDefinitionLevels();
- if (definitionLevel >= maxDefLevel) {
- c.vector[rowId] = dataColumn.readLong();
- c.isNull[rowId] = false;
- c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
- } else {
- c.isNull[rowId] = true;
- c.isRepeating = false;
- c.noNulls = false;
- }
- rowId++;
- left--;
- }
- }
-
- private void readFloats(
- int total,
- DoubleColumnVector c,
- int rowId) throws IOException {
- int left = total;
- while (left > 0) {
- readRepetitionAndDefinitionLevels();
- if (definitionLevel >= maxDefLevel) {
- c.vector[rowId] = dataColumn.readFloat();
- c.isNull[rowId] = false;
- c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
- } else {
- c.isNull[rowId] = true;
- c.isRepeating = false;
- c.noNulls = false;
- }
- rowId++;
- left--;
- }
- }
-
- private void readDecimal(
- int total,
- DecimalColumnVector c,
- int rowId) throws IOException {
- int left = total;
- c.precision = (short) type.asPrimitiveType().getDecimalMetadata().getPrecision();
- c.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale();
- while (left > 0) {
- readRepetitionAndDefinitionLevels();
- if (definitionLevel >= maxDefLevel) {
- c.vector[rowId].set(dataColumn.readBytes().getBytesUnsafe(), c.scale);
- c.isNull[rowId] = false;
- c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
- } else {
- c.isNull[rowId] = true;
- c.isRepeating = false;
- c.noNulls = false;
- }
- rowId++;
- left--;
- }
- }
-
- private void readBinaries(
- int total,
- BytesColumnVector c,
- int rowId) throws IOException {
- int left = total;
- while (left > 0) {
- readRepetitionAndDefinitionLevels();
- if (definitionLevel >= maxDefLevel) {
- c.setVal(rowId, dataColumn.readBytes().getBytesUnsafe());
- c.isNull[rowId] = false;
- // TODO figure out a better way to set repeat for Binary type
- c.isRepeating = false;
- } else {
- c.isNull[rowId] = true;
- c.isRepeating = false;
- c.noNulls = false;
- }
- rowId++;
- left--;
- }
- }
-
- /**
- * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
- */
- private void decodeDictionaryIds(
- int rowId,
- int num,
- ColumnVector column,
- LongColumnVector dictionaryIds) {
- System.arraycopy(dictionaryIds.isNull, rowId, column.isNull, rowId, num);
- if (column.noNulls) {
- column.noNulls = dictionaryIds.noNulls;
- }
- column.isRepeating = column.isRepeating && dictionaryIds.isRepeating;
-
- switch (descriptor.getType()) {
- case INT32:
- for (int i = rowId; i < rowId + num; ++i) {
- ((LongColumnVector) column).vector[i] =
- dictionary.decodeToInt((int) dictionaryIds.vector[i]);
- }
- break;
- case INT64:
- for (int i = rowId; i < rowId + num; ++i) {
- ((LongColumnVector) column).vector[i] =
- dictionary.decodeToLong((int) dictionaryIds.vector[i]);
- }
- break;
- case FLOAT:
- for (int i = rowId; i < rowId + num; ++i) {
- ((DoubleColumnVector) column).vector[i] =
- dictionary.decodeToFloat((int) dictionaryIds.vector[i]);
- }
- break;
- case DOUBLE:
- for (int i = rowId; i < rowId + num; ++i) {
- ((DoubleColumnVector) column).vector[i] =
- dictionary.decodeToDouble((int) dictionaryIds.vector[i]);
- }
- break;
- case INT96:
- for (int i = rowId; i < rowId + num; ++i) {
- ByteBuffer buf = dictionary.decodeToBinary((int) dictionaryIds.vector[i]).toByteBuffer();
- buf.order(ByteOrder.LITTLE_ENDIAN);
- long timeOfDayNanos = buf.getLong();
- int julianDay = buf.getInt();
- NanoTime nt = new NanoTime(julianDay, timeOfDayNanos);
- Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion);
- ((TimestampColumnVector) column).set(i, ts);
- }
- break;
- case BINARY:
- case FIXED_LEN_BYTE_ARRAY:
- if (column instanceof BytesColumnVector) {
- for (int i = rowId; i < rowId + num; ++i) {
- ((BytesColumnVector) column)
- .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe());
- }
- } else {
- DecimalColumnVector decimalColumnVector = ((DecimalColumnVector) column);
- decimalColumnVector.precision =
- (short) type.asPrimitiveType().getDecimalMetadata().getPrecision();
- decimalColumnVector.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale();
- for (int i = rowId; i < rowId + num; ++i) {
- decimalColumnVector.vector[i]
- .set(dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe(),
- decimalColumnVector.scale);
- }
- }
- break;
- default:
- throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
- }
- }
-
- private void readRepetitionAndDefinitionLevels() {
- repetitionLevel = repetitionLevelColumn.nextInt();
- definitionLevel = definitionLevelColumn.nextInt();
- valuesRead++;
- }
-
- private void readPage() throws IOException {
- DataPage page = pageReader.readPage();
- // TODO: Why is this a visitor?
- page.accept(new DataPage.Visitor<Void>() {
- @Override
- public Void visit(DataPageV1 dataPageV1) {
- readPageV1(dataPageV1);
- return null;
- }
-
- @Override
- public Void visit(DataPageV2 dataPageV2) {
- readPageV2(dataPageV2);
- return null;
- }
- });
- }
-
- private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException {
- this.pageValueCount = valueCount;
- this.endOfPageValueCount = valuesRead + pageValueCount;
- if (dataEncoding.usesDictionary()) {
- this.dataColumn = null;
- if (dictionary == null) {
- throw new IOException(
- "could not read page in col " + descriptor +
- " as the dictionary was missing for encoding " + dataEncoding);
- }
- dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary);
- this.isCurrentPageDictionaryEncoded = true;
- } else {
- if (dataEncoding != Encoding.PLAIN) {
- throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
- }
- dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
- this.isCurrentPageDictionaryEncoded = false;
- }
-
- try {
- dataColumn.initFromPage(pageValueCount, bytes, offset);
- } catch (IOException e) {
- throw new IOException("could not read page in col " + descriptor, e);
- }
- }
-
- private void readPageV1(DataPageV1 page) {
- ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
- ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
- this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
- this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
- try {
- byte[] bytes = page.getBytes().toByteArray();
- LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
- LOG.debug("reading repetition levels at 0");
- rlReader.initFromPage(pageValueCount, bytes, 0);
- int next = rlReader.getNextOffset();
- LOG.debug("reading definition levels at " + next);
- dlReader.initFromPage(pageValueCount, bytes, next);
- next = dlReader.getNextOffset();
- LOG.debug("reading data at " + next);
- initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
- }
- }
-
- private void readPageV2(DataPageV2 page) {
- this.pageValueCount = page.getValueCount();
- this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(),
- page.getRepetitionLevels());
- this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels());
- try {
- LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records");
- initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
- }
- }
-
- private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
- try {
- if (maxLevel == 0) {
- return new NullIntIterator();
- }
- return new RLEIntIterator(
- new RunLengthBitPackingHybridDecoder(
- BytesUtils.getWidthFromMaxInt(maxLevel),
- new ByteArrayInputStream(bytes.toByteArray())));
- } catch (IOException e) {
- throw new ParquetDecodingException("could not read levels in page for col " + descriptor, e);
- }
- }
-
- /**
- * Utility classes to abstract over different way to read ints with different encodings.
- * TODO: remove this layer of abstraction?
- */
- abstract static class IntIterator {
- abstract int nextInt();
- }
-
- protected static final class ValuesReaderIntIterator extends IntIterator {
- ValuesReader delegate;
-
- public ValuesReaderIntIterator(ValuesReader delegate) {
- this.delegate = delegate;
- }
-
- @Override
- int nextInt() {
- return delegate.readInteger();
- }
- }
-
- protected static final class RLEIntIterator extends IntIterator {
- RunLengthBitPackingHybridDecoder delegate;
-
- public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
- this.delegate = delegate;
- }
-
- @Override
- int nextInt() {
- try {
- return delegate.readInt();
- } catch (IOException e) {
- throw new ParquetDecodingException(e);
- }
- }
- }
-
- protected static final class NullIntIterator extends IntIterator {
- @Override
- int nextInt() { return 0; }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/fe3b370e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java
deleted file mode 100644
index cc6cb20..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.parquet.vector;
-
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-
-import java.io.IOException;
-import java.util.List;
-
-public class VectorizedStructColumnReader implements VectorizedColumnReader {
-
- private final List<VectorizedColumnReader> fieldReaders;
-
- public VectorizedStructColumnReader(List<VectorizedColumnReader> fieldReaders) {
- this.fieldReaders = fieldReaders;
- }
-
- @Override
- public void readBatch(
- int total,
- ColumnVector column,
- TypeInfo columnType) throws IOException {
- StructColumnVector structColumnVector = (StructColumnVector) column;
- StructTypeInfo structTypeInfo = (StructTypeInfo) columnType;
- ColumnVector[] vectors = structColumnVector.fields;
- for (int i = 0; i < vectors.length; i++) {
- fieldReaders.get(i)
- .readBatch(total, vectors[i], structTypeInfo.getAllStructFieldTypeInfos().get(i));
- structColumnVector.isRepeating = structColumnVector.isRepeating && vectors[i].isRepeating;
-
- for (int j = 0; j < vectors[i].isNull.length; j++) {
- structColumnVector.isNull[j] =
- (i == 0) ? vectors[i].isNull[j] : structColumnVector.isNull[j] && vectors[i].isNull[j];
- }
- structColumnVector.noNulls =
- (i == 0) ? vectors[i].noNulls : structColumnVector.noNulls && vectors[i].noNulls;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/fe3b370e/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
index d4b4140..276ff19 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
@@ -1,13 +1,9 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,74 +14,416 @@
package org.apache.hadoop.hive.ql.io.parquet;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
+import java.util.List;
+import java.util.Random;
-public class TestVectorizedColumnReader extends TestVectorizedColumnReaderBase {
- static boolean isDictionaryEncoding = false;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.TestCase.assertFalse;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
+import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.junit.Assert.assertEquals;
- @BeforeClass
- public static void setup() throws IOException {
- removeFile();
- writeData(initWriterFromFile(), isDictionaryEncoding);
- }
+public class TestVectorizedColumnReader {
+
+ private static final int nElements = 2500;
+ protected static final Configuration conf = new Configuration();
+ protected static final Path file =
+ new Path("target/test/TestParquetVectorReader/testParquetFile");
+ private static String[] uniqueStrs = new String[nElements];
+ private static boolean[] isNulls = new boolean[nElements];
+ private static Random random = new Random();
+ protected static final MessageType schema = parseMessageType(
+ "message test { "
+ + "required int32 int32_field; "
+ + "required int64 int64_field; "
+ + "required int96 int96_field; "
+ + "required double double_field; "
+ + "required float float_field; "
+ + "required boolean boolean_field; "
+ + "required fixed_len_byte_array(3) flba_field; "
+ + "optional fixed_len_byte_array(1) some_null_field; "
+ + "optional fixed_len_byte_array(1) all_null_field; "
+ + "optional binary binary_field; "
+ + "optional binary binary_field_non_repeating; "
+ + "} ");
@AfterClass
public static void cleanup() throws IOException {
- removeFile();
+ FileSystem fs = file.getFileSystem(conf);
+ if (fs.exists(file)) {
+ fs.delete(file, true);
+ }
}
- @Test
- public void testIntRead() throws Exception {
- intRead(isDictionaryEncoding);
+ @BeforeClass
+ public static void prepareFile() throws IOException {
+ cleanup();
+
+ boolean dictionaryEnabled = true;
+ boolean validating = false;
+ GroupWriteSupport.setSchema(schema, conf);
+ SimpleGroupFactory f = new SimpleGroupFactory(schema);
+ ParquetWriter<Group> writer = new ParquetWriter<Group>(
+ file,
+ new GroupWriteSupport(),
+ GZIP, 1024*1024, 1024, 1024*1024,
+ dictionaryEnabled, validating, PARQUET_1_0, conf);
+ writeData(f, writer);
}
- @Test
- public void testLongRead() throws Exception {
- longRead(isDictionaryEncoding);
+ protected static void writeData(SimpleGroupFactory f, ParquetWriter<Group> writer) throws IOException {
+ initialStrings(uniqueStrs);
+ for (int i = 0; i < nElements; i++) {
+ Group group = f.newGroup()
+ .append("int32_field", i)
+ .append("int64_field", (long) 2 * i)
+ .append("int96_field", Binary.fromReusedByteArray("999999999999".getBytes()))
+ .append("double_field", i * 1.0)
+ .append("float_field", ((float) (i * 2.0)))
+ .append("boolean_field", i % 5 == 0)
+ .append("flba_field", "abc");
+
+ if (i % 2 == 1) {
+ group.append("some_null_field", "x");
+ }
+
+ if (i % 13 != 1) {
+ int binaryLen = i % 10;
+ group.append("binary_field",
+ Binary.fromString(new String(new char[binaryLen]).replace("\0", "x")));
+ }
+
+ if (uniqueStrs[i] != null) {
+ group.append("binary_field_non_repeating", Binary.fromString(uniqueStrs[i]));
+ }
+ writer.write(group);
+ }
+ writer.close();
}
- @Test
- public void testDoubleRead() throws Exception {
- doubleRead(isDictionaryEncoding);
+ private static String getRandomStr() {
+ int len = random.nextInt(10);
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < len; i++) {
+ sb.append((char) ('a' + random.nextInt(25)));
+ }
+ return sb.toString();
+ }
+
+ public static void initialStrings(String[] uniqueStrs) {
+ for (int i = 0; i < uniqueStrs.length; i++) {
+ String str = getRandomStr();
+ if (!str.isEmpty()) {
+ uniqueStrs[i] = str;
+ isNulls[i] = false;
+ }else{
+ isNulls[i] = true;
+ }
+ }
+ }
+
+ private VectorizedParquetRecordReader createParquetReader(String schemaString, Configuration conf)
+ throws IOException, InterruptedException, HiveException {
+ conf.set(PARQUET_READ_SCHEMA, schemaString);
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+ HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
+
+ Job vectorJob = new Job(conf, "read vector");
+ ParquetInputFormat.setInputPaths(vectorJob, file);
+ ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class);
+ InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0);
+ initialVectorizedRowBatchCtx(conf);
+ return new VectorizedParquetRecordReader(split, new JobConf(conf));
+ }
+
+ private void initialVectorizedRowBatchCtx(Configuration conf) throws HiveException {
+ MapWork mapWork = new MapWork();
+ VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx();
+ rbCtx.init(createStructObjectInspector(conf), new String[0]);
+ mapWork.setVectorMode(true);
+ mapWork.setVectorizedRowBatchCtx(rbCtx);
+ Utilities.setMapWork(conf, mapWork);
+ }
+
+ private StructObjectInspector createStructObjectInspector(Configuration conf) {
+ // Create row related objects
+ String columnNames = conf.get(IOConstants.COLUMNS);
+ List<String> columnNamesList = DataWritableReadSupport.getColumnNames(columnNames);
+ String columnTypes = conf.get(IOConstants.COLUMNS_TYPES);
+ List<TypeInfo> columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
+ TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList);
+ return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);
}
@Test
- public void testFloatRead() throws Exception {
- floatRead(isDictionaryEncoding);
+ public void testIntRead() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS,"int32_field");
+ conf.set(IOConstants.COLUMNS_TYPES,"int");
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ VectorizedParquetRecordReader reader =
+ createParquetReader("message test { required int32 int32_field;}", conf);
+ VectorizedRowBatch previous = reader.createValue();
+ try {
+ long c = 0;
+ while (reader.next(NullWritable.get(), previous)) {
+ LongColumnVector vector = (LongColumnVector) previous.cols[0];
+ assertTrue(vector.noNulls);
+ for (int i = 0; i < vector.vector.length; i++) {
+ if(c == nElements){
+ break;
+ }
+ assertEquals(c, vector.vector[i]);
+ assertFalse(vector.isNull[i]);
+ c++;
+ }
+ }
+ assertEquals(nElements, c);
+ } finally {
+ reader.close();
+ }
}
@Test
- public void testBooleanRead() throws Exception {
- booleanRead();
+ public void testLongRead() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS,"int64_field");
+ conf.set(IOConstants.COLUMNS_TYPES, "bigint");
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ VectorizedParquetRecordReader reader =
+ createParquetReader("message test { required int64 int64_field;}", conf);
+ VectorizedRowBatch previous = reader.createValue();
+ try {
+ long c = 0;
+ while (reader.next(NullWritable.get(), previous)) {
+ LongColumnVector vector = (LongColumnVector) previous.cols[0];
+ assertTrue(vector.noNulls);
+ for (int i = 0; i < vector.vector.length; i++) {
+ if(c == nElements){
+ break;
+ }
+ assertEquals(2 * c, vector.vector[i]);
+ assertFalse(vector.isNull[i]);
+ c++;
+ }
+ }
+ assertEquals(nElements, c);
+ } finally {
+ reader.close();
+ }
}
@Test
- public void testBinaryRead() throws Exception {
- binaryRead(isDictionaryEncoding);
+ public void testDoubleRead() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS,"double_field");
+ conf.set(IOConstants.COLUMNS_TYPES, "double");
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ VectorizedParquetRecordReader reader =
+ createParquetReader("message test { required double double_field;}", conf);
+ VectorizedRowBatch previous = reader.createValue();
+ try {
+ long c = 0;
+ while (reader.next(NullWritable.get(), previous)) {
+ DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0];
+ assertTrue(vector.noNulls);
+ for (int i = 0; i < vector.vector.length; i++) {
+ if(c == nElements){
+ break;
+ }
+ assertEquals(1.0 * c, vector.vector[i], 0);
+ assertFalse(vector.isNull[i]);
+ c++;
+ }
+ }
+ assertEquals(nElements, c);
+ } finally {
+ reader.close();
+ }
}
@Test
- public void testStructRead() throws Exception {
- structRead(isDictionaryEncoding);
+ public void testFloatRead() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS,"float_field");
+ conf.set(IOConstants.COLUMNS_TYPES, "float");
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ VectorizedParquetRecordReader reader =
+ createParquetReader("message test { required float float_field;}", conf);
+ VectorizedRowBatch previous = reader.createValue();
+ try {
+ long c = 0;
+ while (reader.next(NullWritable.get(), previous)) {
+ DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0];
+ assertTrue(vector.noNulls);
+ for (int i = 0; i < vector.vector.length; i++) {
+ if(c == nElements){
+ break;
+ }
+ assertEquals((float)2.0 * c, vector.vector[i], 0);
+ assertFalse(vector.isNull[i]);
+ c++;
+ }
+ }
+ assertEquals(nElements, c);
+ } finally {
+ reader.close();
+ }
}
@Test
- public void testNestedStructRead() throws Exception {
- nestedStructRead0(isDictionaryEncoding);
- nestedStructRead1(isDictionaryEncoding);
+ public void testBooleanRead() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS,"boolean_field");
+ conf.set(IOConstants.COLUMNS_TYPES, "boolean");
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ VectorizedParquetRecordReader reader =
+ createParquetReader("message test { required boolean boolean_field;}", conf);
+ VectorizedRowBatch previous = reader.createValue();
+ try {
+ long c = 0;
+ while (reader.next(NullWritable.get(), previous)) {
+ LongColumnVector vector = (LongColumnVector) previous.cols[0];
+ assertTrue(vector.noNulls);
+ for (int i = 0; i < vector.vector.length; i++) {
+ if(c == nElements){
+ break;
+ }
+ int e = (c % 5 == 0) ? 1 : 0;
+ assertEquals(e, vector.vector[i]);
+ assertFalse(vector.isNull[i]);
+ c++;
+ }
+ }
+ assertEquals(nElements, c);
+ } finally {
+ reader.close();
+ }
}
@Test
- public void structReadSomeNull() throws Exception {
- structReadSomeNull(isDictionaryEncoding);
+ public void testBinaryReadDictionaryEncoding() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS,"binary_field");
+ conf.set(IOConstants.COLUMNS_TYPES, "string");
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ VectorizedParquetRecordReader reader =
+ createParquetReader("message test { required binary binary_field;}", conf);
+ VectorizedRowBatch previous = reader.createValue();
+ int c = 0;
+ try {
+ while (reader.next(NullWritable.get(), previous)) {
+ BytesColumnVector vector = (BytesColumnVector) previous.cols[0];
+ boolean noNull = true;
+ for (int i = 0; i < vector.vector.length; i++) {
+ if(c == nElements){
+ break;
+ }
+ if (c % 13 == 1) {
+ assertTrue(vector.isNull[i]);
+ } else {
+ assertFalse(vector.isNull[i]);
+ int binaryLen = c % 10;
+ String expected = new String(new char[binaryLen]).replace("\0", "x");
+ String actual = new String(ArrayUtils
+ .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i]));
+ assertEquals("Failed at " + c, expected, actual);
+ noNull = false;
+ }
+ c++;
+ }
+ assertEquals("No Null check failed at " + c, noNull, vector.noNulls);
+ assertFalse(vector.isRepeating);
+ }
+ assertEquals(nElements, c);
+ } finally {
+ reader.close();
+ }
}
@Test
- public void decimalRead() throws Exception {
- decimalRead(isDictionaryEncoding);
+ public void testBinaryRead() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(IOConstants.COLUMNS,"binary_field_non_repeating");
+ conf.set(IOConstants.COLUMNS_TYPES, "string");
+ conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+ conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+ VectorizedParquetRecordReader reader =
+ createParquetReader("message test { required binary binary_field_non_repeating;}", conf);
+ VectorizedRowBatch previous = reader.createValue();
+ int c = 0;
+ try {
+ while (reader.next(NullWritable.get(), previous)) {
+ BytesColumnVector vector = (BytesColumnVector) previous.cols[0];
+ boolean noNull = true;
+ for (int i = 0; i < vector.vector.length; i++) {
+ if(c == nElements){
+ break;
+ }
+ String actual;
+ assertEquals("Null assert failed at " + c, isNulls[c], vector.isNull[i]);
+ if (!vector.isNull[i]) {
+ actual = new String(ArrayUtils
+ .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i]));
+ assertEquals("failed at " + c, uniqueStrs[c], actual);
+ }else{
+ noNull = false;
+ }
+ c++;
+ }
+ assertEquals("No Null check failed at " + c, noNull, vector.noNulls);
+ assertFalse(vector.isRepeating);
+ }
+ assertEquals("It doesn't exit at expected position", nElements, c);
+ } finally {
+ reader.close();
+ }
}
}