Posted to commits@hive.apache.org by xu...@apache.org on 2016/12/21 08:59:50 UTC

[2/2] hive git commit: Revert "HIVE-15112: Implement Parquet vectorization reader for Struct type (Ferdinand Xu, via Chao Sun)"

Revert "HIVE-15112: Implement Parquet vectorization reader for Struct type (Ferdinand Xu, via Chao Sun)"

This reverts commit 9a524ada2eafda2f4853a12d469f7ca48e57f38c.


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fe3b370e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fe3b370e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fe3b370e

Branch: refs/heads/master
Commit: fe3b370eb44a4864e5087cc80ee413931e89770d
Parents: ab9b219
Author: Ferdinand Xu <ch...@intel.com>
Authored: Wed Dec 21 09:22:49 2016 +0800
Committer: Ferdinand Xu <ch...@intel.com>
Committed: Wed Dec 21 09:22:49 2016 +0800

----------------------------------------------------------------------
 .../parquet/vector/VectorizedColumnReader.java  | 566 ++++++++++++++-
 .../vector/VectorizedParquetRecordReader.java   |  84 +--
 .../vector/VectorizedPrimitiveColumnReader.java | 589 ----------------
 .../vector/VectorizedStructColumnReader.java    |  59 --
 .../io/parquet/TestVectorizedColumnReader.java  | 418 +++++++++--
 .../parquet/TestVectorizedColumnReaderBase.java | 694 -------------------
 ...ectorizedDictionaryEncodingColumnReader.java |  85 ---
 7 files changed, 932 insertions(+), 1563 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/fe3b370e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java
index e3be982..5a9c7f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java
@@ -1,13 +1,9 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,25 +11,561 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hive.ql.io.parquet.vector;
 
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.sql.Timestamp;
+import java.util.Arrays;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+/**
+ * A column-level Parquet reader used to read a batch of records for a column;
+ * part of the code is adapted from Apache Spark and Apache Parquet.
+ */
+public class VectorizedColumnReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorizedColumnReader.class);
+
+  private boolean skipTimestampConversion = false;
+
+  /**
+   * Total number of values read.
+   */
+  private long valuesRead;
 
-public interface VectorizedColumnReader {
   /**
-   * read records with specified size and type into the columnVector
-   *
-   * @param total      number of records to read into the column vector
-   * @param column     column vector where the reader will read data into
-   * @param columnType the type of column vector
-   * @throws IOException
+   * value that indicates the end of the current page. That is,
+   * if valuesRead == endOfPageValueCount, we are at the end of the page.
    */
+  private long endOfPageValueCount;
+
+  /**
+   * The dictionary, if this column has dictionary encoding.
+   */
+  private final Dictionary dictionary;
+
+  /**
+   * If true, the current page is dictionary encoded.
+   */
+  private boolean isCurrentPageDictionaryEncoded;
+
+  /**
+   * Maximum definition level for this column.
+   */
+  private final int maxDefLevel;
+
+  private int definitionLevel;
+  private int repetitionLevel;
+
+  /**
+   * Repetition/Definition/Value readers.
+   */
+  private IntIterator repetitionLevelColumn;
+  private IntIterator definitionLevelColumn;
+  private ValuesReader dataColumn;
+
+  /**
+   * Total values in the current page.
+   */
+  private int pageValueCount;
+
+  private final PageReader pageReader;
+  private final ColumnDescriptor descriptor;
+  private final Type type;
+
+  public VectorizedColumnReader(
+    ColumnDescriptor descriptor,
+    PageReader pageReader,
+    boolean skipTimestampConversion,
+    Type type) throws IOException {
+    this.descriptor = descriptor;
+    this.type = type;
+    this.pageReader = pageReader;
+    this.maxDefLevel = descriptor.getMaxDefinitionLevel();
+    this.skipTimestampConversion = skipTimestampConversion;
+
+    DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+    if (dictionaryPage != null) {
+      try {
+        this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
+        this.isCurrentPageDictionaryEncoded = true;
+      } catch (IOException e) {
+        throw new IOException("could not decode the dictionary for " + descriptor, e);
+      }
+    } else {
+      this.dictionary = null;
+      this.isCurrentPageDictionaryEncoded = false;
+    }
+  }
+
   void readBatch(
     int total,
     ColumnVector column,
-    TypeInfo columnType) throws IOException;
+    TypeInfo columnType) throws IOException {
+
+    int rowId = 0;
+    while (total > 0) {
+      // Compute the number of values we want to read in this page.
+      int leftInPage = (int) (endOfPageValueCount - valuesRead);
+      if (leftInPage == 0) {
+        readPage();
+        leftInPage = (int) (endOfPageValueCount - valuesRead);
+      }
+
+      int num = Math.min(total, leftInPage);
+      if (isCurrentPageDictionaryEncoded) {
+        LongColumnVector dictionaryIds = new LongColumnVector();
+        // Read and decode dictionary ids.
+        readDictionaryIDs(num, dictionaryIds, rowId);
+        decodeDictionaryIds(rowId, num, column, dictionaryIds);
+      } else {
+        // assign values in vector
+        PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType;
+        switch (primitiveColumnType.getPrimitiveCategory()) {
+        case INT:
+        case BYTE:
+        case SHORT:
+          readIntegers(num, (LongColumnVector) column, rowId);
+          break;
+        case DATE:
+        case INTERVAL_YEAR_MONTH:
+        case LONG:
+          readLongs(num, (LongColumnVector) column, rowId);
+          break;
+        case BOOLEAN:
+          readBooleans(num, (LongColumnVector) column, rowId);
+          break;
+        case DOUBLE:
+          readDoubles(num, (DoubleColumnVector) column, rowId);
+          break;
+        case BINARY:
+        case STRING:
+        case CHAR:
+        case VARCHAR:
+          readBinaries(num, (BytesColumnVector) column, rowId);
+          break;
+        case FLOAT:
+          readFloats(num, (DoubleColumnVector) column, rowId);
+          break;
+        case DECIMAL:
+          readDecimal(num, (DecimalColumnVector) column, rowId);
+          break;
+        case INTERVAL_DAY_TIME:
+        case TIMESTAMP:
+        default:
+          throw new IOException(
+            "Unsupported type category: " + primitiveColumnType.getPrimitiveCategory());
+        }
+      }
+      rowId += num;
+      total -= num;
+    }
+  }
+
+  private void readDictionaryIDs(
+    int total,
+    LongColumnVector c,
+    int rowId) throws IOException {
+    int left = total;
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.vector[rowId] = dataColumn.readValueDictionaryId();
+        c.isNull[rowId] = false;
+        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
+  private void readIntegers(
+    int total,
+    LongColumnVector c,
+    int rowId) throws IOException {
+    int left = total;
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.vector[rowId] = dataColumn.readInteger();
+        c.isNull[rowId] = false;
+        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
+  private void readDoubles(
+    int total,
+    DoubleColumnVector c,
+    int rowId) throws IOException {
+    int left = total;
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.vector[rowId] = dataColumn.readDouble();
+        c.isNull[rowId] = false;
+        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
+  private void readBooleans(
+    int total,
+    LongColumnVector c,
+    int rowId) throws IOException {
+    int left = total;
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.vector[rowId] = dataColumn.readBoolean() ? 1 : 0;
+        c.isNull[rowId] = false;
+        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
+  private void readLongs(
+    int total,
+    LongColumnVector c,
+    int rowId) throws IOException {
+    int left = total;
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.vector[rowId] = dataColumn.readLong();
+        c.isNull[rowId] = false;
+        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
+  private void readFloats(
+    int total,
+    DoubleColumnVector c,
+    int rowId) throws IOException {
+    int left = total;
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.vector[rowId] = dataColumn.readFloat();
+        c.isNull[rowId] = false;
+        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
+  private void readDecimal(
+    int total,
+    DecimalColumnVector c,
+    int rowId) throws IOException {
+    int left = total;
+    c.precision = (short) type.asPrimitiveType().getDecimalMetadata().getPrecision();
+    c.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale();
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.vector[rowId].set(dataColumn.readBytes().getBytesUnsafe(), c.scale);
+        c.isNull[rowId] = false;
+        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
+  private void readBinaries(
+    int total,
+    BytesColumnVector c,
+    int rowId) throws IOException {
+    int left = total;
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.setVal(rowId, dataColumn.readBytes().getBytesUnsafe());
+        c.isNull[rowId] = false;
+        // TODO figure out a better way to set repeat for Binary type
+        c.isRepeating = false;
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
+  /**
+   * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
+   */
+  private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
+                                   LongColumnVector dictionaryIds) {
+    System.arraycopy(dictionaryIds.isNull, rowId, column.isNull, rowId, num);
+    if (column.noNulls) {
+      column.noNulls = dictionaryIds.noNulls;
+    }
+    column.isRepeating = column.isRepeating && dictionaryIds.isRepeating;
+
+    switch (descriptor.getType()) {
+    case INT32:
+      for (int i = rowId; i < rowId + num; ++i) {
+        ((LongColumnVector) column).vector[i] =
+          dictionary.decodeToInt((int) dictionaryIds.vector[i]);
+      }
+      break;
+    case INT64:
+      for (int i = rowId; i < rowId + num; ++i) {
+        ((LongColumnVector) column).vector[i] =
+          dictionary.decodeToLong((int) dictionaryIds.vector[i]);
+      }
+      break;
+    case FLOAT:
+      for (int i = rowId; i < rowId + num; ++i) {
+        ((DoubleColumnVector) column).vector[i] =
+          dictionary.decodeToFloat((int) dictionaryIds.vector[i]);
+      }
+      break;
+    case DOUBLE:
+      for (int i = rowId; i < rowId + num; ++i) {
+        ((DoubleColumnVector) column).vector[i] =
+          dictionary.decodeToDouble((int) dictionaryIds.vector[i]);
+      }
+      break;
+    case INT96:
+      for (int i = rowId; i < rowId + num; ++i) {
+        ByteBuffer buf = dictionary.decodeToBinary((int) dictionaryIds.vector[i]).toByteBuffer();
+        buf.order(ByteOrder.LITTLE_ENDIAN);
+        long timeOfDayNanos = buf.getLong();
+        int julianDay = buf.getInt();
+        NanoTime nt = new NanoTime(julianDay, timeOfDayNanos);
+        Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion);
+        ((TimestampColumnVector) column).set(i, ts);
+      }
+      break;
+    case BINARY:
+    case FIXED_LEN_BYTE_ARRAY:
+      for (int i = rowId; i < rowId + num; ++i) {
+        ((BytesColumnVector) column)
+          .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe());
+      }
+      break;
+    default:
+      throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
+    }
+  }
+
+  private void readRepetitionAndDefinitionLevels() {
+    repetitionLevel = repetitionLevelColumn.nextInt();
+    definitionLevel = definitionLevelColumn.nextInt();
+    valuesRead++;
+  }
+
+  private void readPage() throws IOException {
+    DataPage page = pageReader.readPage();
+    // TODO: Why is this a visitor?
+    page.accept(new DataPage.Visitor<Void>() {
+      @Override
+      public Void visit(DataPageV1 dataPageV1) {
+        readPageV1(dataPageV1);
+        return null;
+      }
+
+      @Override
+      public Void visit(DataPageV2 dataPageV2) {
+        readPageV2(dataPageV2);
+        return null;
+      }
+    });
+  }
+
+  private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException {
+    this.pageValueCount = valueCount;
+    this.endOfPageValueCount = valuesRead + pageValueCount;
+    if (dataEncoding.usesDictionary()) {
+      this.dataColumn = null;
+      if (dictionary == null) {
+        throw new IOException(
+          "could not read page in col " + descriptor +
+            " as the dictionary was missing for encoding " + dataEncoding);
+      }
+      dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary);
+      this.isCurrentPageDictionaryEncoded = true;
+    } else {
+      if (dataEncoding != Encoding.PLAIN) {
+        throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
+      }
+      dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
+      this.isCurrentPageDictionaryEncoded = false;
+    }
+
+    try {
+      dataColumn.initFromPage(pageValueCount, bytes, offset);
+    } catch (IOException e) {
+      throw new IOException("could not read page in col " + descriptor, e);
+    }
+  }
+
+  private void readPageV1(DataPageV1 page) {
+    ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
+    ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
+    this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
+    this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
+    try {
+      byte[] bytes = page.getBytes().toByteArray();
+      LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
+      LOG.debug("reading repetition levels at 0");
+      rlReader.initFromPage(pageValueCount, bytes, 0);
+      int next = rlReader.getNextOffset();
+      LOG.debug("reading definition levels at " + next);
+      dlReader.initFromPage(pageValueCount, bytes, next);
+      next = dlReader.getNextOffset();
+      LOG.debug("reading data at " + next);
+      initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
+    }
+  }
+
+  private void readPageV2(DataPageV2 page) {
+    this.pageValueCount = page.getValueCount();
+    this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(),
+      page.getRepetitionLevels());
+    this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels());
+    try {
+      LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records");
+      initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
+    }
+  }
+
+  private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
+    try {
+      if (maxLevel == 0) {
+        return new NullIntIterator();
+      }
+      return new RLEIntIterator(
+        new RunLengthBitPackingHybridDecoder(
+          BytesUtils.getWidthFromMaxInt(maxLevel),
+          new ByteArrayInputStream(bytes.toByteArray())));
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read levels in page for col " + descriptor, e);
+    }
+  }
+
+  /**
+   * Utility classes to abstract over different way to read ints with different encodings.
+   * TODO: remove this layer of abstraction?
+   */
+  abstract static class IntIterator {
+    abstract int nextInt();
+  }
+
+  protected static final class ValuesReaderIntIterator extends IntIterator {
+    ValuesReader delegate;
+
+    public ValuesReaderIntIterator(ValuesReader delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    int nextInt() {
+      return delegate.readInteger();
+    }
+  }
+
+  protected static final class RLEIntIterator extends IntIterator {
+    RunLengthBitPackingHybridDecoder delegate;
+
+    public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    int nextInt() {
+      try {
+        return delegate.readInt();
+      } catch (IOException e) {
+        throw new ParquetDecodingException(e);
+      }
+    }
+  }
+
+  protected static final class NullIntIterator extends IntIterator {
+    @Override
+    int nextInt() { return 0; }
+  }
 }
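
For orientation, a minimal, hypothetical usage sketch of the restored reader follows (it is not part of the commit). The variable names fileReader and requestedSchema are assumptions; readBatch is package-private, so this assumes a caller in the same package, as VectorizedParquetRecordReader is in the next file, and a first column of Parquet type INT32 / Hive type int.

    // Assumed context: fileReader is an org.apache.parquet.hadoop.ParquetFileReader
    // opened on the split, requestedSchema is the projected MessageType.
    PageReadStore pages = fileReader.readNextRowGroup();
    List<ColumnDescriptor> columns = requestedSchema.getColumns();
    List<Type> types = requestedSchema.getFields();

    // One reader per requested column, exactly as checkEndOfRowGroup builds them below.
    VectorizedColumnReader columnReader = new VectorizedColumnReader(
        columns.get(0), pages.getPageReader(columns.get(0)),
        /* skipTimestampConversion */ false, types.get(0));

    // Fill one batch of values into a Hive column vector.
    LongColumnVector vector = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
    columnReader.readBatch(VectorizedRowBatch.DEFAULT_SIZE, vector, TypeInfoFactory.intTypeInfo);
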

http://git-wip-us.apache.org/repos/asf/hive/blob/fe3b370e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 699de59..f94c49a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -23,13 +23,11 @@ import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
 import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.parquet.ParquetRuntimeException;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.filter2.compat.FilterCompat;
@@ -37,7 +35,6 @@ import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetInputSplit;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 import org.slf4j.Logger;
@@ -71,7 +68,6 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   private List<String> columnNamesList;
   private List<TypeInfo> columnTypesList;
   private VectorizedRowBatchCtx rbCtx;
-  private List<Integer> indexColumnsWanted;
 
   /**
    * For each request column, the reader to read this column. This is NULL if this column
@@ -202,7 +198,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
         columnTypesList);
     }
 
-    indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
+    List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
     if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
       requestedSchema =
         DataWritableReadSupport.getSchemaByIndex(tableSchema, columnNamesList, indexColumnsWanted);
@@ -283,81 +279,11 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
     List<Type> types = requestedSchema.getFields();
     columnReaders = new VectorizedColumnReader[columns.size()];
-
-    if (!ColumnProjectionUtils.isReadAllColumns(jobConf) && !indexColumnsWanted.isEmpty()) {
-      for (int i = 0; i < types.size(); ++i) {
-        columnReaders[i] =
-          buildVectorizedParquetReader(columnTypesList.get(indexColumnsWanted.get(i)), types.get(i),
-            pages, requestedSchema.getColumns(), skipTimestampConversion, 0);
-      }
-    } else {
-      for (int i = 0; i < types.size(); ++i) {
-        columnReaders[i] = buildVectorizedParquetReader(columnTypesList.get(i), types.get(i), pages,
-          requestedSchema.getColumns(), skipTimestampConversion, 0);
-      }
+    for (int i = 0; i < columns.size(); ++i) {
+      columnReaders[i] =
+        new VectorizedColumnReader(columns.get(i), pages.getPageReader(columns.get(i)),
+          skipTimestampConversion, types.get(i));
     }
-
     totalCountLoadedSoFar += pages.getRowCount();
   }
-
-  private List<ColumnDescriptor> getAllColumnDescriptorByType(
-    int depth,
-    Type type,
-    List<ColumnDescriptor> columns) throws ParquetRuntimeException {
-    List<ColumnDescriptor> res = new ArrayList<>();
-    for (ColumnDescriptor descriptor : columns) {
-      if (depth >= descriptor.getPath().length) {
-        throw new InvalidSchemaException("Corrupted Parquet schema");
-      }
-      if (type.getName().equals(descriptor.getPath()[depth])) {
-        res.add(descriptor);
-      }
-    }
-    return res;
-  }
-
-  // Build VectorizedParquetColumnReader via Hive typeInfo and Parquet schema
-  private VectorizedColumnReader buildVectorizedParquetReader(
-    TypeInfo typeInfo,
-    Type type,
-    PageReadStore pages,
-    List<ColumnDescriptor> columnDescriptors,
-    boolean skipTimestampConversion,
-    int depth) throws IOException {
-    List<ColumnDescriptor> descriptors =
-      getAllColumnDescriptorByType(depth, type, columnDescriptors);
-    switch (typeInfo.getCategory()) {
-    case PRIMITIVE:
-      if (columnDescriptors == null || columnDescriptors.isEmpty()) {
-        throw new RuntimeException(
-          "Failed to find related Parquet column descriptor with type " + type);
-      } else {
-        return new VectorizedPrimitiveColumnReader(descriptors.get(0),
-          pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type);
-      }
-    case STRUCT:
-      StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
-      List<VectorizedColumnReader> fieldReaders = new ArrayList<>();
-      List<TypeInfo> fieldTypes = structTypeInfo.getAllStructFieldTypeInfos();
-      List<Type> types = type.asGroupType().getFields();
-      for (int i = 0; i < fieldTypes.size(); i++) {
-        VectorizedColumnReader r =
-          buildVectorizedParquetReader(fieldTypes.get(i), types.get(i), pages, descriptors,
-            skipTimestampConversion, depth + 1);
-        if (r != null) {
-          fieldReaders.add(r);
-        } else {
-          throw new RuntimeException(
-            "Fail to build Parquet vectorized reader based on Hive type " + fieldTypes.get(i)
-              .getTypeName() + " and Parquet type" + types.get(i).toString());
-        }
-      }
-      return new VectorizedStructColumnReader(fieldReaders);
-    case LIST:
-    case MAP:
-    case UNION:
-    default:
-      throw new RuntimeException("Unsupported category " + typeInfo.getCategory().name());
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/fe3b370e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
deleted file mode 100644
index 3d5c6e6..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
+++ /dev/null
@@ -1,589 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.parquet.vector;
-
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.bytes.BytesUtils;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.column.page.DataPage;
-import org.apache.parquet.column.page.DataPageV1;
-import org.apache.parquet.column.page.DataPageV2;
-import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.schema.Type;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.sql.Timestamp;
-
-import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
-import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
-import static org.apache.parquet.column.ValuesType.VALUES;
-
-/**
- * It's column level Parquet reader which is used to read a batch of records for a column,
- * part of the code is referred from Apache Spark and Apache Parquet.
- */
-public class VectorizedPrimitiveColumnReader implements VectorizedColumnReader {
-
-  private static final Logger LOG = LoggerFactory.getLogger(VectorizedPrimitiveColumnReader.class);
-
-  private boolean skipTimestampConversion = false;
-
-  /**
-   * Total number of values read.
-   */
-  private long valuesRead;
-
-  /**
-   * value that indicates the end of the current page. That is,
-   * if valuesRead == endOfPageValueCount, we are at the end of the page.
-   */
-  private long endOfPageValueCount;
-
-  /**
-   * The dictionary, if this column has dictionary encoding.
-   */
-  private final Dictionary dictionary;
-
-  /**
-   * If true, the current page is dictionary encoded.
-   */
-  private boolean isCurrentPageDictionaryEncoded;
-
-  /**
-   * Maximum definition level for this column.
-   */
-  private final int maxDefLevel;
-
-  private int definitionLevel;
-  private int repetitionLevel;
-
-  /**
-   * Repetition/Definition/Value readers.
-   */
-  private IntIterator repetitionLevelColumn;
-  private IntIterator definitionLevelColumn;
-  private ValuesReader dataColumn;
-
-  /**
-   * Total values in the current page.
-   */
-  private int pageValueCount;
-
-  private final PageReader pageReader;
-  private final ColumnDescriptor descriptor;
-  private final Type type;
-
-  public VectorizedPrimitiveColumnReader(
-    ColumnDescriptor descriptor,
-    PageReader pageReader,
-    boolean skipTimestampConversion,
-    Type type) throws IOException {
-    this.descriptor = descriptor;
-    this.type = type;
-    this.pageReader = pageReader;
-    this.maxDefLevel = descriptor.getMaxDefinitionLevel();
-    this.skipTimestampConversion = skipTimestampConversion;
-
-    DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
-    if (dictionaryPage != null) {
-      try {
-        this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
-        this.isCurrentPageDictionaryEncoded = true;
-      } catch (IOException e) {
-        throw new IOException("could not decode the dictionary for " + descriptor, e);
-      }
-    } else {
-      this.dictionary = null;
-      this.isCurrentPageDictionaryEncoded = false;
-    }
-  }
-
-  public void readBatch(
-    int total,
-    ColumnVector column,
-    TypeInfo columnType) throws IOException {
-    int rowId = 0;
-    while (total > 0) {
-      // Compute the number of values we want to read in this page.
-      int leftInPage = (int) (endOfPageValueCount - valuesRead);
-      if (leftInPage == 0) {
-        readPage();
-        leftInPage = (int) (endOfPageValueCount - valuesRead);
-      }
-
-      int num = Math.min(total, leftInPage);
-      if (isCurrentPageDictionaryEncoded) {
-        LongColumnVector dictionaryIds = new LongColumnVector();
-        // Read and decode dictionary ids.
-        readDictionaryIDs(num, dictionaryIds, rowId);
-        decodeDictionaryIds(rowId, num, column, dictionaryIds);
-      } else {
-        // assign values in vector
-        readBatchHelper(num, column, columnType, rowId);
-      }
-      rowId += num;
-      total -= num;
-    }
-  }
-
-  private void readBatchHelper(
-    int num,
-    ColumnVector column,
-    TypeInfo columnType,
-    int rowId) throws IOException {
-    PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType;
-    switch (primitiveColumnType.getPrimitiveCategory()) {
-    case INT:
-    case BYTE:
-    case SHORT:
-      readIntegers(num, (LongColumnVector) column, rowId);
-      break;
-    case DATE:
-    case INTERVAL_YEAR_MONTH:
-    case LONG:
-      readLongs(num, (LongColumnVector) column, rowId);
-      break;
-    case BOOLEAN:
-      readBooleans(num, (LongColumnVector) column, rowId);
-      break;
-    case DOUBLE:
-      readDoubles(num, (DoubleColumnVector) column, rowId);
-      break;
-    case BINARY:
-    case STRING:
-    case CHAR:
-    case VARCHAR:
-      readBinaries(num, (BytesColumnVector) column, rowId);
-      break;
-    case FLOAT:
-      readFloats(num, (DoubleColumnVector) column, rowId);
-      break;
-    case DECIMAL:
-      readDecimal(num, (DecimalColumnVector) column, rowId);
-      break;
-    case INTERVAL_DAY_TIME:
-    case TIMESTAMP:
-    default:
-      throw new IOException("Unsupported type: " + type);
-    }
-  }
-
-  private void readDictionaryIDs(
-    int total,
-    LongColumnVector c,
-    int rowId) throws IOException {
-    int left = total;
-    while (left > 0) {
-      readRepetitionAndDefinitionLevels();
-      if (definitionLevel >= maxDefLevel) {
-        c.vector[rowId] = dataColumn.readValueDictionaryId();
-        c.isNull[rowId] = false;
-        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
-      } else {
-        c.isNull[rowId] = true;
-        c.isRepeating = false;
-        c.noNulls = false;
-      }
-      rowId++;
-      left--;
-    }
-  }
-
-  private void readIntegers(
-    int total,
-    LongColumnVector c,
-    int rowId) throws IOException {
-    int left = total;
-    while (left > 0) {
-      readRepetitionAndDefinitionLevels();
-      if (definitionLevel >= maxDefLevel) {
-        c.vector[rowId] = dataColumn.readInteger();
-        c.isNull[rowId] = false;
-        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
-      } else {
-        c.isNull[rowId] = true;
-        c.isRepeating = false;
-        c.noNulls = false;
-      }
-      rowId++;
-      left--;
-    }
-  }
-
-  private void readDoubles(
-    int total,
-    DoubleColumnVector c,
-    int rowId) throws IOException {
-    int left = total;
-    while (left > 0) {
-      readRepetitionAndDefinitionLevels();
-      if (definitionLevel >= maxDefLevel) {
-        c.vector[rowId] = dataColumn.readDouble();
-        c.isNull[rowId] = false;
-        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
-      } else {
-        c.isNull[rowId] = true;
-        c.isRepeating = false;
-        c.noNulls = false;
-      }
-      rowId++;
-      left--;
-    }
-  }
-
-  private void readBooleans(
-    int total,
-    LongColumnVector c,
-    int rowId) throws IOException {
-    int left = total;
-    while (left > 0) {
-      readRepetitionAndDefinitionLevels();
-      if (definitionLevel >= maxDefLevel) {
-        c.vector[rowId] = dataColumn.readBoolean() ? 1 : 0;
-        c.isNull[rowId] = false;
-        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
-      } else {
-        c.isNull[rowId] = true;
-        c.isRepeating = false;
-        c.noNulls = false;
-      }
-      rowId++;
-      left--;
-    }
-  }
-
-  private void readLongs(
-    int total,
-    LongColumnVector c,
-    int rowId) throws IOException {
-    int left = total;
-    while (left > 0) {
-      readRepetitionAndDefinitionLevels();
-      if (definitionLevel >= maxDefLevel) {
-        c.vector[rowId] = dataColumn.readLong();
-        c.isNull[rowId] = false;
-        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
-      } else {
-        c.isNull[rowId] = true;
-        c.isRepeating = false;
-        c.noNulls = false;
-      }
-      rowId++;
-      left--;
-    }
-  }
-
-  private void readFloats(
-    int total,
-    DoubleColumnVector c,
-    int rowId) throws IOException {
-    int left = total;
-    while (left > 0) {
-      readRepetitionAndDefinitionLevels();
-      if (definitionLevel >= maxDefLevel) {
-        c.vector[rowId] = dataColumn.readFloat();
-        c.isNull[rowId] = false;
-        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
-      } else {
-        c.isNull[rowId] = true;
-        c.isRepeating = false;
-        c.noNulls = false;
-      }
-      rowId++;
-      left--;
-    }
-  }
-
-  private void readDecimal(
-    int total,
-    DecimalColumnVector c,
-    int rowId) throws IOException {
-    int left = total;
-    c.precision = (short) type.asPrimitiveType().getDecimalMetadata().getPrecision();
-    c.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale();
-    while (left > 0) {
-      readRepetitionAndDefinitionLevels();
-      if (definitionLevel >= maxDefLevel) {
-        c.vector[rowId].set(dataColumn.readBytes().getBytesUnsafe(), c.scale);
-        c.isNull[rowId] = false;
-        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
-      } else {
-        c.isNull[rowId] = true;
-        c.isRepeating = false;
-        c.noNulls = false;
-      }
-      rowId++;
-      left--;
-    }
-  }
-
-  private void readBinaries(
-    int total,
-    BytesColumnVector c,
-    int rowId) throws IOException {
-    int left = total;
-    while (left > 0) {
-      readRepetitionAndDefinitionLevels();
-      if (definitionLevel >= maxDefLevel) {
-        c.setVal(rowId, dataColumn.readBytes().getBytesUnsafe());
-        c.isNull[rowId] = false;
-        // TODO figure out a better way to set repeat for Binary type
-        c.isRepeating = false;
-      } else {
-        c.isNull[rowId] = true;
-        c.isRepeating = false;
-        c.noNulls = false;
-      }
-      rowId++;
-      left--;
-    }
-  }
-
-  /**
-   * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
-   */
-  private void decodeDictionaryIds(
-    int rowId,
-    int num,
-    ColumnVector column,
-    LongColumnVector dictionaryIds) {
-    System.arraycopy(dictionaryIds.isNull, rowId, column.isNull, rowId, num);
-    if (column.noNulls) {
-      column.noNulls = dictionaryIds.noNulls;
-    }
-    column.isRepeating = column.isRepeating && dictionaryIds.isRepeating;
-
-    switch (descriptor.getType()) {
-    case INT32:
-      for (int i = rowId; i < rowId + num; ++i) {
-        ((LongColumnVector) column).vector[i] =
-          dictionary.decodeToInt((int) dictionaryIds.vector[i]);
-      }
-      break;
-    case INT64:
-      for (int i = rowId; i < rowId + num; ++i) {
-        ((LongColumnVector) column).vector[i] =
-          dictionary.decodeToLong((int) dictionaryIds.vector[i]);
-      }
-      break;
-    case FLOAT:
-      for (int i = rowId; i < rowId + num; ++i) {
-        ((DoubleColumnVector) column).vector[i] =
-          dictionary.decodeToFloat((int) dictionaryIds.vector[i]);
-      }
-      break;
-    case DOUBLE:
-      for (int i = rowId; i < rowId + num; ++i) {
-        ((DoubleColumnVector) column).vector[i] =
-          dictionary.decodeToDouble((int) dictionaryIds.vector[i]);
-      }
-      break;
-    case INT96:
-      for (int i = rowId; i < rowId + num; ++i) {
-        ByteBuffer buf = dictionary.decodeToBinary((int) dictionaryIds.vector[i]).toByteBuffer();
-        buf.order(ByteOrder.LITTLE_ENDIAN);
-        long timeOfDayNanos = buf.getLong();
-        int julianDay = buf.getInt();
-        NanoTime nt = new NanoTime(julianDay, timeOfDayNanos);
-        Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion);
-        ((TimestampColumnVector) column).set(i, ts);
-      }
-      break;
-    case BINARY:
-    case FIXED_LEN_BYTE_ARRAY:
-      if (column instanceof BytesColumnVector) {
-        for (int i = rowId; i < rowId + num; ++i) {
-          ((BytesColumnVector) column)
-            .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe());
-        }
-      } else {
-        DecimalColumnVector decimalColumnVector = ((DecimalColumnVector) column);
-        decimalColumnVector.precision =
-          (short) type.asPrimitiveType().getDecimalMetadata().getPrecision();
-        decimalColumnVector.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale();
-        for (int i = rowId; i < rowId + num; ++i) {
-          decimalColumnVector.vector[i]
-            .set(dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe(),
-              decimalColumnVector.scale);
-        }
-      }
-      break;
-    default:
-      throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
-    }
-  }
-
-  private void readRepetitionAndDefinitionLevels() {
-    repetitionLevel = repetitionLevelColumn.nextInt();
-    definitionLevel = definitionLevelColumn.nextInt();
-    valuesRead++;
-  }
-
-  private void readPage() throws IOException {
-    DataPage page = pageReader.readPage();
-    // TODO: Why is this a visitor?
-    page.accept(new DataPage.Visitor<Void>() {
-      @Override
-      public Void visit(DataPageV1 dataPageV1) {
-        readPageV1(dataPageV1);
-        return null;
-      }
-
-      @Override
-      public Void visit(DataPageV2 dataPageV2) {
-        readPageV2(dataPageV2);
-        return null;
-      }
-    });
-  }
-
-  private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException {
-    this.pageValueCount = valueCount;
-    this.endOfPageValueCount = valuesRead + pageValueCount;
-    if (dataEncoding.usesDictionary()) {
-      this.dataColumn = null;
-      if (dictionary == null) {
-        throw new IOException(
-          "could not read page in col " + descriptor +
-            " as the dictionary was missing for encoding " + dataEncoding);
-      }
-      dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary);
-      this.isCurrentPageDictionaryEncoded = true;
-    } else {
-      if (dataEncoding != Encoding.PLAIN) {
-        throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
-      }
-      dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
-      this.isCurrentPageDictionaryEncoded = false;
-    }
-
-    try {
-      dataColumn.initFromPage(pageValueCount, bytes, offset);
-    } catch (IOException e) {
-      throw new IOException("could not read page in col " + descriptor, e);
-    }
-  }
-
-  private void readPageV1(DataPageV1 page) {
-    ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
-    ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
-    this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
-    this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
-    try {
-      byte[] bytes = page.getBytes().toByteArray();
-      LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
-      LOG.debug("reading repetition levels at 0");
-      rlReader.initFromPage(pageValueCount, bytes, 0);
-      int next = rlReader.getNextOffset();
-      LOG.debug("reading definition levels at " + next);
-      dlReader.initFromPage(pageValueCount, bytes, next);
-      next = dlReader.getNextOffset();
-      LOG.debug("reading data at " + next);
-      initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
-    } catch (IOException e) {
-      throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
-    }
-  }
-
-  private void readPageV2(DataPageV2 page) {
-    this.pageValueCount = page.getValueCount();
-    this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(),
-      page.getRepetitionLevels());
-    this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels());
-    try {
-      LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records");
-      initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
-    } catch (IOException e) {
-      throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
-    }
-  }
-
-  private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
-    try {
-      if (maxLevel == 0) {
-        return new NullIntIterator();
-      }
-      return new RLEIntIterator(
-        new RunLengthBitPackingHybridDecoder(
-          BytesUtils.getWidthFromMaxInt(maxLevel),
-          new ByteArrayInputStream(bytes.toByteArray())));
-    } catch (IOException e) {
-      throw new ParquetDecodingException("could not read levels in page for col " + descriptor, e);
-    }
-  }
-
-  /**
-   * Utility classes to abstract over different way to read ints with different encodings.
-   * TODO: remove this layer of abstraction?
-   */
-  abstract static class IntIterator {
-    abstract int nextInt();
-  }
-
-  protected static final class ValuesReaderIntIterator extends IntIterator {
-    ValuesReader delegate;
-
-    public ValuesReaderIntIterator(ValuesReader delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override
-    int nextInt() {
-      return delegate.readInteger();
-    }
-  }
-
-  protected static final class RLEIntIterator extends IntIterator {
-    RunLengthBitPackingHybridDecoder delegate;
-
-    public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override
-    int nextInt() {
-      try {
-        return delegate.readInt();
-      } catch (IOException e) {
-        throw new ParquetDecodingException(e);
-      }
-    }
-  }
-
-  protected static final class NullIntIterator extends IntIterator {
-    @Override
-    int nextInt() { return 0; }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/fe3b370e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java
deleted file mode 100644
index cc6cb20..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.parquet.vector;
-
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-
-import java.io.IOException;
-import java.util.List;
-
-public class VectorizedStructColumnReader implements VectorizedColumnReader {
-
-  private final List<VectorizedColumnReader> fieldReaders;
-
-  public VectorizedStructColumnReader(List<VectorizedColumnReader> fieldReaders) {
-    this.fieldReaders = fieldReaders;
-  }
-
-  @Override
-  public void readBatch(
-    int total,
-    ColumnVector column,
-    TypeInfo columnType) throws IOException {
-    StructColumnVector structColumnVector = (StructColumnVector) column;
-    StructTypeInfo structTypeInfo = (StructTypeInfo) columnType;
-    ColumnVector[] vectors = structColumnVector.fields;
-    for (int i = 0; i < vectors.length; i++) {
-      fieldReaders.get(i)
-        .readBatch(total, vectors[i], structTypeInfo.getAllStructFieldTypeInfos().get(i));
-      structColumnVector.isRepeating = structColumnVector.isRepeating && vectors[i].isRepeating;
-
-      for (int j = 0; j < vectors[i].isNull.length; j++) {
-        structColumnVector.isNull[j] =
-          (i == 0) ? vectors[i].isNull[j] : structColumnVector.isNull[j] && vectors[i].isNull[j];
-      }
-      structColumnVector.noNulls =
-        (i == 0) ? vectors[i].noNulls : structColumnVector.noNulls && vectors[i].noNulls;
-    }
-
-  }
-}
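
For reference, a hypothetical caller-side sketch of how the deleted struct reader was driven (not part of the commit): a StructColumnVector carries one field vector per struct member, and readBatch fans out to the composed per-field readers. The struct type string, field layout, and the structReader variable are assumptions.

    // Assumed Hive struct type with an int field and a string field.
    StructTypeInfo structType = (StructTypeInfo)
        TypeInfoUtils.getTypeInfoFromTypeString("struct<a:int,b:string>");

    // One field vector per struct member, sized to the standard batch size.
    ColumnVector[] fields = {
        new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE),   // field "a"
        new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE)   // field "b"
    };
    StructColumnVector structVector =
        new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, fields);

    // structReader is a VectorizedStructColumnReader composed of one
    // VectorizedPrimitiveColumnReader per leaf column, as built by the
    // buildVectorizedParquetReader method removed in the previous file.
    structReader.readBatch(VectorizedRowBatch.DEFAULT_SIZE, structVector, structType);
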

http://git-wip-us.apache.org/repos/asf/hive/blob/fe3b370e/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
index d4b4140..276ff19 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
@@ -1,13 +1,9 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,74 +14,416 @@
 
 package org.apache.hadoop.hive.ql.io.parquet;
 
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.Random;
 
-public class TestVectorizedColumnReader extends TestVectorizedColumnReaderBase {
-  static boolean isDictionaryEncoding = false;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.TestCase.assertFalse;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
+import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.junit.Assert.assertEquals;
 
-  @BeforeClass
-  public static void setup() throws IOException {
-    removeFile();
-    writeData(initWriterFromFile(), isDictionaryEncoding);
-  }
+public class TestVectorizedColumnReader {
+
+  private static final int nElements = 2500;
+  protected static final Configuration conf = new Configuration();
+  protected static final Path file =
+    new Path("target/test/TestParquetVectorReader/testParquetFile");
+  private static String[] uniqueStrs = new String[nElements];
+  private static boolean[] isNulls = new boolean[nElements];
+  private static Random random = new Random();
+  protected static final MessageType schema = parseMessageType(
+    "message test { "
+      + "required int32 int32_field; "
+      + "required int64 int64_field; "
+      + "required int96 int96_field; "
+      + "required double double_field; "
+      + "required float float_field; "
+      + "required boolean boolean_field; "
+      + "required fixed_len_byte_array(3) flba_field; "
+      + "optional fixed_len_byte_array(1) some_null_field; "
+      + "optional fixed_len_byte_array(1) all_null_field; "
+      + "optional binary binary_field; "
+      + "optional binary binary_field_non_repeating; "
+      + "} ");
 
   @AfterClass
   public static void cleanup() throws IOException {
-    removeFile();
+    FileSystem fs = file.getFileSystem(conf);
+    if (fs.exists(file)) {
+      fs.delete(file, true);
+    }
   }
 
-  @Test
-  public void testIntRead() throws Exception {
-    intRead(isDictionaryEncoding);
+  @BeforeClass
+  public static void prepareFile() throws IOException {
+    cleanup();
+
+    boolean dictionaryEnabled = true;
+    boolean validating = false;
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    ParquetWriter<Group> writer = new ParquetWriter<Group>(
+      file,
+      new GroupWriteSupport(),
+      GZIP, 1024*1024, 1024, 1024*1024,
+      dictionaryEnabled, validating, PARQUET_1_0, conf);
+    writeData(f, writer);
   }
 
-  @Test
-  public void testLongRead() throws Exception {
-    longRead(isDictionaryEncoding);
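+  /**
+   * Writes nElements rows covering every required column. Optional columns are
+   * only partially populated: some_null_field on odd rows, binary_field except
+   * rows where i % 13 == 1, binary_field_non_repeating only when the random
+   * string is non-empty, and all_null_field never, so the read tests can
+   * exercise null handling.
+   */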
+  protected static void writeData(SimpleGroupFactory f, ParquetWriter<Group> writer) throws IOException {
+    initialStrings(uniqueStrs);
+    for (int i = 0; i < nElements; i++) {
+      Group group = f.newGroup()
+        .append("int32_field", i)
+        .append("int64_field", (long) 2 * i)
+        .append("int96_field", Binary.fromReusedByteArray("999999999999".getBytes()))
+        .append("double_field", i * 1.0)
+        .append("float_field", ((float) (i * 2.0)))
+        .append("boolean_field", i % 5 == 0)
+        .append("flba_field", "abc");
+
+      if (i % 2 == 1) {
+        group.append("some_null_field", "x");
+      }
+
+      if (i % 13 != 1) {
+        int binaryLen = i % 10;
+        group.append("binary_field",
+          Binary.fromString(new String(new char[binaryLen]).replace("\0", "x")));
+      }
+
+      if (uniqueStrs[i] != null) {
+        group.append("binary_field_non_repeating", Binary.fromString(uniqueStrs[i]));
+      }
+      writer.write(group);
+    }
+    writer.close();
   }
 
-  @Test
-  public void testDoubleRead() throws Exception {
-    doubleRead(isDictionaryEncoding);
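+  // Returns a random lowercase string of length 0-9; an empty result marks the
+  // corresponding row as null in initialStrings().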
+  private static String getRandomStr() {
+    int len = random.nextInt(10);
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < len; i++) {
+      sb.append((char) ('a' + random.nextInt(25)));
+    }
+    return sb.toString();
+  }
+
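+  // Fills uniqueStrs with random strings and records in isNulls which rows will
+  // be written as null, so testBinaryRead can verify the isNull flags later.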
+  public static void initialStrings(String[] uniqueStrs) {
+    for (int i = 0; i < uniqueStrs.length; i++) {
+      String str = getRandomStr();
+      if (!str.isEmpty()) {
+        uniqueStrs[i] = str;
+        isNulls[i] = false;
+      } else {
+        isNulls[i] = true;
+      }
+    }
+  }
+
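+  // Builds a VectorizedParquetRecordReader over the test file: sets the Parquet
+  // read schema, enables vectorization, registers the row batch context via
+  // MapWork, and uses the first (only) input split produced for the file.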
+  private VectorizedParquetRecordReader createParquetReader(String schemaString, Configuration conf)
+    throws IOException, InterruptedException, HiveException {
+    conf.set(PARQUET_READ_SCHEMA, schemaString);
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
+
+    Job vectorJob = new Job(conf, "read vector");
+    ParquetInputFormat.setInputPaths(vectorJob, file);
+    ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class);
+    InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0);
+    initialVectorizedRowBatchCtx(conf);
+    return new VectorizedParquetRecordReader(split, new JobConf(conf));
+  }
+
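+  // Attaches a vector-mode MapWork, whose VectorizedRowBatchCtx is built from
+  // the column names and types in the conf, so the record reader can construct
+  // its row batches.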
+  private void initialVectorizedRowBatchCtx(Configuration conf) throws HiveException {
+    MapWork mapWork = new MapWork();
+    VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx();
+    rbCtx.init(createStructObjectInspector(conf), new String[0]);
+    mapWork.setVectorMode(true);
+    mapWork.setVectorizedRowBatchCtx(rbCtx);
+    Utilities.setMapWork(conf, mapWork);
+  }
+
+  private StructObjectInspector createStructObjectInspector(Configuration conf) {
+    // Create row related objects
+    String columnNames = conf.get(IOConstants.COLUMNS);
+    List<String> columnNamesList = DataWritableReadSupport.getColumnNames(columnNames);
+    String columnTypes = conf.get(IOConstants.COLUMNS_TYPES);
+    List<TypeInfo> columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
+    TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList);
+    return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);
   }
 
   @Test
-  public void testFloatRead() throws Exception {
-    floatRead(isDictionaryEncoding);
+  public void testIntRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"int32_field");
+    conf.set(IOConstants.COLUMNS_TYPES,"int");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required int32 int32_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        LongColumnVector vector = (LongColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < vector.vector.length; i++) {
+          if (c == nElements) {
+            break;
+          }
+          assertEquals(c, vector.vector[i]);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
   }
 
   @Test
-  public void testBooleanRead() throws Exception {
-    booleanRead();
+  public void testLongRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"int64_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "bigint");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required int64 int64_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        LongColumnVector vector = (LongColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < vector.vector.length; i++) {
+          if (c == nElements) {
+            break;
+          }
+          assertEquals(2 * c, vector.vector[i]);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
   }
 
   @Test
-  public void testBinaryRead() throws Exception {
-    binaryRead(isDictionaryEncoding);
+  public void testDoubleRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"double_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "double");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required double double_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < vector.vector.length; i++) {
+          if (c == nElements) {
+            break;
+          }
+          assertEquals(1.0 * c, vector.vector[i], 0);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
   }
 
   @Test
-  public void testStructRead() throws Exception {
-    structRead(isDictionaryEncoding);
+  public void testFloatRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"float_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "float");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required float float_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < vector.vector.length; i++) {
+          if (c == nElements) {
+            break;
+          }
+          assertEquals((float)2.0 * c, vector.vector[i], 0);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
   }
 
   @Test
-  public void testNestedStructRead() throws Exception {
-    nestedStructRead0(isDictionaryEncoding);
-    nestedStructRead1(isDictionaryEncoding);
+  public void testBooleanRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"boolean_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "boolean");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required boolean boolean_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        LongColumnVector vector = (LongColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < vector.vector.length; i++) {
+          if (c == nElements) {
+            break;
+          }
+          int e = (c % 5 == 0) ? 1 : 0;
+          assertEquals(e, vector.vector[i]);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
   }
 
   @Test
-  public void structReadSomeNull() throws Exception {
-    structReadSomeNull(isDictionaryEncoding);
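+  // binary_field takes only ten distinct values ("x" repeated 0-9 times), so the
+  // dictionary-enabled writer dictionary-encodes the column; rows where
+  // i % 13 == 1 were written as null.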
+  public void testBinaryReadDictionaryEncoding() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"binary_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "string");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required binary binary_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    int c = 0;
+    try {
+      while (reader.next(NullWritable.get(), previous)) {
+        BytesColumnVector vector = (BytesColumnVector) previous.cols[0];
+        boolean noNull = true;
+        for (int i = 0; i < vector.vector.length; i++) {
+          if (c == nElements) {
+            break;
+          }
+          if (c % 13 == 1) {
+            assertTrue(vector.isNull[i]);
+          } else {
+            assertFalse(vector.isNull[i]);
+            int binaryLen = c % 10;
+            String expected = new String(new char[binaryLen]).replace("\0", "x");
+            String actual = new String(ArrayUtils
+              .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i]));
+            assertEquals("Failed at " + c, expected, actual);
+            noNull = false;
+          }
+          c++;
+        }
+        assertEquals("No Null check failed at " + c, noNull, vector.noNulls);
+        assertFalse(vector.isRepeating);
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
   }
 
   @Test
-  public void decimalRead() throws Exception {
-    decimalRead(isDictionaryEncoding);
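+  // binary_field_non_repeating holds the per-row random strings from uniqueStrs;
+  // isNulls records which rows were left null (empty random string) when the
+  // file was written.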
+  public void testBinaryRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS,"binary_field_non_repeating");
+    conf.set(IOConstants.COLUMNS_TYPES, "string");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required binary binary_field_non_repeating;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    int c = 0;
+    try {
+      while (reader.next(NullWritable.get(), previous)) {
+        BytesColumnVector vector = (BytesColumnVector) previous.cols[0];
+        boolean noNull = true;
+        for (int i = 0; i < vector.vector.length; i++) {
+          if (c == nElements) {
+            break;
+          }
+          String actual;
+          assertEquals("Null assert failed at " + c, isNulls[c], vector.isNull[i]);
+          if (!vector.isNull[i]) {
+            actual = new String(ArrayUtils
+              .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i]));
+            assertEquals("failed at " + c, uniqueStrs[c], actual);
+          } else {
+            noNull = false;
+          }
+          c++;
+        }
+        assertEquals("No Null check failed at " + c, noNull, vector.noNulls);
+        assertFalse(vector.isRepeating);
+      }
+      assertEquals("It doesn't exit at expected position", nElements, c);
+    } finally {
+      reader.close();
+    }
   }
 }