You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vi...@apache.org on 2018/02/15 02:42:47 UTC

[3/3] hive git commit: HIVE-18553 : Support schema evolution in Parquet Vectorization reader (Ferdinand Xu, reviewed by Vihang Karajgaonkar)

HIVE-18553 : Support schema evolution in Parquet Vectorization reader (Ferdinand Xu, reviewed by Vihang Karajgaonkar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7ddac02b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7ddac02b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7ddac02b

Branch: refs/heads/master
Commit: 7ddac02b8e0630c38e3c7b739c7d43e130988e05
Parents: 71d6e16
Author: Vihang Karajgaonkar <vi...@cloudera.com>
Authored: Wed Feb 14 18:16:47 2018 -0800
Committer: Vihang Karajgaonkar <vi...@cloudera.com>
Committed: Wed Feb 14 18:16:47 2018 -0800

----------------------------------------------------------------------
 .../vector/BaseVectorizedColumnReader.java      |  72 +-
 .../parquet/vector/ParquetDataColumnReader.java | 170 ++++
 .../vector/ParquetDataColumnReaderFactory.java  | 908 +++++++++++++++++++
 .../vector/VectorizedDummyColumnReader.java     |  42 +
 .../vector/VectorizedListColumnReader.java      | 297 +++---
 .../vector/VectorizedParquetRecordReader.java   |  45 +-
 .../vector/VectorizedPrimitiveColumnReader.java | 271 ++++--
 .../hive/ql/io/parquet/vector/package-info.java |  22 +
 .../io/parquet/TestVectorizedColumnReader.java  |  16 +-
 ...ectorizedDictionaryEncodingColumnReader.java |  14 +
 .../parquet/VectorizedColumnReaderTestBase.java | 696 ++++++++++----
 ...ema_evol_par_vec_table_dictionary_encoding.q |  94 ++
 ...evol_par_vec_table_non_dictionary_encoding.q |  94 ++
 .../schema_evol_par_vec_table.q.out             | 357 ++++++++
 ...evol_par_vec_table_dictionary_encoding.q.out | 522 +++++++++++
 ..._par_vec_table_non_dictionary_encoding.q.out | 522 +++++++++++
 16 files changed, 3713 insertions(+), 429 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java
index 907a9b8..4a17ee4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java
@@ -1,9 +1,13 @@
 /*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,10 +18,10 @@
 
 package org.apache.hadoop.hive.ql.io.parquet.vector;
 
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.DataPage;
 import org.apache.parquet.column.page.DataPageV1;
@@ -27,6 +31,7 @@ import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
 import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.Type;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,7 +67,7 @@ public abstract class BaseVectorizedColumnReader implements VectorizedColumnRead
   /**
    * The dictionary, if this column has dictionary encoding.
    */
-  protected final Dictionary dictionary;
+  protected final ParquetDataColumnReader dictionary;
 
   /**
    * If true, the current page is dictionary encoded.
@@ -82,7 +87,7 @@ public abstract class BaseVectorizedColumnReader implements VectorizedColumnRead
    */
   protected IntIterator repetitionLevelColumn;
   protected IntIterator definitionLevelColumn;
-  protected ValuesReader dataColumn;
+  protected ParquetDataColumnReader dataColumn;
 
   /**
    * Total values in the current page.
@@ -92,22 +97,39 @@ public abstract class BaseVectorizedColumnReader implements VectorizedColumnRead
   protected final PageReader pageReader;
   protected final ColumnDescriptor descriptor;
   protected final Type type;
+  protected final TypeInfo hiveType;
+
+  /**
+   * Used for VectorizedDummyColumnReader.
+   */
+  public BaseVectorizedColumnReader(){
+    this.pageReader = null;
+    this.descriptor = null;
+    this.type = null;
+    this.dictionary = null;
+    this.hiveType = null;
+    this.maxDefLevel = -1;
+  }
 
   public BaseVectorizedColumnReader(
       ColumnDescriptor descriptor,
       PageReader pageReader,
       boolean skipTimestampConversion,
-      Type type) throws IOException {
+      Type parquetType, TypeInfo hiveType) throws IOException {
     this.descriptor = descriptor;
-    this.type = type;
+    this.type = parquetType;
     this.pageReader = pageReader;
     this.maxDefLevel = descriptor.getMaxDefinitionLevel();
     this.skipTimestampConversion = skipTimestampConversion;
+    this.hiveType = hiveType;
 
     DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
     if (dictionaryPage != null) {
       try {
-        this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
+        this.dictionary = ParquetDataColumnReaderFactory
+            .getDataColumnReaderByTypeOnDictionary(parquetType.asPrimitiveType(), hiveType,
+                dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage),
+                skipTimestampConversion);
         this.isCurrentPageDictionaryEncoded = true;
       } catch (IOException e) {
         throw new IOException("could not decode the dictionary for " + descriptor, e);
@@ -130,7 +152,7 @@ public abstract class BaseVectorizedColumnReader implements VectorizedColumnRead
     if (page == null) {
       return;
     }
-    // TODO: Why is this a visitor?
+
     page.accept(new DataPage.Visitor<Void>() {
       @Override
       public Void visit(DataPageV1 dataPageV1) {
@@ -146,7 +168,8 @@ public abstract class BaseVectorizedColumnReader implements VectorizedColumnRead
     });
   }
 
-  private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException {
+  private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount)
+      throws IOException {
     this.pageValueCount = valueCount;
     this.endOfPageValueCount = valuesRead + pageValueCount;
     if (dataEncoding.usesDictionary()) {
@@ -156,10 +179,13 @@ public abstract class BaseVectorizedColumnReader implements VectorizedColumnRead
             "could not read page in col " + descriptor +
                 " as the dictionary was missing for encoding " + dataEncoding);
       }
-      dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary);
+      dataColumn = ParquetDataColumnReaderFactory.getDataColumnReaderByType(type.asPrimitiveType(), hiveType,
+          dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary
+              .getDictionary()), skipTimestampConversion);
       this.isCurrentPageDictionaryEncoded = true;
     } else {
-      dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
+      dataColumn = ParquetDataColumnReaderFactory.getDataColumnReaderByType(type.asPrimitiveType(), hiveType,
+          dataEncoding.getValuesReader(descriptor, VALUES), skipTimestampConversion);
       this.isCurrentPageDictionaryEncoded = false;
     }
 
