You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2016/11/18 07:52:36 UTC
[3/4] hive git commit: HIVE-14815: Implement Parquet vectorization
reader for Primitive types(Ferdinand Xu, review by Chao Sun) This closes #104
http://git-wip-us.apache.org/repos/asf/hive/blob/936df7a1/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
new file mode 100644
index 0000000..f94c49a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase;
+import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
+import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+
+/**
+ * Reads the row groups of one Parquet split into Hive {@link VectorizedRowBatch}es. Part of
+ * the code is adapted from Apache Spark and Apache Parquet.
+ */
+public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
+    implements RecordReader<NullWritable, VectorizedRowBatch> {
+  public static final Logger LOG = LoggerFactory.getLogger(VectorizedParquetRecordReader.class);
+
+  /**
+   * Batch column indexes from {@link ColumnProjectionUtils#getReadColumnIDs}; entry i is the
+   * batch column filled by {@code columnReaders[i]}. May be empty, in which case no column
+   * data is decoded.
+   */
+  private List<Integer> colsToInclude;
+
+  /** Schema stored in the footer of the Parquet file. */
+  protected MessageType fileSchema;
+  /** Schema of the columns actually read: a table-schema projection or the whole file schema. */
+  protected MessageType requestedSchema;
+  /** Table column names, from {@link IOConstants#COLUMNS}. */
+  private List<String> columnNamesList;
+  /** Table column types, from {@link IOConstants#COLUMNS_TYPES}. */
+  private List<TypeInfo> columnTypesList;
+  /** Creates the batches handed out by {@link #createValue()}. */
+  private VectorizedRowBatchCtx rbCtx;
+
+  /**
+   * One reader per requested column for the current row group, rebuilt whenever a new row
+   * group is loaded. Null entries are skipped when reading; NOTE(review): nothing in this
+   * class currently stores null here.
+   */
+  private VectorizedColumnReader[] columnReaders;
+
+  /** The number of rows returned so far. */
+  private long rowsReturned;
+
+  /** The number of rows loaded so far, including the row group currently being read. */
+  private long totalCountLoadedSoFar = 0;
+
+  /** The total number of rows this reader will return: the sum over all selected row groups. */
+  protected long totalRowCount;
+
+  /**
+   * Creates a reader for a new-API split.
+   *
+   * @param inputSplit split to read; must be a {@link ParquetInputSplit}
+   * @param conf job configuration carrying column names, types and projection
+   * @throws RuntimeException wrapping any failure during initialization
+   */
+  @VisibleForTesting
+  public VectorizedParquetRecordReader(
+      InputSplit inputSplit,
+      JobConf conf) {
+    try {
+      serDeStats = new SerDeStats();
+      projectionPusher = new ProjectionPusher();
+      initialize(inputSplit, conf);
+      colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
+      rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+    } catch (Throwable e) {
+      // Pass the throwable itself so SLF4J logs the stack trace, not only toString().
+      LOG.error("Failed to create the vectorized reader due to exception", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Creates a reader for an old-API (mapred) split, converted with {@code getSplit}.
+   *
+   * @param oldInputSplit split to read
+   * @param conf job configuration carrying column names, types and projection
+   * @throws RuntimeException wrapping any failure during initialization
+   */
+  public VectorizedParquetRecordReader(
+      org.apache.hadoop.mapred.InputSplit oldInputSplit,
+      JobConf conf) {
+    try {
+      serDeStats = new SerDeStats();
+      projectionPusher = new ProjectionPusher();
+      initialize(getSplit(oldInputSplit, conf), conf);
+      colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
+      rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+    } catch (Throwable e) {
+      // Pass the throwable itself so SLF4J logs the stack trace, not only toString().
+      LOG.error("Failed to create the vectorized reader due to exception", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Selects the row groups of the split, resolves the schema to read and opens the
+   * {@link ParquetFileReader} over them.
+   *
+   * @param oldSplit split to read; cast to {@link ParquetInputSplit}
+   * @param configuration job configuration
+   * @throws IOException on failure to read the footer or to open the file
+   * @throws IllegalStateException if the split names row-group offsets not found in the file
+   */
+  public void initialize(
+      InputSplit oldSplit,
+      JobConf configuration) throws IOException, InterruptedException {
+    jobConf = configuration;
+    ParquetMetadata footer;
+    List<BlockMetaData> blocks;
+    ParquetInputSplit split = (ParquetInputSplit) oldSplit;
+    boolean indexAccess =
+        configuration.getBoolean(DataWritableReadSupport.PARQUET_COLUMN_INDEX_ACCESS, false);
+    this.file = split.getPath();
+    long[] rowGroupOffsets = split.getRowGroupOffsets();
+
+    String columnNames = configuration.get(IOConstants.COLUMNS);
+    columnNamesList = DataWritableReadSupport.getColumnNames(columnNames);
+    String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES);
+    columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
+
+    // if task.side.metadata is set, rowGroupOffsets is null
+    if (rowGroupOffsets == null) {
+      //TODO check whether rowGroupOffSets can be null
+      // then we need to apply the predicate push down filter
+      footer = readFooter(configuration, file, range(split.getStart(), split.getEnd()));
+      // Named footerSchema so it does not shadow the fileSchema field assigned below.
+      MessageType footerSchema = footer.getFileMetaData().getSchema();
+      FilterCompat.Filter filter = getFilter(configuration);
+      blocks = filterRowGroups(filter, footer.getBlocks(), footerSchema);
+    } else {
+      // otherwise we find the row groups that were selected on the client
+      footer = readFooter(configuration, file, NO_FILTER);
+      Set<Long> offsets = new HashSet<>();
+      for (long offset : rowGroupOffsets) {
+        offsets.add(offset);
+      }
+      blocks = new ArrayList<>();
+      for (BlockMetaData block : footer.getBlocks()) {
+        if (offsets.contains(block.getStartingPos())) {
+          blocks.add(block);
+        }
+      }
+      // verify we found them all
+      if (blocks.size() != rowGroupOffsets.length) {
+        long[] foundRowGroupOffsets = new long[footer.getBlocks().size()];
+        for (int i = 0; i < foundRowGroupOffsets.length; i++) {
+          foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos();
+        }
+        // this should never happen.
+        // provide a good error message in case there's a bug
+        throw new IllegalStateException(
+            "All the offsets listed in the split should be found in the file."
+            + " expected: " + Arrays.toString(rowGroupOffsets)
+            + " found: " + blocks
+            + " out of: " + Arrays.toString(foundRowGroupOffsets)
+            + " in range " + split.getStart() + ", " + split.getEnd());
+      }
+    }
+
+    for (BlockMetaData block : blocks) {
+      this.totalRowCount += block.getRowCount();
+    }
+    this.fileSchema = footer.getFileMetaData().getSchema();
+
+    MessageType tableSchema;
+    if (indexAccess) {
+      // Column index access: table columns are matched to the file schema by index 0..n-1.
+      List<Integer> indexSequence = new ArrayList<>();
+      for (int i = 0; i < columnNamesList.size(); i++) {
+        indexSequence.add(i);
+      }
+      tableSchema = DataWritableReadSupport.getSchemaByIndex(fileSchema, columnNamesList,
+          indexSequence);
+    } else {
+      tableSchema = DataWritableReadSupport.getSchemaByName(fileSchema, columnNamesList,
+          columnTypesList);
+    }
+
+    // Read only the projected columns, unless all columns were requested or the projection is
+    // empty; in both of those cases the whole file schema is read.
+    List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
+    if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
+      requestedSchema = DataWritableReadSupport.getSchemaByIndex(
+          tableSchema, columnNamesList, indexColumnsWanted);
+    } else {
+      requestedSchema = fileSchema;
+    }
+
+    this.reader = new ParquetFileReader(
+        configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
+  }
+
+  /**
+   * Reads the next batch of rows into {@code vectorizedRowBatch}; the key is unused.
+   *
+   * @return false once all rows of the split have been returned
+   */
+  @Override
+  public boolean next(
+      NullWritable nullWritable,
+      VectorizedRowBatch vectorizedRowBatch) throws IOException {
+    return nextBatch(vectorizedRowBatch);
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  /** Creates an empty batch from the vectorized row batch context of the job. */
+  @Override
+  public VectorizedRowBatch createValue() {
+    return rbCtx.createVectorizedRowBatch();
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    //TODO
+    return 0;
+  }
+
+  /** Closes the underlying {@link ParquetFileReader}, if open; safe to call more than once. */
+  @Override
+  public void close() throws IOException {
+    if (reader != null) {
+      reader.close();
+      reader = null;
+    }
+  }
+
+  /** Returns the fraction of this split's rows returned so far, between 0 and 1. */
+  @Override
+  public float getProgress() throws IOException {
+    if (totalRowCount <= 0) {
+      return 0;
+    }
+    return Math.min(1.0f, (float) rowsReturned / totalRowCount);
+  }
+
+  /**
+   * Fills {@code columnarBatch} with up to {@link VectorizedRowBatch#DEFAULT_SIZE} rows,
+   * loading the next row group first when the current one is exhausted. A batch never spans
+   * two row groups.
+   *
+   * @return false if all rows of the split have already been returned
+   */
+  private boolean nextBatch(VectorizedRowBatch columnarBatch) throws IOException {
+    columnarBatch.reset();
+    if (rowsReturned >= totalRowCount) {
+      return false;
+    }
+    checkEndOfRowGroup();
+
+    int num = (int) Math.min(VectorizedRowBatch.DEFAULT_SIZE, totalCountLoadedSoFar - rowsReturned);
+    // With an empty projection there is no target column for the readers; only the row count
+    // is reported. Indexing colsToInclude would otherwise throw IndexOutOfBoundsException.
+    if (!colsToInclude.isEmpty()) {
+      for (int i = 0; i < columnReaders.length; ++i) {
+        if (columnReaders[i] == null) {
+          continue;
+        }
+        int batchCol = colsToInclude.get(i);
+        // NOTE(review): presumably readBatch clears isRepeating when values differ; confirm.
+        columnarBatch.cols[batchCol].isRepeating = true;
+        columnReaders[i].readBatch(num, columnarBatch.cols[batchCol],
+            columnTypesList.get(batchCol));
+      }
+    }
+    rowsReturned += num;
+    columnarBatch.size = num;
+    return true;
+  }
+
+  /**
+   * Loads the next row group and rebuilds {@link #columnReaders} once every row loaded so far
+   * has been returned; otherwise does nothing.
+   *
+   * @throws IOException if no row group is left although fewer than totalRowCount rows were read
+   */
+  private void checkEndOfRowGroup() throws IOException {
+    if (rowsReturned != totalCountLoadedSoFar) {
+      return;
+    }
+    PageReadStore pages = reader.readNextRowGroup();
+    if (pages == null) {
+      throw new IOException("expecting more rows but reached last block. Read "
+          + rowsReturned + " out of " + totalRowCount);
+    }
+    List<ColumnDescriptor> columns = requestedSchema.getColumns();
+    // NOTE(review): leaf columns are paired with top-level fields by position, which only
+    // lines up while every requested field is a primitive.
+    List<Type> types = requestedSchema.getFields();
+    columnReaders = new VectorizedColumnReader[columns.size()];
+    for (int i = 0; i < columns.size(); ++i) {
+      columnReaders[i] =
+          new VectorizedColumnReader(columns.get(i), pages.getPageReader(columns.get(i)),
+              skipTimestampConversion, types.get(i));
+    }
+    totalCountLoadedSoFar += pages.getRowCount();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/936df7a1/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
new file mode 100644
index 0000000..276ff19
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
@@ -0,0 +1,440 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.TestCase.assertFalse;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
+import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link VectorizedParquetRecordReader}. A Parquet file with one field per primitive
+ * type is written once for the class; each test reads one column back batch by batch and
+ * checks every value against what was written.
+ */
+public class TestVectorizedColumnReader {
+
+  private static final int nElements = 2500;
+  protected static final Configuration conf = new Configuration();
+  protected static final Path file =
+      new Path("target/test/TestParquetVectorReader/testParquetFile");
+  /** Values written to binary_field_non_repeating; an entry stays unset where isNulls is true. */
+  private static String[] uniqueStrs = new String[nElements];
+  private static boolean[] isNulls = new boolean[nElements];
+  // Fixed seed so that a failing run can be reproduced with exactly the same data.
+  private static Random random = new Random(20161118L);
+  protected static final MessageType schema = parseMessageType(
+      "message test { "
+      + "required int32 int32_field; "
+      + "required int64 int64_field; "
+      + "required int96 int96_field; "
+      + "required double double_field; "
+      + "required float float_field; "
+      + "required boolean boolean_field; "
+      + "required fixed_len_byte_array(3) flba_field; "
+      + "optional fixed_len_byte_array(1) some_null_field; "
+      + "optional fixed_len_byte_array(1) all_null_field; "
+      + "optional binary binary_field; "
+      + "optional binary binary_field_non_repeating; "
+      + "} ");
+
+  /** Deletes the test file if it exists. */
+  @AfterClass
+  public static void cleanup() throws IOException {
+    FileSystem fs = file.getFileSystem(conf);
+    if (fs.exists(file)) {
+      fs.delete(file, true);
+    }
+  }
+
+  /** Writes the test file with dictionary encoding, GZIP and writer version PARQUET_1_0. */
+  @BeforeClass
+  public static void prepareFile() throws IOException {
+    cleanup();
+
+    boolean dictionaryEnabled = true;
+    boolean validating = false;
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    ParquetWriter<Group> writer = new ParquetWriter<Group>(
+        file,
+        new GroupWriteSupport(),
+        GZIP, 1024*1024, 1024, 1024*1024,
+        dictionaryEnabled, validating, PARQUET_1_0, conf);
+    writeData(f, writer);
+  }
+
+  /**
+   * Writes {@link #nElements} rows and closes {@code writer}. Row i has int32_field = i,
+   * int64_field = 2i, double_field = i, float_field = 2i, boolean_field = (i % 5 == 0),
+   * binary_field = (i % 10) 'x' characters (absent when i % 13 == 1) and
+   * binary_field_non_repeating = uniqueStrs[i] (absent when that entry is null).
+   */
+  protected static void writeData(SimpleGroupFactory f, ParquetWriter<Group> writer)
+      throws IOException {
+    initialStrings(uniqueStrs);
+    try {
+      for (int i = 0; i < nElements; i++) {
+        Group group = f.newGroup()
+            .append("int32_field", i)
+            .append("int64_field", (long) 2 * i)
+            .append("int96_field", Binary.fromReusedByteArray("999999999999".getBytes()))
+            .append("double_field", i * 1.0)
+            .append("float_field", ((float) (i * 2.0)))
+            .append("boolean_field", i % 5 == 0)
+            .append("flba_field", "abc");
+
+        if (i % 2 == 1) {
+          group.append("some_null_field", "x");
+        }
+
+        if (i % 13 != 1) {
+          int binaryLen = i % 10;
+          group.append("binary_field",
+              Binary.fromString(new String(new char[binaryLen]).replace("\0", "x")));
+        }
+
+        if (uniqueStrs[i] != null) {
+          group.append("binary_field_non_repeating", Binary.fromString(uniqueStrs[i]));
+        }
+        writer.write(group);
+      }
+    } finally {
+      // Close even when a write fails so the file handle is not leaked.
+      writer.close();
+    }
+  }
+
+  /** Returns a random string of 0 to 9 characters drawn from 'a'..'y'. */
+  private static String getRandomStr() {
+    int len = random.nextInt(10);
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < len; i++) {
+      sb.append((char) ('a' + random.nextInt(25)));
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Fills {@code uniqueStrs} with random strings. An empty draw leaves the entry unset and
+   * marks it as null in {@link #isNulls}.
+   */
+  public static void initialStrings(String[] uniqueStrs) {
+    for (int i = 0; i < uniqueStrs.length; i++) {
+      String str = getRandomStr();
+      if (!str.isEmpty()) {
+        uniqueStrs[i] = str;
+        isNulls[i] = false;
+      } else {
+        isNulls[i] = true;
+      }
+    }
+  }
+
+  /**
+   * Creates a vectorized reader over the first split of the test file. {@code conf} must
+   * already hold the column names, types and projection of the column under test.
+   */
+  private VectorizedParquetRecordReader createParquetReader(String schemaString, Configuration conf)
+      throws IOException, InterruptedException, HiveException {
+    conf.set(PARQUET_READ_SCHEMA, schemaString);
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
+
+    Job vectorJob = new Job(conf, "read vector");
+    ParquetInputFormat.setInputPaths(vectorJob, file);
+    ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class);
+    InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0);
+    initialVectorizedRowBatchCtx(conf);
+    return new VectorizedParquetRecordReader(split, new JobConf(conf));
+  }
+
+  /** Stores in {@code conf} a vector-mode MapWork whose batch context matches its columns. */
+  private void initialVectorizedRowBatchCtx(Configuration conf) throws HiveException {
+    MapWork mapWork = new MapWork();
+    VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx();
+    rbCtx.init(createStructObjectInspector(conf), new String[0]);
+    mapWork.setVectorMode(true);
+    mapWork.setVectorizedRowBatchCtx(rbCtx);
+    Utilities.setMapWork(conf, mapWork);
+  }
+
+  /** Builds a struct inspector from the column names and types stored in {@code conf}. */
+  private StructObjectInspector createStructObjectInspector(Configuration conf) {
+    String columnNames = conf.get(IOConstants.COLUMNS);
+    List<String> columnNamesList = DataWritableReadSupport.getColumnNames(columnNames);
+    String columnTypes = conf.get(IOConstants.COLUMNS_TYPES);
+    List<TypeInfo> columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
+    TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList);
+    return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);
+  }
+
+  @Test
+  public void testIntRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"int32_field");
+    conf.set(IOConstants.COLUMNS_TYPES,"int");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+        createParquetReader("message test { required int32 int32_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        LongColumnVector vector = (LongColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        // Only the first previous.size entries belong to the batch just read.
+        for (int i = 0; i < previous.size; i++) {
+          assertEquals(c, vector.vector[i]);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testLongRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"int64_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "bigint");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+        createParquetReader("message test { required int64 int64_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        LongColumnVector vector = (LongColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < previous.size; i++) {
+          assertEquals(2 * c, vector.vector[i]);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testDoubleRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"double_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "double");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+        createParquetReader("message test { required double double_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < previous.size; i++) {
+          assertEquals(1.0 * c, vector.vector[i], 0);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testFloatRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"float_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "float");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+        createParquetReader("message test { required float float_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < previous.size; i++) {
+          assertEquals((float)2.0 * c, vector.vector[i], 0);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testBooleanRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"boolean_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "boolean");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+        createParquetReader("message test { required boolean boolean_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        LongColumnVector vector = (LongColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < previous.size; i++) {
+          int e = (c % 5 == 0) ? 1 : 0;
+          assertEquals(e, vector.vector[i]);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testBinaryReadDictionaryEncoding() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"binary_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "string");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+        createParquetReader("message test { required binary binary_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    int c = 0;
+    try {
+      while (reader.next(NullWritable.get(), previous)) {
+        BytesColumnVector vector = (BytesColumnVector) previous.cols[0];
+        boolean noNull = true;
+        for (int i = 0; i < previous.size; i++) {
+          if (c % 13 == 1) {
+            // binary_field was not written for these rows, so a null is expected.
+            assertTrue(vector.isNull[i]);
+            noNull = false;
+          } else {
+            assertFalse(vector.isNull[i]);
+            int binaryLen = c % 10;
+            String expected = new String(new char[binaryLen]).replace("\0", "x");
+            String actual = new String(ArrayUtils
+                .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i]));
+            assertEquals("Failed at " + c, expected, actual);
+          }
+          c++;
+        }
+        assertEquals("No Null check failed at " + c, noNull, vector.noNulls);
+        assertFalse(vector.isRepeating);
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testBinaryRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"binary_field_non_repeating");
+    conf.set(IOConstants.COLUMNS_TYPES, "string");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+        createParquetReader("message test { required binary binary_field_non_repeating;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    int c = 0;
+    try {
+      while (reader.next(NullWritable.get(), previous)) {
+        BytesColumnVector vector = (BytesColumnVector) previous.cols[0];
+        boolean noNull = true;
+        for (int i = 0; i < previous.size; i++) {
+          String actual;
+          assertEquals("Null assert failed at " + c, isNulls[c], vector.isNull[i]);
+          if (!vector.isNull[i]) {
+            actual = new String(ArrayUtils
+                .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i]));
+            assertEquals("failed at " + c, uniqueStrs[c], actual);
+          } else {
+            noNull = false;
+          }
+          c++;
+        }
+        assertEquals("No Null check failed at " + c, noNull, vector.noNulls);
+        assertFalse(vector.isRepeating);
+      }
+      assertEquals("It doesn't exit at expected position", nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/936df7a1/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q b/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q
new file mode 100644
index 0000000..7de444f
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q
@@ -0,0 +1,94 @@
+set hive.mapred.mode=nonstrict;
+DROP TABLE parquet_types_staging;
+DROP TABLE parquet_types;
+
+set hive.vectorized.execution.enabled=true;
+set hive.vectorized.execution.reduce.enabled=true;
+set hive.vectorized.use.row.serde.deserialize=true;
+set hive.vectorized.use.vector.serde.deserialize=true;
+set hive.vectorized.execution.reduce.groupby.enabled = true;
+
+CREATE TABLE parquet_types_staging (
+ cint int,
+ ctinyint tinyint,
+ csmallint smallint,
+ cfloat float,
+ cdouble double,
+ cstring1 string,
+ t timestamp,
+ cchar char(5),
+ cvarchar varchar(10),
+ cbinary string,
+ m1 map<string, varchar(3)>,
+ l1 array<int>,
+ st1 struct<c1:int, c2:char(1)>,
+ d date
+) ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+COLLECTION ITEMS TERMINATED BY ','
+MAP KEYS TERMINATED BY ':';
+
+CREATE TABLE parquet_types (
+ cint int,
+ ctinyint tinyint,
+ csmallint smallint,
+ cfloat float,
+ cdouble double,
+ cstring1 string,
+ t timestamp,
+ cchar char(5),
+ cvarchar varchar(10),
+ cbinary binary,
+ m1 map<string, varchar(3)>,
+ l1 array<int>,
+ st1 struct<c1:int, c2:char(1)>,
+ d date
+) STORED AS PARQUET;
+
+LOAD DATA LOCAL INPATH '../../data/files/parquet_non_dictionary_types.txt' OVERWRITE INTO TABLE
+parquet_types_staging;
+
+SELECT * FROM parquet_types_staging;
+
+INSERT OVERWRITE TABLE parquet_types
+SELECT cint, ctinyint, csmallint, cfloat, cdouble, cstring1, t, cchar, cvarchar,
+unhex(cbinary), m1, l1, st1, d FROM parquet_types_staging;
+
+-- test types in group by
+
+EXPLAIN SELECT ctinyint,
+ MAX(cint),
+ MIN(csmallint),
+ COUNT(cstring1),
+ ROUND(AVG(cfloat), 5),
+ ROUND(STDDEV_POP(cdouble),5)
+FROM parquet_types
+GROUP BY ctinyint
+ORDER BY ctinyint
+;
+
+SELECT ctinyint,
+ MAX(cint),
+ MIN(csmallint),
+ COUNT(cstring1),
+ ROUND(AVG(cfloat), 5),
+ ROUND(STDDEV_POP(cdouble),5)
+FROM parquet_types
+GROUP BY ctinyint
+ORDER BY ctinyint
+;
+
+EXPLAIN SELECT cfloat, count(*) FROM parquet_types GROUP BY cfloat ORDER BY cfloat;
+SELECT cfloat, count(*) FROM parquet_types GROUP BY cfloat ORDER BY cfloat;
+
+EXPLAIN SELECT cchar, count(*) FROM parquet_types GROUP BY cchar ORDER BY cchar;
+SELECT cchar, count(*) FROM parquet_types GROUP BY cchar ORDER BY cchar;
+
+EXPLAIN SELECT cvarchar, count(*) FROM parquet_types GROUP BY cvarchar ORDER BY cvarchar;
+SELECT cvarchar, count(*) FROM parquet_types GROUP BY cvarchar ORDER BY cvarchar;
+
+EXPLAIN SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1;
+SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1;
+
+EXPLAIN SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary;
+SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/936df7a1/ql/src/test/queries/clientpositive/parquet_types_vectorization.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/parquet_types_vectorization.q b/ql/src/test/queries/clientpositive/parquet_types_vectorization.q
new file mode 100644
index 0000000..bb0e5b2
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/parquet_types_vectorization.q
@@ -0,0 +1,96 @@
+set hive.mapred.mode=nonstrict;
+DROP TABLE parquet_types_staging;
+DROP TABLE parquet_types;
+
+set hive.vectorized.execution.enabled=true;
+set hive.vectorized.execution.reduce.enabled=true;
+set hive.vectorized.use.row.serde.deserialize=true;
+set hive.vectorized.use.vector.serde.deserialize=true;
+set hive.vectorized.execution.reduce.groupby.enabled = true;
+
+CREATE TABLE parquet_types_staging (
+ cint int,
+ ctinyint tinyint,
+ csmallint smallint,
+ cfloat float,
+ cdouble double,
+ cstring1 string,
+ t timestamp,
+ cchar char(5),
+ cvarchar varchar(10),
+ cbinary string,
+ m1 map<string, varchar(3)>,
+ l1 array<int>,
+ st1 struct<c1:int, c2:char(1)>,
+ d date
+) ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+COLLECTION ITEMS TERMINATED BY ','
+MAP KEYS TERMINATED BY ':';
+
+CREATE TABLE parquet_types (
+ cint int,
+ ctinyint tinyint,
+ csmallint smallint,
+ cfloat float,
+ cdouble double,
+ cstring1 string,
+ t timestamp,
+ cchar char(5),
+ cvarchar varchar(10),
+ cbinary binary,
+ m1 map<string, varchar(3)>,
+ l1 array<int>,
+ st1 struct<c1:int, c2:char(1)>,
+ d date
+) STORED AS PARQUET;
+
+LOAD DATA LOCAL INPATH '../../data/files/parquet_types.txt' OVERWRITE INTO TABLE parquet_types_staging;
+
+SELECT * FROM parquet_types_staging;
+
+INSERT OVERWRITE TABLE parquet_types
+SELECT cint, ctinyint, csmallint, cfloat, cdouble, cstring1, t, cchar, cvarchar,
+unhex(cbinary), m1, l1, st1, d FROM parquet_types_staging;
+
+-- test types in group by
+
+EXPLAIN SELECT ctinyint,
+ MAX(cint),
+ MIN(csmallint),
+ COUNT(cstring1),
+ ROUND(AVG(cfloat), 5),
+ ROUND(STDDEV_POP(cdouble),5)
+FROM parquet_types
+GROUP BY ctinyint
+ORDER BY ctinyint
+;
+
+SELECT ctinyint,
+ MAX(cint),
+ MIN(csmallint),
+ COUNT(cstring1),
+ ROUND(AVG(cfloat), 5),
+ ROUND(STDDEV_POP(cdouble),5)
+FROM parquet_types
+GROUP BY ctinyint
+ORDER BY ctinyint
+;
+
+EXPLAIN SELECT cfloat, count(*) FROM parquet_types GROUP BY cfloat ORDER BY cfloat;
+SELECT cfloat, count(*) FROM parquet_types GROUP BY cfloat ORDER BY cfloat;
+
+EXPLAIN SELECT cchar, count(*) FROM parquet_types GROUP BY cchar ORDER BY cchar;
+SELECT cchar, count(*) FROM parquet_types GROUP BY cchar ORDER BY cchar;
+
+EXPLAIN SELECT cvarchar, count(*) FROM parquet_types GROUP BY cvarchar ORDER BY cvarchar;
+SELECT cvarchar, count(*) FROM parquet_types GROUP BY cvarchar ORDER BY cvarchar;
+
+EXPLAIN SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1;
+SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1;
+
+EXPLAIN SELECT t, count(*) FROM parquet_types GROUP BY t ORDER BY t;
+SELECT t, count(*) FROM parquet_types GROUP BY t ORDER BY t;
+
+EXPLAIN SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary;
+SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/936df7a1/ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out b/ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out
index 8345132..e42453d 100644
--- a/ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out
+++ b/ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out
@@ -150,7 +150,7 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: tinyint)
Statistics: Num rows: 12288 Data size: 73728 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: int), _col2 (type: smallint), _col3 (type: bigint), _col4 (type: struct<count:bigint,sum:double,input:float>), _col5 (type: struct<count:bigint,sum:double,variance:double>)
- Execution mode: llap
+ Execution mode: vectorized, llap
LLAP IO: no inputs
Reducer 2
Execution mode: llap
http://git-wip-us.apache.org/repos/asf/hive/blob/936df7a1/ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out b/ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out
index b49d5dd..0524cb3 100644
--- a/ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out
+++ b/ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out
@@ -250,19 +250,19 @@ Stage-0
limit:-1
Stage-1
Reducer 3 vectorized, llap
- File Output Operator [FS_10]
- Select Operator [SEL_9] (rows=11 width=11)
+ File Output Operator [FS_12]
+ Select Operator [SEL_11] (rows=11 width=11)
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6"]
<-Reducer 2 [SIMPLE_EDGE] llap
SHUFFLE [RS_6]
Group By Operator [GBY_4] (rows=11 width=11)
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6"],aggregations:["max(VALUE._col0)","min(VALUE._col1)","count(VALUE._col2)","avg(VALUE._col3)","stddev_pop(VALUE._col4)","max(VALUE._col5)"],keys:KEY._col0
- <-Map 1 [SIMPLE_EDGE] llap
+ <-Map 1 [SIMPLE_EDGE] vectorized, llap
SHUFFLE [RS_3]
PartitionCols:_col0
- Group By Operator [GBY_2] (rows=22 width=11)
+ Group By Operator [GBY_10] (rows=22 width=11)
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6"],aggregations:["max(cint)","min(csmallint)","count(cstring1)","avg(cfloat)","stddev_pop(cdouble)","max(cdecimal)"],keys:ctinyint
- Select Operator [SEL_1] (rows=22 width=11)
+ Select Operator [SEL_9] (rows=22 width=11)
Output:["ctinyint","cint","csmallint","cstring1","cfloat","cdouble","cdecimal"]
TableScan [TS_0] (rows=22 width=11)
default@parquet_types,parquet_types,Tbl:COMPLETE,Col:NONE,Output:["cint","ctinyint","csmallint","cfloat","cdouble","cstring1","cdecimal"]