You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2016/11/18 07:52:36 UTC

[3/4] hive git commit: HIVE-14815: Implement Parquet vectorization reader for Primitive types(Ferdinand Xu, review by Chao Sun) This closes #104

http://git-wip-us.apache.org/repos/asf/hive/blob/936df7a1/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
new file mode 100644
index 0000000..f94c49a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase;
+import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
+import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+
+/**
+ * This reader is used to read a batch of record from inputsplit, part of the code is referred
+ * from Apache Spark and Apache Parquet.
+ */
+public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
+  implements RecordReader<NullWritable, VectorizedRowBatch> {
+  public static final Logger LOG = LoggerFactory.getLogger(VectorizedParquetRecordReader.class);
+
+  private List<Integer> colsToInclude;
+
+  protected MessageType fileSchema;
+  protected MessageType requestedSchema;
+  private List<String> columnNamesList;
+  private List<TypeInfo> columnTypesList;
+  private VectorizedRowBatchCtx rbCtx;
+
+  /**
+   * For each request column, the reader to read this column. This is NULL if this column
+   * is missing from the file, in which case we populate the attribute with NULL.
+   */
+  private VectorizedColumnReader[] columnReaders;
+
+  /**
+   * The number of rows that have been returned.
+   */
+  private long rowsReturned;
+
+  /**
+   * The number of rows that have been reading, including the current in flight row group.
+   */
+  private long totalCountLoadedSoFar = 0;
+
+  /**
+   * The total number of rows this RecordReader will eventually read. The sum of the
+   * rows of all the row groups.
+   */
+  protected long totalRowCount;
+
+  @VisibleForTesting
+  public VectorizedParquetRecordReader(
+    InputSplit inputSplit,
+    JobConf conf) {
+    try {
+      serDeStats = new SerDeStats();
+      projectionPusher = new ProjectionPusher();
+      initialize(inputSplit, conf);
+      colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
+      rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+    } catch (Throwable e) {
+      LOG.error("Failed to create the vectorized reader due to exception " + e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  public VectorizedParquetRecordReader(
+    org.apache.hadoop.mapred.InputSplit oldInputSplit,
+    JobConf conf) {
+    try {
+      serDeStats = new SerDeStats();
+      projectionPusher = new ProjectionPusher();
+      initialize(getSplit(oldInputSplit, conf), conf);
+      colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
+      rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+    } catch (Throwable e) {
+      LOG.error("Failed to create the vectorized reader due to exception " + e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void initialize(
+    InputSplit oldSplit,
+    JobConf configuration) throws IOException, InterruptedException {
+    jobConf = configuration;
+    ParquetMetadata footer;
+    List<BlockMetaData> blocks;
+    ParquetInputSplit split = (ParquetInputSplit) oldSplit;
+    boolean indexAccess =
+      configuration.getBoolean(DataWritableReadSupport.PARQUET_COLUMN_INDEX_ACCESS, false);
+    this.file = split.getPath();
+    long[] rowGroupOffsets = split.getRowGroupOffsets();
+
+    String columnNames = configuration.get(IOConstants.COLUMNS);
+    columnNamesList = DataWritableReadSupport.getColumnNames(columnNames);
+    String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES);
+    columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
+
+    // if task.side.metadata is set, rowGroupOffsets is null
+    if (rowGroupOffsets == null) {
+      //TODO check whether rowGroupOffSets can be null
+      // then we need to apply the predicate push down filter
+      footer = readFooter(configuration, file, range(split.getStart(), split.getEnd()));
+      MessageType fileSchema = footer.getFileMetaData().getSchema();
+      FilterCompat.Filter filter = getFilter(configuration);
+      blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
+    } else {
+      // otherwise we find the row groups that were selected on the client
+      footer = readFooter(configuration, file, NO_FILTER);
+      Set<Long> offsets = new HashSet<>();
+      for (long offset : rowGroupOffsets) {
+        offsets.add(offset);
+      }
+      blocks = new ArrayList<>();
+      for (BlockMetaData block : footer.getBlocks()) {
+        if (offsets.contains(block.getStartingPos())) {
+          blocks.add(block);
+        }
+      }
+      // verify we found them all
+      if (blocks.size() != rowGroupOffsets.length) {
+        long[] foundRowGroupOffsets = new long[footer.getBlocks().size()];
+        for (int i = 0; i < foundRowGroupOffsets.length; i++) {
+          foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos();
+        }
+        // this should never happen.
+        // provide a good error message in case there's a bug
+        throw new IllegalStateException(
+          "All the offsets listed in the split should be found in the file."
+            + " expected: " + Arrays.toString(rowGroupOffsets)
+            + " found: " + blocks
+            + " out of: " + Arrays.toString(foundRowGroupOffsets)
+            + " in range " + split.getStart() + ", " + split.getEnd());
+      }
+    }
+
+    for (BlockMetaData block : blocks) {
+      this.totalRowCount += block.getRowCount();
+    }
+    this.fileSchema = footer.getFileMetaData().getSchema();
+
+    MessageType tableSchema;
+    if (indexAccess) {
+      List<Integer> indexSequence = new ArrayList<>();
+
+      // Generates a sequence list of indexes
+      for(int i = 0; i < columnNamesList.size(); i++) {
+        indexSequence.add(i);
+      }
+
+      tableSchema = DataWritableReadSupport.getSchemaByIndex(fileSchema, columnNamesList,
+        indexSequence);
+    } else {
+      tableSchema = DataWritableReadSupport.getSchemaByName(fileSchema, columnNamesList,
+        columnTypesList);
+    }
+
+    List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
+    if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
+      requestedSchema =
+        DataWritableReadSupport.getSchemaByIndex(tableSchema, columnNamesList, indexColumnsWanted);
+    } else {
+      requestedSchema = fileSchema;
+    }
+
+    this.reader = new ParquetFileReader(
+      configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
+  }
+
+  @Override
+  public boolean next(
+    NullWritable nullWritable,
+    VectorizedRowBatch vectorizedRowBatch) throws IOException {
+    return nextBatch(vectorizedRowBatch);
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  @Override
+  public VectorizedRowBatch createValue() {
+    return rbCtx.createVectorizedRowBatch();
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    //TODO
+    return 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    //TODO
+    return 0;
+  }
+
+  /**
+   * Advances to the next batch of rows. Returns false if there are no more.
+   */
+  private boolean nextBatch(VectorizedRowBatch columnarBatch) throws IOException {
+    columnarBatch.reset();
+    if (rowsReturned >= totalRowCount) {
+      return false;
+    }
+    checkEndOfRowGroup();
+
+    int num = (int) Math.min(VectorizedRowBatch.DEFAULT_SIZE, totalCountLoadedSoFar - rowsReturned);
+    for (int i = 0; i < columnReaders.length; ++i) {
+      if (columnReaders[i] == null) {
+        continue;
+      }
+      columnarBatch.cols[colsToInclude.get(i)].isRepeating = true;
+      columnReaders[i].readBatch(num, columnarBatch.cols[colsToInclude.get(i)],
+        columnTypesList.get(colsToInclude.get(i)));
+    }
+    rowsReturned += num;
+    columnarBatch.size = num;
+    return true;
+  }
+
+  private void checkEndOfRowGroup() throws IOException {
+    if (rowsReturned != totalCountLoadedSoFar) {
+      return;
+    }
+    PageReadStore pages = reader.readNextRowGroup();
+    if (pages == null) {
+      throw new IOException("expecting more rows but reached last block. Read "
+        + rowsReturned + " out of " + totalRowCount);
+    }
+    List<ColumnDescriptor> columns = requestedSchema.getColumns();
+    List<Type> types = requestedSchema.getFields();
+    columnReaders = new VectorizedColumnReader[columns.size()];
+    for (int i = 0; i < columns.size(); ++i) {
+      columnReaders[i] =
+        new VectorizedColumnReader(columns.get(i), pages.getPageReader(columns.get(i)),
+          skipTimestampConversion, types.get(i));
+    }
+    totalCountLoadedSoFar += pages.getRowCount();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/936df7a1/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
new file mode 100644
index 0000000..276ff19
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
@@ -0,0 +1,429 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.TestCase.assertFalse;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
+import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.junit.Assert.assertEquals;
+
+public class TestVectorizedColumnReader {
+
+  private static final int nElements = 2500;
+  protected static final Configuration conf = new Configuration();
+  protected static final Path file =
+    new Path("target/test/TestParquetVectorReader/testParquetFile");
+  private static String[] uniqueStrs = new String[nElements];
+  private static boolean[] isNulls = new boolean[nElements];
+  private static Random random = new Random();
+  protected static final MessageType schema = parseMessageType(
+    "message test { "
+      + "required int32 int32_field; "
+      + "required int64 int64_field; "
+      + "required int96 int96_field; "
+      + "required double double_field; "
+      + "required float float_field; "
+      + "required boolean boolean_field; "
+      + "required fixed_len_byte_array(3) flba_field; "
+      + "optional fixed_len_byte_array(1) some_null_field; "
+      + "optional fixed_len_byte_array(1) all_null_field; "
+      + "optional binary binary_field; "
+      + "optional binary binary_field_non_repeating; "
+      + "} ");
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    FileSystem fs = file.getFileSystem(conf);
+    if (fs.exists(file)) {
+      fs.delete(file, true);
+    }
+  }
+
+  @BeforeClass
+  public static void prepareFile() throws IOException {
+    cleanup();
+
+    boolean dictionaryEnabled = true;
+    boolean validating = false;
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    ParquetWriter<Group> writer = new ParquetWriter<Group>(
+      file,
+      new GroupWriteSupport(),
+      GZIP, 1024*1024, 1024, 1024*1024,
+      dictionaryEnabled, validating, PARQUET_1_0, conf);
+    writeData(f, writer);
+  }
+
+  protected static void writeData(SimpleGroupFactory f, ParquetWriter<Group> writer) throws IOException {
+    initialStrings(uniqueStrs);
+    for (int i = 0; i < nElements; i++) {
+      Group group = f.newGroup()
+        .append("int32_field", i)
+        .append("int64_field", (long) 2 * i)
+        .append("int96_field", Binary.fromReusedByteArray("999999999999".getBytes()))
+        .append("double_field", i * 1.0)
+        .append("float_field", ((float) (i * 2.0)))
+        .append("boolean_field", i % 5 == 0)
+        .append("flba_field", "abc");
+
+      if (i % 2 == 1) {
+        group.append("some_null_field", "x");
+      }
+
+      if (i % 13 != 1) {
+        int binaryLen = i % 10;
+        group.append("binary_field",
+          Binary.fromString(new String(new char[binaryLen]).replace("\0", "x")));
+      }
+
+      if (uniqueStrs[i] != null) {
+        group.append("binary_field_non_repeating", Binary.fromString(uniqueStrs[i]));
+      }
+      writer.write(group);
+    }
+    writer.close();
+  }
+
+  private static String getRandomStr() {
+    int len = random.nextInt(10);
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < len; i++) {
+      sb.append((char) ('a' + random.nextInt(25)));
+    }
+    return sb.toString();
+  }
+
+  public static void initialStrings(String[] uniqueStrs) {
+    for (int i = 0; i < uniqueStrs.length; i++) {
+      String str = getRandomStr();
+      if (!str.isEmpty()) {
+        uniqueStrs[i] = str;
+        isNulls[i] = false;
+      }else{
+        isNulls[i] = true;
+      }
+    }
+  }
+
+  private VectorizedParquetRecordReader createParquetReader(String schemaString, Configuration conf)
+    throws IOException, InterruptedException, HiveException {
+    conf.set(PARQUET_READ_SCHEMA, schemaString);
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
+
+    Job vectorJob = new Job(conf, "read vector");
+    ParquetInputFormat.setInputPaths(vectorJob, file);
+    ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class);
+    InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0);
+    initialVectorizedRowBatchCtx(conf);
+    return new VectorizedParquetRecordReader(split, new JobConf(conf));
+  }
+
+  private void initialVectorizedRowBatchCtx(Configuration conf) throws HiveException {
+    MapWork mapWork = new MapWork();
+    VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx();
+    rbCtx.init(createStructObjectInspector(conf), new String[0]);
+    mapWork.setVectorMode(true);
+    mapWork.setVectorizedRowBatchCtx(rbCtx);
+    Utilities.setMapWork(conf, mapWork);
+  }
+
+  private StructObjectInspector createStructObjectInspector(Configuration conf) {
+    // Create row related objects
+    String columnNames = conf.get(IOConstants.COLUMNS);
+    List<String> columnNamesList = DataWritableReadSupport.getColumnNames(columnNames);
+    String columnTypes = conf.get(IOConstants.COLUMNS_TYPES);
+    List<TypeInfo> columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
+    TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList);
+    return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);
+  }
+
+  @Test
+  public void testIntRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"int32_field");
+    conf.set(IOConstants.COLUMNS_TYPES,"int");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required int32 int32_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        LongColumnVector vector = (LongColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < vector.vector.length; i++) {
+          if(c == nElements){
+            break;
+          }
+          assertEquals(c, vector.vector[i]);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testLongRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"int64_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "bigint");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required int64 int64_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        LongColumnVector vector = (LongColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < vector.vector.length; i++) {
+          if(c == nElements){
+            break;
+          }
+          assertEquals(2 * c, vector.vector[i]);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testDoubleRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"double_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "double");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required double double_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < vector.vector.length; i++) {
+          if(c == nElements){
+            break;
+          }
+          assertEquals(1.0 * c, vector.vector[i], 0);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testFloatRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"float_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "float");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required float float_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < vector.vector.length; i++) {
+          if(c == nElements){
+            break;
+          }
+          assertEquals((float)2.0 * c, vector.vector[i], 0);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testBooleanRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"boolean_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "boolean");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required boolean boolean_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        LongColumnVector vector = (LongColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < vector.vector.length; i++) {
+          if(c == nElements){
+            break;
+          }
+          int e = (c % 5 == 0) ? 1 : 0;
+          assertEquals(e, vector.vector[i]);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testBinaryReadDictionaryEncoding() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"binary_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "string");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required binary binary_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    int c = 0;
+    try {
+      while (reader.next(NullWritable.get(), previous)) {
+        BytesColumnVector vector = (BytesColumnVector) previous.cols[0];
+        boolean noNull = true;
+        for (int i = 0; i < vector.vector.length; i++) {
+          if(c == nElements){
+            break;
+          }
+          if (c % 13 == 1) {
+            assertTrue(vector.isNull[i]);
+          } else {
+            assertFalse(vector.isNull[i]);
+            int binaryLen = c % 10;
+            String expected = new String(new char[binaryLen]).replace("\0", "x");
+            String actual = new String(ArrayUtils
+              .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i]));
+            assertEquals("Failed at " + c, expected, actual);
+            noNull = false;
+          }
+          c++;
+        }
+        assertEquals("No Null check failed at " + c, noNull, vector.noNulls);
+        assertFalse(vector.isRepeating);
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testBinaryRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"binary_field_non_repeating");
+    conf.set(IOConstants.COLUMNS_TYPES, "string");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required binary binary_field_non_repeating;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    int c = 0;
+    try {
+      while (reader.next(NullWritable.get(), previous)) {
+        BytesColumnVector vector = (BytesColumnVector) previous.cols[0];
+        boolean noNull = true;
+        for (int i = 0; i < vector.vector.length; i++) {
+          if(c == nElements){
+            break;
+          }
+          String actual;
+          assertEquals("Null assert failed at " + c, isNulls[c], vector.isNull[i]);
+          if (!vector.isNull[i]) {
+            actual = new String(ArrayUtils
+              .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i]));
+            assertEquals("failed at " + c, uniqueStrs[c], actual);
+          }else{
+            noNull = false;
+          }
+          c++;
+        }
+        assertEquals("No Null check failed at " + c, noNull, vector.noNulls);
+        assertFalse(vector.isRepeating);
+      }
+      assertEquals("It doesn't exit at expected position", nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/936df7a1/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q b/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q
new file mode 100644
index 0000000..7de444f
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q
@@ -0,0 +1,94 @@
+set hive.mapred.mode=nonstrict;
+DROP TABLE parquet_types_staging;
+DROP TABLE parquet_types;
+
+set hive.vectorized.execution.enabled=true;
+set hive.vectorized.execution.reduce.enabled=true;
+set hive.vectorized.use.row.serde.deserialize=true;
+set hive.vectorized.use.vector.serde.deserialize=true;
+set hive.vectorized.execution.reduce.groupby.enabled = true;
+
+CREATE TABLE parquet_types_staging (
+  cint int,
+  ctinyint tinyint,
+  csmallint smallint,
+  cfloat float,
+  cdouble double,
+  cstring1 string,
+  t timestamp,
+  cchar char(5),
+  cvarchar varchar(10),
+  cbinary string,
+  m1 map<string, varchar(3)>,
+  l1 array<int>,
+  st1 struct<c1:int, c2:char(1)>,
+  d date
+) ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+COLLECTION ITEMS TERMINATED BY ','
+MAP KEYS TERMINATED BY ':';
+
+CREATE TABLE parquet_types (
+  cint int,
+  ctinyint tinyint,
+  csmallint smallint,
+  cfloat float,
+  cdouble double,
+  cstring1 string,
+  t timestamp,
+  cchar char(5),
+  cvarchar varchar(10),
+  cbinary binary,
+  m1 map<string, varchar(3)>,
+  l1 array<int>,
+  st1 struct<c1:int, c2:char(1)>,
+  d date
+) STORED AS PARQUET;
+
+LOAD DATA LOCAL INPATH '../../data/files/parquet_non_dictionary_types.txt' OVERWRITE INTO TABLE
+parquet_types_staging;
+
+SELECT * FROM parquet_types_staging;
+
+INSERT OVERWRITE TABLE parquet_types
+SELECT cint, ctinyint, csmallint, cfloat, cdouble, cstring1, t, cchar, cvarchar,
+unhex(cbinary), m1, l1, st1, d FROM parquet_types_staging;
+
+-- test types in group by
+
+EXPLAIN SELECT ctinyint,
+  MAX(cint),
+  MIN(csmallint),
+  COUNT(cstring1),
+  ROUND(AVG(cfloat), 5),
+  ROUND(STDDEV_POP(cdouble),5)
+FROM parquet_types
+GROUP BY ctinyint
+ORDER BY ctinyint
+;
+
+SELECT ctinyint,
+  MAX(cint),
+  MIN(csmallint),
+  COUNT(cstring1),
+  ROUND(AVG(cfloat), 5),
+  ROUND(STDDEV_POP(cdouble),5)
+FROM parquet_types
+GROUP BY ctinyint
+ORDER BY ctinyint
+;
+
+EXPLAIN SELECT cfloat, count(*) FROM parquet_types GROUP BY cfloat ORDER BY cfloat;
+SELECT cfloat, count(*) FROM parquet_types GROUP BY cfloat ORDER BY cfloat;
+
+EXPLAIN SELECT cchar, count(*) FROM parquet_types GROUP BY cchar ORDER BY cchar;
+SELECT cchar, count(*) FROM parquet_types GROUP BY cchar ORDER BY cchar;
+
+EXPLAIN SELECT cvarchar, count(*) FROM parquet_types GROUP BY cvarchar ORDER BY cvarchar;
+SELECT cvarchar, count(*) FROM parquet_types GROUP BY cvarchar ORDER BY cvarchar;
+
+EXPLAIN SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1;
+SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1;
+
+EXPLAIN SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary;
+SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/936df7a1/ql/src/test/queries/clientpositive/parquet_types_vectorization.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/parquet_types_vectorization.q b/ql/src/test/queries/clientpositive/parquet_types_vectorization.q
new file mode 100644
index 0000000..bb0e5b2
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/parquet_types_vectorization.q
@@ -0,0 +1,96 @@
+set hive.mapred.mode=nonstrict;
+DROP TABLE parquet_types_staging;
+DROP TABLE parquet_types;
+
+set hive.vectorized.execution.enabled=true;
+set hive.vectorized.execution.reduce.enabled=true;
+set hive.vectorized.use.row.serde.deserialize=true;
+set hive.vectorized.use.vector.serde.deserialize=true;
+set hive.vectorized.execution.reduce.groupby.enabled = true;
+
+CREATE TABLE parquet_types_staging (
+  cint int,
+  ctinyint tinyint,
+  csmallint smallint,
+  cfloat float,
+  cdouble double,
+  cstring1 string,
+  t timestamp,
+  cchar char(5),
+  cvarchar varchar(10),
+  cbinary string,
+  m1 map<string, varchar(3)>,
+  l1 array<int>,
+  st1 struct<c1:int, c2:char(1)>,
+  d date
+) ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+COLLECTION ITEMS TERMINATED BY ','
+MAP KEYS TERMINATED BY ':';
+
+CREATE TABLE parquet_types (
+  cint int,
+  ctinyint tinyint,
+  csmallint smallint,
+  cfloat float,
+  cdouble double,
+  cstring1 string,
+  t timestamp,
+  cchar char(5),
+  cvarchar varchar(10),
+  cbinary binary,
+  m1 map<string, varchar(3)>,
+  l1 array<int>,
+  st1 struct<c1:int, c2:char(1)>,
+  d date
+) STORED AS PARQUET;
+
+LOAD DATA LOCAL INPATH '../../data/files/parquet_types.txt' OVERWRITE INTO TABLE parquet_types_staging;
+
+SELECT * FROM parquet_types_staging;
+
+INSERT OVERWRITE TABLE parquet_types
+SELECT cint, ctinyint, csmallint, cfloat, cdouble, cstring1, t, cchar, cvarchar,
+unhex(cbinary), m1, l1, st1, d FROM parquet_types_staging;
+
+-- test types in group by
+
+EXPLAIN SELECT ctinyint,
+  MAX(cint),
+  MIN(csmallint),
+  COUNT(cstring1),
+  ROUND(AVG(cfloat), 5),
+  ROUND(STDDEV_POP(cdouble),5)
+FROM parquet_types
+GROUP BY ctinyint
+ORDER BY ctinyint
+;
+
+SELECT ctinyint,
+  MAX(cint),
+  MIN(csmallint),
+  COUNT(cstring1),
+  ROUND(AVG(cfloat), 5),
+  ROUND(STDDEV_POP(cdouble),5)
+FROM parquet_types
+GROUP BY ctinyint
+ORDER BY ctinyint
+;
+
+EXPLAIN SELECT cfloat, count(*) FROM parquet_types GROUP BY cfloat ORDER BY cfloat;
+SELECT cfloat, count(*) FROM parquet_types GROUP BY cfloat ORDER BY cfloat;
+
+EXPLAIN SELECT cchar, count(*) FROM parquet_types GROUP BY cchar ORDER BY cchar;
+SELECT cchar, count(*) FROM parquet_types GROUP BY cchar ORDER BY cchar;
+
+EXPLAIN SELECT cvarchar, count(*) FROM parquet_types GROUP BY cvarchar ORDER BY cvarchar;
+SELECT cvarchar, count(*) FROM parquet_types GROUP BY cvarchar ORDER BY cvarchar;
+
+EXPLAIN SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1;
+SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1;
+
+EXPLAIN SELECT t, count(*) FROM parquet_types GROUP BY t ORDER BY t;
+SELECT t, count(*) FROM parquet_types GROUP BY t ORDER BY t;
+
+EXPLAIN SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary;
+SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/936df7a1/ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out b/ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out
index 8345132..e42453d 100644
--- a/ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out
+++ b/ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out
@@ -150,7 +150,7 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: tinyint)
                         Statistics: Num rows: 12288 Data size: 73728 Basic stats: COMPLETE Column stats: NONE
                         value expressions: _col1 (type: int), _col2 (type: smallint), _col3 (type: bigint), _col4 (type: struct<count:bigint,sum:double,input:float>), _col5 (type: struct<count:bigint,sum:double,variance:double>)
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: no inputs
         Reducer 2 
             Execution mode: llap

http://git-wip-us.apache.org/repos/asf/hive/blob/936df7a1/ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out b/ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out
index b49d5dd..0524cb3 100644
--- a/ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out
+++ b/ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out
@@ -250,19 +250,19 @@ Stage-0
     limit:-1
     Stage-1
       Reducer 3 vectorized, llap
-      File Output Operator [FS_10]
-        Select Operator [SEL_9] (rows=11 width=11)
+      File Output Operator [FS_12]
+        Select Operator [SEL_11] (rows=11 width=11)
           Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6"]
         <-Reducer 2 [SIMPLE_EDGE] llap
           SHUFFLE [RS_6]
             Group By Operator [GBY_4] (rows=11 width=11)
               Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6"],aggregations:["max(VALUE._col0)","min(VALUE._col1)","count(VALUE._col2)","avg(VALUE._col3)","stddev_pop(VALUE._col4)","max(VALUE._col5)"],keys:KEY._col0
-            <-Map 1 [SIMPLE_EDGE] llap
+            <-Map 1 [SIMPLE_EDGE] vectorized, llap
               SHUFFLE [RS_3]
                 PartitionCols:_col0
-                Group By Operator [GBY_2] (rows=22 width=11)
+                Group By Operator [GBY_10] (rows=22 width=11)
                   Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6"],aggregations:["max(cint)","min(csmallint)","count(cstring1)","avg(cfloat)","stddev_pop(cdouble)","max(cdecimal)"],keys:ctinyint
-                  Select Operator [SEL_1] (rows=22 width=11)
+                  Select Operator [SEL_9] (rows=22 width=11)
                     Output:["ctinyint","cint","csmallint","cstring1","cfloat","cdouble","cdecimal"]
                     TableScan [TS_0] (rows=22 width=11)
                       default@parquet_types,parquet_types,Tbl:COMPLETE,Col:NONE,Output:["cint","ctinyint","csmallint","cfloat","cdouble","cstring1","cdecimal"]