@@ -219,8 +245,20 @@ public abstract class BaseVectorizedColumnReader implements VectorizedColumnRead
   }
 
   /**
+   * Check that the underlying Parquet type can be converted to a Hive Decimal type.
+   *
+   * @param type the Parquet type whose decimal metadata is checked
+   */
+  protected void decimalTypeCheck(Type type) {
+    DecimalMetadata decimalMetadata = type.asPrimitiveType().getDecimalMetadata();
+    if (decimalMetadata == null) {
+      throw new UnsupportedOperationException("The underlying Parquet type cannot be able to " +
+          "converted to Hive Decimal type: " + type);
+    }
+  }
+
+  /**
    * Utility classes to abstract over different way to read ints with different encodings.
-   * TODO: remove this layer of abstraction?
    */
   abstract static class IntIterator {
     abstract int nextInt();
@@ -258,6 +296,8 @@ public abstract class BaseVectorizedColumnReader implements VectorizedColumnRead
 
   protected static final class NullIntIterator extends IntIterator {
     @Override
-    int nextInt() { return 0; }
+    int nextInt() {
+      return 0;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java
new file mode 100644
index 0000000..6bfa95a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import org.apache.parquet.column.Dictionary;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+
+/**
+ * The interface wrapping the underlying Parquet dictionary and non-dictionary encoded page readers.
+ */
+public interface ParquetDataColumnReader {
+
+  /**
+   * Initialize the reader by page data.
+   * @param valueCount value count
+   * @param page page data
+   * @param offset current offset
+   * @throws IOException
+   */
+  void initFromPage(int valueCount, byte[] page, int offset) throws IOException;
+
+  /**
+   * @return the next Dictionary ID from the page
+   */
+  int readValueDictionaryId();
+
+  /**
+   * @return the next Long from the page
+   */
+  long readLong();
+
+  /**
+   * @return the next Integer from the page
+   */
+  int readInteger();
+
+  /**
+   * @return the next Float from the page
+   */
+  float readFloat();
+
+  /**
+   * @return the next Boolean from the page
+   */
+  boolean readBoolean();
+
+  /**
+   * @return the next String from the page
+   */
+  byte[] readString();
+
+  /**
+   * @return the next Varchar from the page
+   */
+  byte[] readVarchar();
+
+  /**
+   * @return the next Char from the page
+   */
+  byte[] readChar();
+
+  /**
+   * @return the next Bytes from the page
+   */
+  byte[] readBytes();
+
+  /**
+   * @return the next Decimal from the page
+   */
+  byte[] readDecimal();
+
+  /**
+   * @return the next Double from the page
+   */
+  double readDouble();
+
+  /**
+   * @return the next Timestamp from the page
+   */
+  Timestamp readTimestamp();
+
+  /**
+   * @return the underlying dictionary if current reader is dictionary encoded
+   */
+  Dictionary getDictionary();
+
+  /**
+   * @param id in dictionary
+   * @return the Bytes from the dictionary by id
+   */
+  byte[] readBytes(int id);
+
+  /**
+   * @param id in dictionary
+   * @return the Float from the dictionary by id
+   */
+  float readFloat(int id);
+
+  /**
+   * @param id in dictionary
+   * @return the Double from the dictionary by id
+   */
+  double readDouble(int id);
+
+  /**
+   * @param id in dictionary
+   * @return the Integer from the dictionary by id
+   */
+  int readInteger(int id);
+
+  /**
+   * @param id in dictionary
+   * @return the Long from the dictionary by id
+   */
+  long readLong(int id);
+
+  /**
+   * @param id in dictionary
+   * @return the Boolean from the dictionary by id
+   */
+  boolean readBoolean(int id);
+
+  /**
+   * @param id in dictionary
+   * @return the Decimal from the dictionary by id
+   */
+  byte[] readDecimal(int id);
+
+  /**
+   * @param id in dictionary
+   * @return the Timestamp from the dictionary by id
+   */
+  Timestamp readTimestamp(int id);
+
+  /**
+   * @param id in dictionary
+   * @return the String from the dictionary by id
+   */
+  byte[] readString(int id);
+
+  /**
+   * @param id in dictionary
+   * @return the Varchar from the dictionary by id
+   */
+  byte[] readVarchar(int id);
+
+  /**
+   * @param id in dictionary
+   * @return the Char from the dictionary by id
+   */
+  byte[] readChar(int id);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
new file mode 100644
index 0000000..898a2c6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import org.apache.hadoop.hive.common.type.HiveBaseChar;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.sql.Timestamp;
+import java.util.Arrays;
+
+/**
+ * Parquet file has self-describing schema which may differ from the user required schema (e.g.
+ * schema evolution). This factory is used to retrieve user required typed data via corresponding
+ * reader which reads the underlying data.
+ */
+public final class ParquetDataColumnReaderFactory {
+
+  private ParquetDataColumnReaderFactory() {
+  }
+
+  /**
+   * The default data column reader for the existing Parquet page readers, which works for both
+   * dictionary and non-dictionary types. It mirrors the dictionary encoding path.
+   */
+  public static class DefaultParquetDataColumnReader implements ParquetDataColumnReader {
+    protected ValuesReader valuesReader;
+    protected Dictionary dict;
+
+    // Varchar or char length
+    protected int length = -1;
+
+    public DefaultParquetDataColumnReader(ValuesReader valuesReader, int length) {
+      this.valuesReader = valuesReader;
+      this.length = length;
+    }
+
+    public DefaultParquetDataColumnReader(Dictionary dict, int length) {
+      this.dict = dict;
+      this.length = length;
+    }
+
+    public void initFromPage(int i, ByteBuffer byteBuffer, int i1) throws IOException {
+      valuesReader.initFromPage(i, byteBuffer, i1);
+    }
+
+    @Override
+    public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
+      this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
+    }
+
+    @Override
+    public boolean readBoolean() {
+      return valuesReader.readBoolean();
+    }
+
+    @Override
+    public boolean readBoolean(int id) {
+      return dict.decodeToBoolean(id);
+    }
+
+    @Override
+    public byte[] readString(int id) {
+      return dict.decodeToBinary(id).getBytesUnsafe();
+    }
+
+    @Override
+    public byte[] readString() {
+      return valuesReader.readBytes().getBytesUnsafe();
+    }
+
+    @Override
+    public byte[] readVarchar() {
+      // the size should be enforced even when the types match (not done here)
+      return valuesReader.readBytes().getBytesUnsafe();
+    }
+
+    @Override
+    public byte[] readVarchar(int id) {
+      return dict.decodeToBinary(id).getBytesUnsafe();
+    }
+
+    @Override
+    public byte[] readChar() {
+      return valuesReader.readBytes().getBytesUnsafe();
+    }
+
+    @Override
+    public byte[] readChar(int id) {
+      return dict.decodeToBinary(id).getBytesUnsafe();
+    }
+
+    @Override
+    public byte[] readBytes() {
+      return valuesReader.readBytes().getBytesUnsafe();
+    }
+
+    @Override
+    public byte[] readBytes(int id) {
+      return dict.decodeToBinary(id).getBytesUnsafe();
+    }
+
+    @Override
+    public byte[] readDecimal() {
+      return valuesReader.readBytes().getBytesUnsafe();
+    }
+
+    @Override
+    public byte[] readDecimal(int id) {
+      return dict.decodeToBinary(id).getBytesUnsafe();
+    }
+
+    @Override
+    public float readFloat() {
+      return valuesReader.readFloat();
+    }
+
+    @Override
+    public float readFloat(int id) {
+      return dict.decodeToFloat(id);
+    }
+
+    @Override
+    public double readDouble() {
+      return valuesReader.readDouble();
+    }
+
+    @Override
+    public double readDouble(int id) {
+      return dict.decodeToDouble(id);
+    }
+
+    @Override
+    public Timestamp readTimestamp() {
+      throw new RuntimeException("Unsupported operation");
+    }
+
+    @Override
+    public Timestamp readTimestamp(int id) {
+      throw new RuntimeException("Unsupported operation");
+    }
+
+    @Override
+    public int readInteger() {
+      return valuesReader.readInteger();
+    }
+
+    @Override
+    public int readInteger(int id) {
+      return dict.decodeToInt(id);
+    }
+
+    @Override
+    public long readLong(int id) {
+      return dict.decodeToLong(id);
+    }
+
+    @Override
+    public long readLong() {
+      return valuesReader.readLong();
+    }
+
+    @Override
+    public int readValueDictionaryId() {
+      return valuesReader.readValueDictionaryId();
+    }
+
+    public void skip() {
+      valuesReader.skip();
+    }
+
+    @Override
+    public Dictionary getDictionary() {
+      return dict;
+    }
+
+    /**
+     * Enforce the max length of varchar or char.
+     */
+    protected String enforceMaxLength(String value) {
+      return HiveBaseChar.enforceMaxLength(value, length);
+    }
+
+    /**
+     * Enforce the char length.
+     */
+    protected String getPaddedString(String value) {
+      return HiveBaseChar.getPaddedValue(value, length);
+    }
+
+    /**
+     * Method to convert string to UTF-8 bytes.
+     */
+    protected static byte[] convertToBytes(String value) {
+      try {
+        // encode the string as UTF-8 bytes
+        return value.getBytes("UTF-8");
+      } catch (UnsupportedEncodingException e) {
+        throw new RuntimeException("Failed to encode string in UTF-8", e);
+      }
+    }
+  }
+
+  /**
+   * The reader that reads from the underlying int32 value. The implementation is consistent with
+   * ETypeConverter EINT32_CONVERTER.
+   */
+  public static class TypesFromInt32PageReader extends DefaultParquetDataColumnReader {
+
+    public TypesFromInt32PageReader(ValuesReader realReader, int length) {
+      super(realReader, length);
+    }
+
+    public TypesFromInt32PageReader(Dictionary dict, int length) {
+      super(dict, length);
+    }
+
+    @Override
+    public long readLong() {
+      return valuesReader.readInteger();
+    }
+
+    @Override
+    public long readLong(int id) {
+      return dict.decodeToInt(id);
+    }
+
+    @Override
+    public float readFloat() {
+      return valuesReader.readInteger();
+    }
+
+    @Override
+    public float readFloat(int id) {
+      return dict.decodeToInt(id);
+    }
+
+    @Override
+    public double readDouble() {
+      return valuesReader.readInteger();
+    }
+
+    @Override
+    public double readDouble(int id) {
+      return dict.decodeToInt(id);
+    }
+
+    @Override
+    public byte[] readString() {
+      return convertToBytes(valuesReader.readInteger());
+    }
+
+    @Override
+    public byte[] readString(int id) {
+      return convertToBytes(dict.decodeToInt(id));
+    }
+
+    @Override
+    public byte[] readVarchar() {
+      String value = enforceMaxLength(
+          convertToString(valuesReader.readInteger()));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readVarchar(int id) {
+      String value = enforceMaxLength(
+          convertToString(dict.decodeToInt(id)));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readChar() {
+      String value = enforceMaxLength(
+          convertToString(valuesReader.readInteger()));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readChar(int id) {
+      String value = enforceMaxLength(
+          convertToString(dict.decodeToInt(id)));
+      return convertToBytes(value);
+    }
+
+    private static String convertToString(int value) {
+      return Integer.toString(value);
+    }
+
+    private static byte[] convertToBytes(int value) {
+      return convertToBytes(convertToString(value));
+    }
+  }
+
+  /**
+   * The reader that reads from the underlying int64 value. The implementation is consistent with
+   * ETypeConverter EINT64_CONVERTER.
+   */
+  public static class TypesFromInt64PageReader extends DefaultParquetDataColumnReader {
+
+    public TypesFromInt64PageReader(ValuesReader realReader, int length) {
+      super(realReader, length);
+    }
+
+    public TypesFromInt64PageReader(Dictionary dict, int length) {
+      super(dict, length);
+    }
+
+    @Override
+    public float readFloat() {
+      return valuesReader.readLong();
+    }
+
+    @Override
+    public float readFloat(int id) {
+      return dict.decodeToLong(id);
+    }
+
+    @Override
+    public double readDouble() {
+      return valuesReader.readLong();
+    }
+
+    @Override
+    public double readDouble(int id) {
+      return dict.decodeToLong(id);
+    }
+
+    @Override
+    public byte[] readString() {
+      return convertToBytes(valuesReader.readLong());
+    }
+
+    @Override
+    public byte[] readString(int id) {
+      return convertToBytes(dict.decodeToLong(id));
+    }
+
+    @Override
+    public byte[] readVarchar() {
+      String value = enforceMaxLength(
+          convertToString(valuesReader.readLong()));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readVarchar(int id) {
+      String value = enforceMaxLength(
+          convertToString(dict.decodeToLong(id)));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readChar() {
+      String value = enforceMaxLength(
+          convertToString(valuesReader.readLong()));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readChar(int id) {
+      String value = enforceMaxLength(
+          convertToString(dict.decodeToLong(id)));
+      return convertToBytes(value);
+    }
+
+    private static String convertToString(long value) {
+      return Long.toString(value);
+    }
+
+    private static byte[] convertToBytes(long value) {
+      return convertToBytes(convertToString(value));
+    }
+  }
+
+  /**
+   * The reader that reads from the underlying float value. The implementation is consistent with
+   * ETypeConverter EFLOAT_CONVERTER.
+   */
+  public static class TypesFromFloatPageReader extends DefaultParquetDataColumnReader {
+
+    public TypesFromFloatPageReader(ValuesReader realReader, int length) {
+      super(realReader, length);
+    }
+
+    public TypesFromFloatPageReader(Dictionary realReader, int length) {
+      super(realReader, length);
+    }
+
+    @Override
+    public double readDouble() {
+      return valuesReader.readFloat();
+    }
+
+    @Override
+    public double readDouble(int id) {
+      return dict.decodeToFloat(id);
+    }
+
+    @Override
+    public byte[] readString() {
+      return convertToBytes(valuesReader.readFloat());
+    }
+
+    @Override
+    public byte[] readString(int id) {
+      return convertToBytes(dict.decodeToFloat(id));
+    }
+
+    @Override
+    public byte[] readVarchar() {
+      String value = enforceMaxLength(
+          convertToString(valuesReader.readFloat()));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readVarchar(int id) {
+      String value = enforceMaxLength(
+          convertToString(dict.decodeToFloat(id)));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readChar() {
+      String value = enforceMaxLength(
+          convertToString(valuesReader.readFloat()));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readChar(int id) {
+      String value = enforceMaxLength(
+          convertToString(dict.decodeToFloat(id)));
+      return convertToBytes(value);
+    }
+
+    private static String convertToString(float value) {
+      return Float.toString(value);
+    }
+
+    private static byte[] convertToBytes(float value) {
+      return convertToBytes(convertToString(value));
+    }
+  }
+
+  /**
+   * The reader that reads from the underlying double value.
+   */
+  public static class TypesFromDoublePageReader extends DefaultParquetDataColumnReader {
+
+    public TypesFromDoublePageReader(ValuesReader realReader, int length) {
+      super(realReader, length);
+    }
+
+    public TypesFromDoublePageReader(Dictionary dict, int length) {
+      super(dict, length);
+    }
+
+    @Override
+    public byte[] readString() {
+      return convertToBytes(valuesReader.readDouble());
+    }
+
+    @Override
+    public byte[] readString(int id) {
+      return convertToBytes(dict.decodeToDouble(id));
+    }
+
+    @Override
+    public byte[] readVarchar() {
+      String value = enforceMaxLength(
+          convertToString(valuesReader.readDouble()));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readVarchar(int id) {
+      String value = enforceMaxLength(
+          convertToString(dict.decodeToDouble(id)));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readChar() {
+      String value = enforceMaxLength(
+          convertToString(valuesReader.readDouble()));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readChar(int id) {
+      String value = enforceMaxLength(
+          convertToString(dict.decodeToDouble(id)));
+      return convertToBytes(value);
+    }
+
+    private static String convertToString(double value) {
+      return Double.toString(value);
+    }
+
+    private static byte[] convertToBytes(double value) {
+      return convertToBytes(convertToString(value));
+    }
+  }
+
+  /**
+   * The reader that reads from the underlying boolean value.
+   */
+  public static class TypesFromBooleanPageReader extends DefaultParquetDataColumnReader {
+
+    public TypesFromBooleanPageReader(ValuesReader valuesReader, int length) {
+      super(valuesReader, length);
+    }
+
+    public TypesFromBooleanPageReader(Dictionary dict, int length) {
+      super(dict, length);
+    }
+
+    @Override
+    public byte[] readString() {
+      return convertToBytes(valuesReader.readBoolean());
+    }
+
+    @Override
+    public byte[] readString(int id) {
+      return convertToBytes(dict.decodeToBoolean(id));
+    }
+
+    @Override
+    public byte[] readVarchar() {
+      String value = enforceMaxLength(
+          convertToString(valuesReader.readBoolean()));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readVarchar(int id) {
+      String value = enforceMaxLength(
+          convertToString(dict.decodeToBoolean(id)));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readChar() {
+      String value = enforceMaxLength(
+          convertToString(valuesReader.readBoolean()));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readChar(int id) {
+      String value = enforceMaxLength(
+          convertToString(dict.decodeToBoolean(id)));
+      return convertToBytes(value);
+    }
+
+    private static String convertToString(boolean value) {
+      return Boolean.toString(value);
+    }
+
+    private static byte[] convertToBytes(boolean value) {
+      return convertToBytes(convertToString(value));
+    }
+  }
+
+  /**
+   * The reader that reads from the underlying Timestamp value.
+   */
+  public static class TypesFromInt96PageReader extends DefaultParquetDataColumnReader {
+    private boolean skipTimestampConversion = false;
+
+    public TypesFromInt96PageReader(ValuesReader realReader, int length,
+                                    boolean skipTimestampConversion) {
+      super(realReader, length);
+      this.skipTimestampConversion = skipTimestampConversion;
+    }
+
+    public TypesFromInt96PageReader(Dictionary dict, int length, boolean skipTimestampConversion) {
+      super(dict, length);
+      this.skipTimestampConversion = skipTimestampConversion;
+    }
+
+    private Timestamp convert(Binary binary) {
+      ByteBuffer buf = binary.toByteBuffer();
+      buf.order(ByteOrder.LITTLE_ENDIAN);
+      long timeOfDayNanos = buf.getLong();
+      int julianDay = buf.getInt();
+      NanoTime nt = new NanoTime(julianDay, timeOfDayNanos);
+      return NanoTimeUtils.getTimestamp(nt, skipTimestampConversion);
+    }
+
+    @Override
+    public Timestamp readTimestamp(int id) {
+      return convert(dict.decodeToBinary(id));
+    }
+
+    @Override
+    public Timestamp readTimestamp() {
+      return convert(valuesReader.readBytes());
+    }
+
+    @Override
+    public byte[] readString() {
+      return convertToBytes(readTimestamp());
+    }
+
+    @Override
+    public byte[] readString(int id) {
+      return convertToBytes(readTimestamp(id));
+    }
+
+    @Override
+    public byte[] readVarchar() {
+      String value = enforceMaxLength(
+          convertToString(readTimestamp()));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readVarchar(int id) {
+      String value = enforceMaxLength(
+          convertToString(readTimestamp(id)));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readChar() {
+      String value = enforceMaxLength(
+          convertToString(readTimestamp()));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readChar(int id) {
+      String value = enforceMaxLength(
+          convertToString(readTimestamp(id)));
+      return convertToBytes(value);
+    }
+
+    private static String convertToString(Timestamp value) {
+      return value.toString();
+    }
+
+    private static byte[] convertToBytes(Timestamp value) {
+      return convertToBytes(convertToString(value));
+    }
+  }
+
+  /**
+   * The reader that reads from the underlying decimal value.
+   */
+  public static class TypesFromDecimalPageReader extends DefaultParquetDataColumnReader {
+    private HiveDecimalWritable tempDecimal = new HiveDecimalWritable();
+    private short scale;
+
+    public TypesFromDecimalPageReader(ValuesReader realReader, int length, short scale) {
+      super(realReader, length);
+      this.scale = scale;
+    }
+
+    public TypesFromDecimalPageReader(Dictionary dict, int length, short scale) {
+      super(dict, length);
+      this.scale = scale;
+    }
+
+    @Override
+    public byte[] readString() {
+      return convertToBytes(valuesReader.readBytes());
+    }
+
+    @Override
+    public byte[] readString(int id) {
+      return convertToBytes(dict.decodeToBinary(id));
+    }
+
+    @Override
+    public byte[] readVarchar() {
+      String value = enforceMaxLength(
+          convertToString(valuesReader.readBytes()));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readVarchar(int id) {
+      String value = enforceMaxLength(
+          convertToString(dict.decodeToBinary(id)));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readChar() {
+      String value = enforceMaxLength(
+          convertToString(valuesReader.readBytes()));
+      return convertToBytes(value);
+    }
+
+    @Override
+    public byte[] readChar(int id) {
+      String value = enforceMaxLength(
+          convertToString(dict.decodeToBinary(id)));
+      return convertToBytes(value);
+    }
+
+    private String convertToString(Binary value) {
+      tempDecimal.set(value.getBytesUnsafe(), scale);
+      return tempDecimal.toString();
+    }
+
+    private byte[] convertToBytes(Binary value) {
+      return convertToBytes(convertToString(value));
+    }
+  }
+
+  /**
+   * The reader who reads from the underlying UTF8 string.
+   */
+  public static class TypesFromStringPageReader extends DefaultParquetDataColumnReader {
+
+    public TypesFromStringPageReader(ValuesReader realReader, int length) {
+      super(realReader, length);
+    }
+
+    public TypesFromStringPageReader(Dictionary dict, int length) {
+      super(dict, length);
+    }
+
+    @Override
+    public byte[] readVarchar() {
+      // truncate the value when it has more characters than the max length
+      final byte[] value = valuesReader.readBytes().getBytesUnsafe();
+      return truncateIfNecesssary(value);
+    }
+
+    @Override
+    public byte[] readVarchar(int id) {
+      // truncate the value when it has more characters than the max length
+      final byte[] value = dict.decodeToBinary(id).getBytesUnsafe();
+      return truncateIfNecesssary(value);
+    }
+
+    @Override
+    public byte[] readChar() {
+      // truncate the value when it has more characters than the max length
+      final byte[] value = valuesReader.readBytes().getBytesUnsafe();
+      return truncateIfNecesssary(value);
+    }
+
+    @Override
+    public byte[] readChar(int id) {
+      // truncate the value when it has more characters than the max length
+      final byte[] value = dict.decodeToBinary(id).getBytesUnsafe();
+      return truncateIfNecesssary(value);
+    }
+
+    private byte[] truncateIfNecesssary(byte[] bytes) {
+      if (length <= 0 || bytes == null) {
+        return bytes;
+      }
+
+      int len = bytes.length;
+      int truncatedLength = StringExpr.truncate(bytes, 0, len, length);
+      if (truncatedLength >= len) {
+        return bytes;
+      }
+
+      return Arrays.copyOf(bytes, truncatedLength);
+    }
+  }
+
+  private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(boolean isDictionary,
+                                                                         PrimitiveType parquetType,
+                                                                         TypeInfo hiveType,
+                                                                         Dictionary dictionary,
+                                                                         ValuesReader valuesReader,
+                                                                         boolean
+                                                                             skipTimestampConversion)
+      throws IOException {
+    // max length for varchar and char cases
+    int length = getVarcharLength(hiveType);
+
+    switch (parquetType.getPrimitiveTypeName()) {
+    case INT32:
+      return isDictionary ? new TypesFromInt32PageReader(dictionary, length) : new
+          TypesFromInt32PageReader(valuesReader, length);
+    case INT64:
+      return isDictionary ? new TypesFromInt64PageReader(dictionary, length) : new
+          TypesFromInt64PageReader(valuesReader, length);
+    case FLOAT:
+      return isDictionary ? new TypesFromFloatPageReader(dictionary, length) : new
+          TypesFromFloatPageReader(valuesReader, length);
+    case INT96:
+      return isDictionary ? new TypesFromInt96PageReader(dictionary, length,
+          skipTimestampConversion) : new
+          TypesFromInt96PageReader(valuesReader, length, skipTimestampConversion);
+    case BOOLEAN:
+      return isDictionary ? new TypesFromBooleanPageReader(dictionary, length) : new
+          TypesFromBooleanPageReader(valuesReader, length);
+    case BINARY:
+    case FIXED_LEN_BYTE_ARRAY:
+      return getConvertorFromBinary(isDictionary, parquetType, hiveType, valuesReader, dictionary);
+    case DOUBLE:
+      return isDictionary ? new TypesFromDoublePageReader(dictionary, length) : new
+          TypesFromDoublePageReader(valuesReader, length);
+    default:
+      return isDictionary ? new DefaultParquetDataColumnReader(dictionary, length) : new
+          DefaultParquetDataColumnReader(valuesReader, length);
+    }
+  }
+
+  private static ParquetDataColumnReader getConvertorFromBinary(boolean isDict,
+                                                                PrimitiveType parquetType,
+                                                                TypeInfo hiveType,
+                                                                ValuesReader valuesReader,
+                                                                Dictionary dictionary) {
+    OriginalType originalType = parquetType.getOriginalType();
+
+    // max length for varchar and char cases
+    int length = getVarcharLength(hiveType);
+
+    if (originalType == null) {
+      return isDict ? new DefaultParquetDataColumnReader(dictionary, length) : new
+          DefaultParquetDataColumnReader(valuesReader, length);
+    }
+    switch (originalType) {
+    case DECIMAL:
+      final short scale = (short) parquetType.asPrimitiveType().getDecimalMetadata().getScale();
+      return isDict ? new TypesFromDecimalPageReader(dictionary, length, scale) : new
+          TypesFromDecimalPageReader(valuesReader, length, scale);
+    case UTF8:
+      return isDict ? new TypesFromStringPageReader(dictionary, length) : new
+          TypesFromStringPageReader(valuesReader, length);
+    default:
+      return isDict ? new DefaultParquetDataColumnReader(dictionary, length) : new
+          DefaultParquetDataColumnReader(valuesReader, length);
+    }
+  }
+
+  public static ParquetDataColumnReader getDataColumnReaderByTypeOnDictionary(
+      PrimitiveType parquetType,
+      TypeInfo hiveType,
+      Dictionary realReader, boolean skipTimestampConversion)
+      throws IOException {
+    return getDataColumnReaderByTypeHelper(true, parquetType, hiveType, realReader, null,
+        skipTimestampConversion);
+  }
+
+  public static ParquetDataColumnReader getDataColumnReaderByType(PrimitiveType parquetType,
+                                                                  TypeInfo hiveType,
+                                                                  ValuesReader realReader,
+                                                                  boolean skipTimestampConversion)
+      throws IOException {
+    return getDataColumnReaderByTypeHelper(false, parquetType, hiveType, null, realReader,
+        skipTimestampConversion);
+  }
+
+
+  // For varchar or char types, return the max length of the type; otherwise return -1
+  private static int getVarcharLength(TypeInfo hiveType) {
+    int length = -1;
+    if (hiveType instanceof PrimitiveTypeInfo) {
+      PrimitiveTypeInfo hivePrimitiveType = (PrimitiveTypeInfo) hiveType;
+      switch (hivePrimitiveType.getPrimitiveCategory()) {
+      case CHAR:
+        length = ((CharTypeInfo) hivePrimitiveType).getLength();
+        break;
+      case VARCHAR:
+        length = ((VarcharTypeInfo) hivePrimitiveType).getLength();
+        break;
+      default:
+        break;
+      }
+    }
+
+    return length;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java
new file mode 100644
index 0000000..ee1d692
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedDummyColumnReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * A dummy vectorized parquet reader to support schema evolution.
+ */
+public class VectorizedDummyColumnReader extends BaseVectorizedColumnReader {
+
+  public VectorizedDummyColumnReader() {
+    super();
+  }
+
+  @Override
+  public void readBatch(int total, ColumnVector column, TypeInfo columnType) throws IOException {
+    Arrays.fill(column.isNull, true);
+    column.isRepeating = true;
+    column.noNulls = false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java
index c36640d..cd2c0ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedListColumnReader.java
@@ -33,6 +33,7 @@ import java.util.List;
 
 /**
  * It's column level Parquet reader which is used to read a batch of records for a list column.
+ * TODO Currently the List type only supports the non-nested case.
  */
 public class VectorizedListColumnReader extends BaseVectorizedColumnReader {
 
@@ -46,8 +47,9 @@ public class VectorizedListColumnReader extends BaseVectorizedColumnReader {
   boolean isFirstRow = true;
 
   public VectorizedListColumnReader(ColumnDescriptor descriptor, PageReader pageReader,
-    boolean skipTimestampConversion, Type type) throws IOException {
-    super(descriptor, pageReader, skipTimestampConversion, type);
+                                    boolean skipTimestampConversion, Type type, TypeInfo hiveType)
+      throws IOException {
+    super(descriptor, pageReader, skipTimestampConversion, type, hiveType);
   }
 
   @Override
@@ -81,7 +83,7 @@ public class VectorizedListColumnReader extends BaseVectorizedColumnReader {
 
     // Decode the value if necessary
     if (isCurrentPageDictionaryEncoded) {
-      valueList = decodeDictionaryIds(valueList);
+      valueList = decodeDictionaryIds(category, valueList);
     }
     // Convert valueList to array for the ListColumnVector.child
     convertValueListToListColumnVector(category, lcv, valueList, index);
@@ -142,75 +144,112 @@ public class VectorizedListColumnReader extends BaseVectorizedColumnReader {
     lcv.lengths[index] = elements.size() - lcv.offsets[index];
   }
 
+  // Needs to stay consistent with VectorizedPrimitiveColumnReader#readBatchHelper
+  // TODO Reduce the duplicated code
   private Object readPrimitiveTypedRow(PrimitiveObjectInspector.PrimitiveCategory category) {
     switch (category) {
-      case INT:
-      case BYTE:
-      case SHORT:
-        return dataColumn.readInteger();
-      case DATE:
-      case INTERVAL_YEAR_MONTH:
-      case LONG:
-        return dataColumn.readLong();
-      case BOOLEAN:
-        return dataColumn.readBoolean() ? 1 : 0;
-      case DOUBLE:
-        return dataColumn.readDouble();
-      case BINARY:
-      case STRING:
-      case CHAR:
-      case VARCHAR:
-        return dataColumn.readBytes().getBytesUnsafe();
-      case FLOAT:
-        return dataColumn.readFloat();
-      case DECIMAL:
-        return dataColumn.readBytes().getBytesUnsafe();
-      case INTERVAL_DAY_TIME:
-      case TIMESTAMP:
-      default:
-        throw new RuntimeException("Unsupported type in the list: " + type);
+    case INT:
+    case BYTE:
+    case SHORT:
+      return dataColumn.readInteger();
+    case DATE:
+    case INTERVAL_YEAR_MONTH:
+    case LONG:
+      return dataColumn.readLong();
+    case BOOLEAN:
+      return dataColumn.readBoolean() ? 1 : 0;
+    case DOUBLE:
+      return dataColumn.readDouble();
+    case BINARY:
+      return dataColumn.readBytes();
+    case STRING:
+    case CHAR:
+    case VARCHAR:
+      return dataColumn.readString();
+    case FLOAT:
+      return dataColumn.readFloat();
+    case DECIMAL:
+      return dataColumn.readDecimal();
+    case TIMESTAMP:
+      return dataColumn.readTimestamp();
+    case INTERVAL_DAY_TIME:
+    default:
+      throw new RuntimeException("Unsupported type in the list: " + type);
     }
   }
 
-  private List decodeDictionaryIds(List valueList) {
+  private List decodeDictionaryIds(PrimitiveObjectInspector.PrimitiveCategory category, List
+      valueList) {
     int total = valueList.size();
     List resultList;
     List<Integer> intList = (List<Integer>) valueList;
-    switch (descriptor.getType()) {
-      case INT32:
-        resultList = new ArrayList<Integer>(total);
-        for (int i = 0; i < total; ++i) {
-          resultList.add(dictionary.decodeToInt(intList.get(i)));
-        }
-        break;
-      case INT64:
-        resultList = new ArrayList<Long>(total);
-        for (int i = 0; i < total; ++i) {
-          resultList.add(dictionary.decodeToLong(intList.get(i)));
-        }
-        break;
-      case FLOAT:
-        resultList = new ArrayList<Float>(total);
-        for (int i = 0; i < total; ++i) {
-          resultList.add(dictionary.decodeToFloat(intList.get(i)));
-        }
-        break;
-      case DOUBLE:
-        resultList = new ArrayList<Double>(total);
-        for (int i = 0; i < total; ++i) {
-          resultList.add(dictionary.decodeToDouble(intList.get(i)));
-        }
-        break;
-      case BINARY:
-      case FIXED_LEN_BYTE_ARRAY:
-        resultList = new ArrayList<byte[]>(total);
-        for (int i = 0; i < total; ++i) {
-          resultList.add(dictionary.decodeToBinary(intList.get(i)).getBytesUnsafe());
-        }
-        break;
-      default:
-        throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
+
+    switch (category) {
+    case INT:
+    case BYTE:
+    case SHORT:
+      resultList = new ArrayList<Integer>(total);
+      for (int i = 0; i < total; ++i) {
+        resultList.add(dictionary.readInteger(intList.get(i)));
+      }
+      break;
+    case DATE:
+    case INTERVAL_YEAR_MONTH:
+    case LONG:
+      resultList = new ArrayList<Long>(total);
+      for (int i = 0; i < total; ++i) {
+        resultList.add(dictionary.readLong(intList.get(i)));
+      }
+      break;
+    case BOOLEAN:
+      resultList = new ArrayList<Long>(total);
+      for (int i = 0; i < total; ++i) {
+        resultList.add(dictionary.readBoolean(intList.get(i)) ? 1 : 0);
+      }
+      break;
+    case DOUBLE:
+      resultList = new ArrayList<Long>(total);
+      for (int i = 0; i < total; ++i) {
+        resultList.add(dictionary.readDouble(intList.get(i)));
+      }
+      break;
+    case BINARY:
+      resultList = new ArrayList<Long>(total);
+      for (int i = 0; i < total; ++i) {
+        resultList.add(dictionary.readBytes(intList.get(i)));
+      }
+      break;
+    case STRING:
+    case CHAR:
+    case VARCHAR:
+      resultList = new ArrayList<Long>(total);
+      for (int i = 0; i < total; ++i) {
+        resultList.add(dictionary.readString(intList.get(i)));
+      }
+      break;
+    case FLOAT:
+      resultList = new ArrayList<Float>(total);
+      for (int i = 0; i < total; ++i) {
+        resultList.add(dictionary.readFloat(intList.get(i)));
+      }
+      break;
+    case DECIMAL:
+      resultList = new ArrayList<Long>(total);
+      for (int i = 0; i < total; ++i) {
+        resultList.add(dictionary.readDecimal(intList.get(i)));
+      }
+      break;
+    case TIMESTAMP:
+      resultList = new ArrayList<Long>(total);
+      for (int i = 0; i < total; ++i) {
+        resultList.add(dictionary.readTimestamp(intList.get(i)));
+      }
+      break;
+    case INTERVAL_DAY_TIME:
+    default:
+      throw new RuntimeException("Unsupported type in the list: " + type);
     }
+
     return resultList;
   }
 
@@ -228,71 +267,79 @@ public class VectorizedListColumnReader extends BaseVectorizedColumnReader {
     lcv.offsets = lcvOffset;
   }
 
-  private void fillColumnVector(PrimitiveObjectInspector.PrimitiveCategory category, ListColumnVector lcv,
-      List valueList, int elementNum) {
+  private void fillColumnVector(PrimitiveObjectInspector.PrimitiveCategory category,
+                                ListColumnVector lcv,
+                                List valueList, int elementNum) {
     int total = valueList.size();
     setChildrenInfo(lcv, total, elementNum);
     switch (category) {
-      case INT:
-      case BYTE:
-      case SHORT:
-      case BOOLEAN:
-        lcv.child = new LongColumnVector(total);
-        for (int i = 0; i < valueList.size(); i++) {
-          ((LongColumnVector)lcv.child).vector[i] = ((List<Integer>)valueList).get(i);
-        }
-        break;
-      case DATE:
-      case INTERVAL_YEAR_MONTH:
-      case LONG:
-        lcv.child = new LongColumnVector(total);
-        for (int i = 0; i < valueList.size(); i++) {
-          ((LongColumnVector)lcv.child).vector[i] = ((List<Long>)valueList).get(i);
-        }
-        break;
-      case DOUBLE:
-        lcv.child = new DoubleColumnVector(total);
-        for (int i = 0; i < valueList.size(); i++) {
-          ((DoubleColumnVector)lcv.child).vector[i] = ((List<Double>)valueList).get(i);
-        }
-        break;
-      case BINARY:
-      case STRING:
-      case CHAR:
-      case VARCHAR:
-        lcv.child = new BytesColumnVector(total);
-        lcv.child.init();
-        for (int i = 0; i < valueList.size(); i++) {
-          byte[] src = ((List<byte[]>)valueList).get(i);
-          ((BytesColumnVector)lcv.child).setRef(i, src, 0, src.length);
-        }
-        break;
-      case FLOAT:
-        lcv.child = new DoubleColumnVector(total);
-        for (int i = 0; i < valueList.size(); i++) {
-          ((DoubleColumnVector)lcv.child).vector[i] = ((List<Float>)valueList).get(i);
-        }
-        break;
-      case DECIMAL:
-        int precision = type.asPrimitiveType().getDecimalMetadata().getPrecision();
-        int scale = type.asPrimitiveType().getDecimalMetadata().getScale();
-        lcv.child = new DecimalColumnVector(total, precision, scale);
-        for (int i = 0; i < valueList.size(); i++) {
-          ((DecimalColumnVector)lcv.child).vector[i].set(((List<byte[]>)valueList).get(i), scale);
-        }
-        break;
-      case INTERVAL_DAY_TIME:
-      case TIMESTAMP:
-      default:
-        throw new RuntimeException("Unsupported type in the list: " + type);
+    case INT:
+    case BYTE:
+    case SHORT:
+      lcv.child = new LongColumnVector(total);
+      for (int i = 0; i < valueList.size(); i++) {
+        ((LongColumnVector) lcv.child).vector[i] = ((List<Integer>) valueList).get(i);
+      }
+      break;
+    case BOOLEAN:
+      lcv.child = new LongColumnVector(total);
+      for (int i = 0; i < valueList.size(); i++) {
+        ((LongColumnVector) lcv.child).vector[i] = ((List<Integer>) valueList).get(i);
+      }
+      break;
+    case DATE:
+    case INTERVAL_YEAR_MONTH:
+    case LONG:
+      lcv.child = new LongColumnVector(total);
+      for (int i = 0; i < valueList.size(); i++) {
+        ((LongColumnVector) lcv.child).vector[i] = ((List<Long>) valueList).get(i);
+      }
+      break;
+    case DOUBLE:
+      lcv.child = new DoubleColumnVector(total);
+      for (int i = 0; i < valueList.size(); i++) {
+        ((DoubleColumnVector) lcv.child).vector[i] = ((List<Double>) valueList).get(i);
+      }
+      break;
+    case BINARY:
+    case STRING:
+    case CHAR:
+    case VARCHAR:
+      lcv.child = new BytesColumnVector(total);
+      lcv.child.init();
+      for (int i = 0; i < valueList.size(); i++) {
+        byte[] src = ((List<byte[]>) valueList).get(i);
+        ((BytesColumnVector) lcv.child).setRef(i, src, 0, src.length);
+      }
+      break;
+    case FLOAT:
+      lcv.child = new DoubleColumnVector(total);
+      for (int i = 0; i < valueList.size(); i++) {
+        ((DoubleColumnVector) lcv.child).vector[i] = ((List<Float>) valueList).get(i);
+      }
+      break;
+    case DECIMAL:
+      decimalTypeCheck(type);
+      int precision = type.asPrimitiveType().getDecimalMetadata().getPrecision();
+      int scale = type.asPrimitiveType().getDecimalMetadata().getScale();
+      lcv.child = new DecimalColumnVector(total, precision, scale);
+      for (int i = 0; i < valueList.size(); i++) {
+        ((DecimalColumnVector) lcv.child).vector[i].set(((List<byte[]>) valueList).get(i), scale);
+      }
+      break;
+    case INTERVAL_DAY_TIME:
+    case TIMESTAMP:
+    default:
+      throw new RuntimeException("Unsupported type in the list: " + type);
     }
   }
 
   /**
    * Finish the result ListColumnVector with all collected information.
    */
-  private void convertValueListToListColumnVector(PrimitiveObjectInspector.PrimitiveCategory category,
-      ListColumnVector lcv, List valueList, int elementNum) {
+  private void convertValueListToListColumnVector(
+      PrimitiveObjectInspector.PrimitiveCategory category, ListColumnVector lcv, List valueList,
+      int elementNum) {
     // Fill the child of ListColumnVector with valueList
     fillColumnVector(category, lcv, valueList, elementNum);
     setIsRepeating(lcv);
@@ -330,9 +377,10 @@ public class VectorizedListColumnReader extends BaseVectorizedColumnReader {
         System.arraycopy(((LongColumnVector) lcv.child).vector, start,
             ((LongColumnVector) resultCV).vector, 0, length);
       } catch (Exception e) {
-        throw new RuntimeException("colinmjj:index:" + index + ", start:" + start + ",length:" + length
-            + ",vec len:" + ((LongColumnVector) lcv.child).vector.length + ", offset len:" + lcv.offsets.length
-            + ", len len:" + lcv.lengths.length, e);
+        throw new RuntimeException(
+            "Fail to copy at index:" + index + ", start:" + start + ",length:" + length + ",vec " +
+                "len:" + ((LongColumnVector) lcv.child).vector.length + ", offset len:" + lcv
+                .offsets.length + ", len len:" + lcv.lengths.length, e);
       }
     }
     if (child instanceof DoubleColumnVector) {
@@ -371,8 +419,9 @@ public class VectorizedListColumnReader extends BaseVectorizedColumnReader {
         if (cv1 instanceof DecimalColumnVector && cv2 instanceof DecimalColumnVector) {
           return compareDecimalColumnVector((DecimalColumnVector) cv1, (DecimalColumnVector) cv2);
         }
-        throw new RuntimeException("Unsupported ColumnVector comparision between " + cv1.getClass().getName()
-            + " and " + cv2.getClass().getName());
+        throw new RuntimeException(
+            "Unsupported ColumnVector comparision between " + cv1.getClass().getName()
+                + " and " + cv2.getClass().getName());
       } else {
         return false;
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 08ac57b..7b77eee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -1,9 +1,13 @@
 /*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -11,9 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hive.ql.io.parquet.vector;
 
-import com.google.common.annotations.VisibleForTesting;
+package org.apache.hadoop.hive.ql.io.parquet.vector;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -64,6 +67,7 @@ import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -459,6 +463,19 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     return res;
   }
 
+  // TODO only the non-nested case is supported
+  private PrimitiveType getElementType(Type type) {
+    if (type.isPrimitive()) {
+      return type.asPrimitiveType();
+    }
+    if (type.asGroupType().getFields().size() > 1) {
+      throw new RuntimeException(
+          "Current Parquet Vectorization reader doesn't support nested type");
+    }
+    return type.asGroupType().getFields().get(0).asGroupType().getFields().get(0)
+        .asPrimitiveType();
+  }
+
   // Build VectorizedParquetColumnReader via Hive typeInfo and Parquet schema
   private VectorizedColumnReader buildVectorizedParquetReader(
     TypeInfo typeInfo,
@@ -474,9 +491,13 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
       if (columnDescriptors == null || columnDescriptors.isEmpty()) {
         throw new RuntimeException(
           "Failed to find related Parquet column descriptor with type " + type);
-      } else {
+      }
+      if (fileSchema.getColumns().contains(descriptors.get(0))) {
         return new VectorizedPrimitiveColumnReader(descriptors.get(0),
-          pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type);
+          pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type, typeInfo);
+      } else {
+        // Schema evolution: the column is not in the file schema, so read it as all nulls
+        return new VectorizedDummyColumnReader();
       }
     case STRUCT:
       StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
@@ -502,8 +523,10 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
         throw new RuntimeException(
             "Failed to find related Parquet column descriptor with type " + type);
       }
+
       return new VectorizedListColumnReader(descriptors.get(0),
-          pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type);
+          pages.getPageReader(descriptors.get(0)), skipTimestampConversion, getElementType(type),
+          typeInfo);
     case MAP:
       if (columnDescriptors == null || columnDescriptors.isEmpty()) {
         throw new RuntimeException(
@@ -535,10 +558,10 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
       List<Type> kvTypes = groupType.getFields();
       VectorizedListColumnReader keyListColumnReader = new VectorizedListColumnReader(
           descriptors.get(0), pages.getPageReader(descriptors.get(0)), skipTimestampConversion,
-          kvTypes.get(0));
+          kvTypes.get(0), typeInfo);
       VectorizedListColumnReader valueListColumnReader = new VectorizedListColumnReader(
           descriptors.get(1), pages.getPageReader(descriptors.get(1)), skipTimestampConversion,
-          kvTypes.get(1));
+          kvTypes.get(1), typeInfo);
       return new VectorizedMapColumnReader(keyListColumnReader, valueListColumnReader);
     case UNION:
     default:

http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
index 39689f1..1442d69 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
@@ -19,17 +19,13 @@ import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.schema.Type;
+
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.sql.Timestamp;
 
 /**
  * It's column level Parquet reader which is used to read a batch of records for a column,
@@ -38,18 +34,18 @@ import java.sql.Timestamp;
 public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader {
 
   public VectorizedPrimitiveColumnReader(
-    ColumnDescriptor descriptor,
-    PageReader pageReader,
-    boolean skipTimestampConversion,
-    Type type) throws IOException {
-    super(descriptor, pageReader, skipTimestampConversion, type);
+      ColumnDescriptor descriptor,
+      PageReader pageReader,
+      boolean skipTimestampConversion,
+      Type type, TypeInfo hiveType) throws IOException {
+    super(descriptor, pageReader, skipTimestampConversion, type, hiveType);
   }
 
   @Override
   public void readBatch(
-    int total,
-    ColumnVector column,
-    TypeInfo columnType) throws IOException {
+      int total,
+      ColumnVector column,
+      TypeInfo columnType) throws IOException {
     int rowId = 0;
     while (total > 0) {
       // Compute the number of values we want to read in this page.
@@ -64,7 +60,7 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
         LongColumnVector dictionaryIds = new LongColumnVector();
         // Read and decode dictionary ids.
         readDictionaryIDs(num, dictionaryIds, rowId);
-        decodeDictionaryIds(rowId, num, column, dictionaryIds);
+        decodeDictionaryIds(rowId, num, column, columnType, dictionaryIds);
       } else {
         // assign values in vector
         readBatchHelper(num, column, columnType, rowId);
@@ -75,10 +71,10 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
   }
 
   private void readBatchHelper(
-    int num,
-    ColumnVector column,
-    TypeInfo columnType,
-    int rowId) throws IOException {
+      int num,
+      ColumnVector column,
+      TypeInfo columnType,
+      int rowId) throws IOException {
     PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType;
 
     switch (primitiveColumnType.getPrimitiveCategory()) {
@@ -99,10 +95,16 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
       readDoubles(num, (DoubleColumnVector) column, rowId);
       break;
     case BINARY:
+      readBinaries(num, (BytesColumnVector) column, rowId);
+      break;
     case STRING:
-    case CHAR:
+      readString(num, (BytesColumnVector) column, rowId);
+      break;
     case VARCHAR:
-      readBinaries(num, (BytesColumnVector) column, rowId);
+      readVarchar(num, (BytesColumnVector) column, rowId);
+      break;
+    case CHAR:
+      readChar(num, (BytesColumnVector) column, rowId);
       break;
     case FLOAT:
       readFloats(num, (DoubleColumnVector) column, rowId);
@@ -120,9 +122,9 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
   }
 
   private void readDictionaryIDs(
-    int total,
-    LongColumnVector c,
-    int rowId) throws IOException {
+      int total,
+      LongColumnVector c,
+      int rowId) throws IOException {
     int left = total;
     while (left > 0) {
       readRepetitionAndDefinitionLevels();
@@ -141,9 +143,9 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
   }
 
   private void readIntegers(
-    int total,
-    LongColumnVector c,
-    int rowId) throws IOException {
+      int total,
+      LongColumnVector c,
+      int rowId) throws IOException {
     int left = total;
     while (left > 0) {
       readRepetitionAndDefinitionLevels();
@@ -162,9 +164,9 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
   }
 
   private void readDoubles(
-    int total,
-    DoubleColumnVector c,
-    int rowId) throws IOException {
+      int total,
+      DoubleColumnVector c,
+      int rowId) throws IOException {
     int left = total;
     while (left > 0) {
       readRepetitionAndDefinitionLevels();
@@ -183,9 +185,9 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
   }
 
   private void readBooleans(
-    int total,
-    LongColumnVector c,
-    int rowId) throws IOException {
+      int total,
+      LongColumnVector c,
+      int rowId) throws IOException {
     int left = total;
     while (left > 0) {
       readRepetitionAndDefinitionLevels();
@@ -204,9 +206,9 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
   }
 
   private void readLongs(
-    int total,
-    LongColumnVector c,
-    int rowId) throws IOException {
+      int total,
+      LongColumnVector c,
+      int rowId) throws IOException {
     int left = total;
     while (left > 0) {
       readRepetitionAndDefinitionLevels();
@@ -225,9 +227,9 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
   }
 
   private void readFloats(
-    int total,
-    DoubleColumnVector c,
-    int rowId) throws IOException {
+      int total,
+      DoubleColumnVector c,
+      int rowId) throws IOException {
     int left = total;
     while (left > 0) {
       readRepetitionAndDefinitionLevels();
@@ -246,16 +248,17 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
   }
 
   private void readDecimal(
-    int total,
-    DecimalColumnVector c,
-    int rowId) throws IOException {
+      int total,
+      DecimalColumnVector c,
+      int rowId) throws IOException {
+    decimalTypeCheck(type);
     int left = total;
     c.precision = (short) type.asPrimitiveType().getDecimalMetadata().getPrecision();
     c.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale();
     while (left > 0) {
       readRepetitionAndDefinitionLevels();
       if (definitionLevel >= maxDefLevel) {
-        c.vector[rowId].set(dataColumn.readBytes().getBytesUnsafe(), c.scale);
+        c.vector[rowId].set(dataColumn.readDecimal(), c.scale);
         c.isNull[rowId] = false;
         c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
       } else {
@@ -268,15 +271,81 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
     }
   }
 
+  private void readString(
+      int total,
+      BytesColumnVector c,
+      int rowId) throws IOException {
+    int left = total;
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.setVal(rowId, dataColumn.readString());
+        c.isNull[rowId] = false;
+        // TODO figure out a better way to set repeat for Binary type
+        c.isRepeating = false;
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
+  private void readChar(
+      int total,
+      BytesColumnVector c,
+      int rowId) throws IOException {
+    int left = total;
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.setVal(rowId, dataColumn.readChar());
+        c.isNull[rowId] = false;
+        // TODO figure out a better way to set repeat for Binary type
+        c.isRepeating = false;
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
+  private void readVarchar(
+      int total,
+      BytesColumnVector c,
+      int rowId) throws IOException {
+    int left = total;
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.setVal(rowId, dataColumn.readVarchar());
+        c.isNull[rowId] = false;
+        // TODO figure out a better way to set repeat for Binary type
+        c.isRepeating = false;
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
   private void readBinaries(
-    int total,
-    BytesColumnVector c,
-    int rowId) throws IOException {
+      int total,
+      BytesColumnVector c,
+      int rowId) throws IOException {
     int left = total;
     while (left > 0) {
       readRepetitionAndDefinitionLevels();
       if (definitionLevel >= maxDefLevel) {
-        c.setVal(rowId, dataColumn.readBytes().getBytesUnsafe());
+        c.setVal(rowId, dataColumn.readBytes());
         c.isNull[rowId] = false;
         // TODO figure out a better way to set repeat for Binary type
         c.isRepeating = false;
@@ -296,11 +365,9 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
       readRepetitionAndDefinitionLevels();
       if (definitionLevel >= maxDefLevel) {
         switch (descriptor.getType()) {
-          //INT64 is not yet supported
+        // INT64 timestamps are not yet supported; only INT96 is handled here
         case INT96:
-          NanoTime nt = NanoTime.fromBinary(dataColumn.readBytes());
-          Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion);
-          c.set(rowId, ts);
+          c.set(rowId, dataColumn.readTimestamp());
           break;
         default:
           throw new IOException(
@@ -323,73 +390,99 @@ public class VectorizedPrimitiveColumnReader extends BaseVectorizedColumnReader
    * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
    */
   private void decodeDictionaryIds(
-    int rowId,
-    int num,
-    ColumnVector column,
-    LongColumnVector dictionaryIds) {
+      int rowId,
+      int num,
+      ColumnVector column,
+      TypeInfo columnType,
+      LongColumnVector dictionaryIds) {
     System.arraycopy(dictionaryIds.isNull, rowId, column.isNull, rowId, num);
     if (column.noNulls) {
       column.noNulls = dictionaryIds.noNulls;
     }
     column.isRepeating = column.isRepeating && dictionaryIds.isRepeating;
 
-    switch (descriptor.getType()) {
-    case INT32:
+
+    PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType;
+
+    switch (primitiveColumnType.getPrimitiveCategory()) {
+    case INT:
+    case BYTE:
+    case SHORT:
       for (int i = rowId; i < rowId + num; ++i) {
         ((LongColumnVector) column).vector[i] =
-          dictionary.decodeToInt((int) dictionaryIds.vector[i]);
+            dictionary.readInteger((int) dictionaryIds.vector[i]);
       }
       break;
-    case INT64:
+    case DATE:
+    case INTERVAL_YEAR_MONTH:
+    case LONG:
       for (int i = rowId; i < rowId + num; ++i) {
         ((LongColumnVector) column).vector[i] =
-          dictionary.decodeToLong((int) dictionaryIds.vector[i]);
+            dictionary.readLong((int) dictionaryIds.vector[i]);
       }
       break;
-    case FLOAT:
+    case BOOLEAN:
       for (int i = rowId; i < rowId + num; ++i) {
-        ((DoubleColumnVector) column).vector[i] =
-          dictionary.decodeToFloat((int) dictionaryIds.vector[i]);
+        ((LongColumnVector) column).vector[i] =
+            dictionary.readBoolean((int) dictionaryIds.vector[i]) ? 1 : 0;
       }
       break;
     case DOUBLE:
       for (int i = rowId; i < rowId + num; ++i) {
         ((DoubleColumnVector) column).vector[i] =
-          dictionary.decodeToDouble((int) dictionaryIds.vector[i]);
+            dictionary.readDouble((int) dictionaryIds.vector[i]);
       }
       break;
-    case INT96:
+    case BINARY:
       for (int i = rowId; i < rowId + num; ++i) {
-        ByteBuffer buf = dictionary.decodeToBinary((int) dictionaryIds.vector[i]).toByteBuffer();
-        buf.order(ByteOrder.LITTLE_ENDIAN);
-        long timeOfDayNanos = buf.getLong();
-        int julianDay = buf.getInt();
-        NanoTime nt = new NanoTime(julianDay, timeOfDayNanos);
-        Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion);
-        ((TimestampColumnVector) column).set(i, ts);
+        ((BytesColumnVector) column)
+            .setVal(i, dictionary.readBytes((int) dictionaryIds.vector[i]));
       }
       break;
-    case BINARY:
-    case FIXED_LEN_BYTE_ARRAY:
-      if (column instanceof BytesColumnVector) {
-        for (int i = rowId; i < rowId + num; ++i) {
-          ((BytesColumnVector) column)
-            .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe());
-        }
-      } else {
-        DecimalColumnVector decimalColumnVector = ((DecimalColumnVector) column);
-        decimalColumnVector.precision =
-          (short) type.asPrimitiveType().getDecimalMetadata().getPrecision();
-        decimalColumnVector.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale();
-        for (int i = rowId; i < rowId + num; ++i) {
-          decimalColumnVector.vector[i]
-            .set(dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe(),
-              decimalColumnVector.scale);
-        }
+    case STRING:
+      for (int i = rowId; i < rowId + num; ++i) {
+        ((BytesColumnVector) column)
+            .setVal(i, dictionary.readString((int) dictionaryIds.vector[i]));
+      }
+      break;
+    case VARCHAR:
+      for (int i = rowId; i < rowId + num; ++i) {
+        ((BytesColumnVector) column)
+            .setVal(i, dictionary.readVarchar((int) dictionaryIds.vector[i]));
       }
       break;
+    case CHAR:
+      for (int i = rowId; i < rowId + num; ++i) {
+        ((BytesColumnVector) column)
+            .setVal(i, dictionary.readChar((int) dictionaryIds.vector[i]));
+      }
+      break;
+    case FLOAT:
+      for (int i = rowId; i < rowId + num; ++i) {
+        ((DoubleColumnVector) column).vector[i] =
+            dictionary.readFloat((int) dictionaryIds.vector[i]);
+      }
+      break;
+    case DECIMAL:
+      decimalTypeCheck(type);
+      DecimalColumnVector decimalColumnVector = ((DecimalColumnVector) column);
+      decimalColumnVector.precision = (short) type.asPrimitiveType().getDecimalMetadata().getPrecision();
+      decimalColumnVector.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale();
+      for (int i = rowId; i < rowId + num; ++i) {
+        decimalColumnVector.vector[i]
+            .set(dictionary.readDecimal((int) dictionaryIds.vector[i]),
+                decimalColumnVector.scale);
+      }
+      break;
+    case TIMESTAMP:
+      for (int i = rowId; i < rowId + num; ++i) {
+        ((TimestampColumnVector) column)
+            .set(i, dictionary.readTimestamp((int) dictionaryIds.vector[i]));
+      }
+      break;
+    case INTERVAL_DAY_TIME:
     default:
-      throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
+      throw new UnsupportedOperationException("Unsupported type: " + type);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/package-info.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/package-info.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/package-info.java
new file mode 100644
index 0000000..b695974
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes that implement Hive's vectorized reader for Parquet files.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.vector;

http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
index 9e414dc..52e6045 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
@@ -35,7 +35,6 @@ import org.junit.Test;
 
 import java.io.IOException;
 
-import static junit.framework.TestCase.assertFalse;
 import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA;
 
 public class TestVectorizedColumnReader extends VectorizedColumnReaderTestBase {
@@ -55,26 +54,40 @@ public class TestVectorizedColumnReader extends VectorizedColumnReaderTestBase {
   @Test
   public void testIntRead() throws Exception {
     intRead(isDictionaryEncoding);
+    longReadInt(isDictionaryEncoding);
+    floatReadInt(isDictionaryEncoding);
+    doubleReadInt(isDictionaryEncoding);
   }
 
   @Test
   public void testLongRead() throws Exception {
     longRead(isDictionaryEncoding);
+    floatReadLong(isDictionaryEncoding);
+    doubleReadLong(isDictionaryEncoding);
+  }
+
+  @Test
+  public void testTimestamp() throws Exception {
+    timestampRead(isDictionaryEncoding);
+    stringReadTimestamp(isDictionaryEncoding);
   }
 
   @Test
   public void testDoubleRead() throws Exception {
     doubleRead(isDictionaryEncoding);
+    stringReadDouble(isDictionaryEncoding);
   }
 
   @Test
   public void testFloatRead() throws Exception {
     floatRead(isDictionaryEncoding);
+    doubleReadFloat(isDictionaryEncoding);
   }
 
   @Test
   public void testBooleanRead() throws Exception {
     booleanRead();
+    stringReadBoolean();
   }
 
   @Test
@@ -101,6 +114,7 @@ public class TestVectorizedColumnReader extends VectorizedColumnReaderTestBase {
   @Test
   public void decimalRead() throws Exception {
     decimalRead(isDictionaryEncoding);
+    stringReadDecimal(isDictionaryEncoding);
   }
 
   private class TestVectorizedParquetRecordReader extends VectorizedParquetRecordReader {

http://git-wip-us.apache.org/repos/asf/hive/blob/7ddac02b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java
index 3e5d831..32d27d9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java
@@ -41,21 +41,34 @@ public class TestVectorizedDictionaryEncodingColumnReader extends VectorizedColu
   @Test
   public void testIntRead() throws Exception {
     intRead(isDictionaryEncoding);
+    longReadInt(isDictionaryEncoding);
+    floatReadInt(isDictionaryEncoding);
+    doubleReadInt(isDictionaryEncoding);
   }
 
   @Test
   public void testLongRead() throws Exception {
     longRead(isDictionaryEncoding);
+    floatReadLong(isDictionaryEncoding);
+    doubleReadLong(isDictionaryEncoding);
+  }
+
+  @Test
+  public void testTimestamp() throws Exception {
+    timestampRead(isDictionaryEncoding);
+    stringReadTimestamp(isDictionaryEncoding);
   }
 
   @Test
   public void testDoubleRead() throws Exception {
     doubleRead(isDictionaryEncoding);
+    stringReadDouble(isDictionaryEncoding);
   }
 
   @Test
   public void testFloatRead() throws Exception {
     floatRead(isDictionaryEncoding);
+    doubleReadFloat(isDictionaryEncoding);
   }
 
   @Test
@@ -81,5 +94,6 @@ public class TestVectorizedDictionaryEncodingColumnReader extends VectorizedColu
   @Test
   public void decimalRead() throws Exception {
     decimalRead(isDictionaryEncoding);
+    stringReadDecimal(isDictionaryEncoding);
   }
 }