Posted to commits@drill.apache.org by pa...@apache.org on 2015/09/28 20:36:22 UTC

[3/3] drill git commit: DRILL-2908: Enable reading the Int 96 type from parquet files.

DRILL-2908: Enable reading the Int 96 type from parquet files.

The column chunk metadata can be ordered differently from the columns in the schema. Both are exposed as lists, which makes them look like they should correspond position by position, so we have to build our own map from column names to indexes in the metadata list.
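
A minimal sketch of that lookup (variable names are illustrative; the real change is in the ParquetRecordReader hunk below):

  // Build a map from column path to the chunk's position in the row group's
  // column list, then resolve each schema column by path instead of assuming
  // positional correspondence. Uses java.util.{Arrays, HashMap, List, Map}
  // and parquet.hadoop.metadata.ColumnChunkMetaData.
  Map<String, Integer> chunkIndexByPath = new HashMap<>();
  List<ColumnChunkMetaData> chunks = rowGroupMetadata.getColumns();
  for (int i = 0; i < chunks.size(); i++) {
    chunkIndexByPath.put(Arrays.toString(chunks.get(i).getPath().toArray()), i);
  }
  // Later, for each column descriptor in the schema:
  ColumnChunkMetaData chunk =
      chunks.get(chunkIndexByPath.get(Arrays.toString(column.getPath())));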

Support for varbinary and int96 reading in the new reader.

Support the version 2 page header; the Java library will only dictionary-encode fixed-length byte arrays when the writer version is set to 2.0.
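
For testing, a hedged sketch of forcing the 2.0 writer through parquet-mr's Hadoop configuration (constant and value names are from parquet-mr 1.x and may differ across releases):

  org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
  // "PARQUET_2_0" selects the v2 writer; the default v1 writer skips
  // dictionary encoding for FIXED_LEN_BYTE_ARRAY columns.
  conf.set(parquet.hadoop.ParquetOutputFormat.WRITER_VERSION, "PARQUET_2_0");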

This looks to be working in the vectorized reader; a test case is still needed.

Fixed the complex reader; it was using the wrong field to determine the length to read.

Conflicts:
	exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
	exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
	exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
	exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java

UDF for reading Impala timestamps from varbinary.
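
A standalone sketch of the decoding the UDF performs, assuming the 12-byte little-endian layout used in the committed eval() below (8 bytes of nanoseconds within the day followed by a 4-byte Julian day number); the helper class here is illustrative only:

  import java.nio.ByteBuffer;
  import java.nio.ByteOrder;
  import org.joda.time.DateTimeUtils;

  class ImpalaTimestampSketch {
    static long toMillis(byte[] twelveBytes) {
      ByteBuffer buf = ByteBuffer.wrap(twelveBytes).order(ByteOrder.LITTLE_ENDIAN);
      long nanosOfDay = buf.getLong(); // bytes 0-7
      int julianDay = buf.getInt();    // bytes 8-11
      // Julian days start at noon, hence the half-day adjustment.
      return DateTimeUtils.fromJulianDay(
          julianDay - 0.5 + nanosOfDay / (24.0 * 3600 * 1000000000));
    }
  }

Drill resolves convert_from(expr, 'X') to a function named convert_fromX, so this should be reachable in SQL as convert_from(col, 'IMPALA_TIMESTAMP').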

Fix for reading fixed binary and int96 columns in the vectorized parquet reader.

Conflicts:
	exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java

Fix for a bug reading fixed binary and int96 data out of parquet when the data is plain-encoded.
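
The fix amounts to an indexing change in FixedByteAlignedReader (see the hunk below): value lengths must be written at the vector position offset by the values already read in the current pass, otherwise each pass overwrites the lengths of the first records.

  int byteLength = dataTypeLengthInBits / 8;
  for (int i = 0; i < recordsToReadInThisPass; i++) {
    // index by valuesReadInCurrentPass + i, not by the loop index alone
    castedVector.getMutator().setValueLengthSafe(valuesReadInCurrentPass + i, byteLength);
  }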


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/657fe5bb
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/657fe5bb
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/657fe5bb

Branch: refs/heads/master
Commit: 657fe5bbd642a4483eec73d1da3f25b3d3656e34
Parents: b9afcf8
Author: Jason Altekruse <al...@gmail.com>
Authored: Tue Apr 14 16:27:59 2015 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Mon Sep 28 11:35:28 2015 -0700

----------------------------------------------------------------------
 .../codegen/templates/NullableValueVectors.java |  1 +
 .../impl/conv/ConvertFromImpalaTimestamp.java   | 50 ++++++++++++++++++++
 .../columnreaders/ColumnReaderFactory.java      |  7 ++-
 .../columnreaders/FixedByteAlignedReader.java   |  2 +-
 .../NullableFixedByteAlignedReaders.java        | 41 +++++++++++++++-
 .../ParquetFixedWidthDictionaryReaders.java     | 43 +++++++++++++++++
 .../columnreaders/ParquetRecordReader.java      | 15 +++++-
 .../ParquetToDrillTypeConverter.java            |  2 +-
 .../parquet2/DrillParquetGroupConverter.java    | 38 ++++++++++++---
 .../parquet/hadoop/ColumnChunkIncReadStore.java | 39 ++++++++++++++-
 10 files changed, 226 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/657fe5bb/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
index 85a87f6..454de7c 100644
--- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
@@ -467,6 +467,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     @Override
     public void setValueLengthSafe(int index, int length) {
       values.getMutator().setValueLengthSafe(index, length);
+      lastSet = index;
     }
     </#if>
 

http://git-wip-us.apache.org/repos/asf/drill/blob/657fe5bb/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/ConvertFromImpalaTimestamp.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/ConvertFromImpalaTimestamp.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/ConvertFromImpalaTimestamp.java
new file mode 100644
index 0000000..e91c08e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/ConvertFromImpalaTimestamp.java
@@ -0,0 +1,50 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.expr.fn.impl.conv;
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+
+public class ConvertFromImpalaTimestamp {
+
+  @FunctionTemplate(name = "convert_fromIMPALA_TIMESTAMP", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class ImpalaTimestampConvertFrom implements DrillSimpleFunc {
+
+    @Param VarBinaryHolder in;
+    @Output TimeStampHolder out;
+
+    @Override
+    public void setup() { }
+
+    @Override
+    public void eval() {
+      org.apache.drill.exec.util.ByteBufUtil.checkBufferLength(in.buffer, in.start, in.end, 12);
+
+      in.buffer.readerIndex(in.start);
+      long nanosOfDay = in.buffer.readLong();
+      int julianDay = in.buffer.readInt();
+      // Need to subtract half of a day because julian days are recorded as starting at noon
+      out.value = org.joda.time.DateTimeUtils.fromJulianDay(julianDay - 0.5 + nanosOfDay / (24.0 * 3600 * 1000000000));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/657fe5bb/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index 650163e..5292678 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -74,7 +74,8 @@ public class ColumnReaderFactory {
       if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
         return new BitReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
             fixedLength, v, schemaElement);
-      } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY ) {
+      } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY ||
+          columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT96) {
         if (convertedType == ConvertedType.DECIMAL){
           int length = schemaElement.type_length;
           if (length <= 12) {
@@ -121,6 +122,8 @@ public class ColumnReaderFactory {
               return new ParquetFixedWidthDictionaryReaders.DictionaryFloat4Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float4Vector) v, schemaElement);
             case DOUBLE:
               return new ParquetFixedWidthDictionaryReaders.DictionaryFloat8Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float8Vector) v, schemaElement);
+            case FIXED_LEN_BYTE_ARRAY:
+              return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
             default:
               throw new ExecutionSetupException("Unsupported dictionary column type " + descriptor.getType().name() );
           }
@@ -235,6 +238,8 @@ public class ColumnReaderFactory {
             default:
               throw new ExecutionSetupException("Unsupported nullable converted type " + convertedType + " for primitive type INT64");
           }
+        case INT96:
+           return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement);
         case FLOAT:
           return new NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableFloat4Vector)valueVec, schemaElement);
         case DOUBLE:

http://git-wip-us.apache.org/repos/asf/drill/blob/657fe5bb/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
index 07b78e6..ef2ae1b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -89,7 +89,7 @@ class FixedByteAlignedReader extends ColumnReader {
       // now we need to write the lengths of each value
       int byteLength = dataTypeLengthInBits / 8;
       for (int i = 0; i < recordsToReadInThisPass; i++) {
-        castedVector.getMutator().setValueLengthSafe(i, byteLength);
+        castedVector.getMutator().setValueLengthSafe(valuesReadInCurrentPass + i, byteLength);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/657fe5bb/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index e2388ac..4d15ec6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -27,7 +27,6 @@ import org.apache.drill.exec.expr.holders.NullableDecimal38SparseHolder;
 import org.apache.drill.exec.store.ParquetOutputRecordWriter;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.util.DecimalUtility;
-import org.apache.drill.exec.vector.IntervalVector;
 import org.apache.drill.exec.vector.NullableBigIntVector;
 import org.apache.drill.exec.vector.NullableDateVector;
 import org.apache.drill.exec.vector.NullableDecimal18Vector;
@@ -40,6 +39,7 @@ import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.NullableIntervalVector;
 import org.apache.drill.exec.vector.NullableTimeStampVector;
 import org.apache.drill.exec.vector.NullableTimeVector;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.joda.time.DateTimeUtils;
 
@@ -68,6 +68,45 @@ public class NullableFixedByteAlignedReaders {
     }
   }
 
+  /**
+   * Class for reading the fixed length byte array type in parquet. Currently Drill does not have
+   * a fixed length binary type, so this is read into a varbinary with the same size recorded for
+   * each value.
+   */
+  static class NullableFixedBinaryReader extends NullableFixedByteAlignedReader {
+
+    NullableVarBinaryVector castedVector;
+
+    NullableFixedBinaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                                   ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarBinaryVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      castedVector = v;
+    }
+
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+      this.bytebuf = pageReader.pageData;
+
+      if (usingDictionary) {
+        NullableVarBinaryVector.Mutator mutator =  castedVector.getMutator();
+        Binary currDictValToWrite;
+        for (int i = 0; i < recordsReadInThisIteration; i++){
+          currDictValToWrite = pageReader.dictionaryValueReader.readBytes();
+          mutator.setSafe(valuesReadInCurrentPass + i, currDictValToWrite.toByteBuffer(), 0,
+              currDictValToWrite.length());
+        }
+      } else {
+        super.readField(recordsToReadInThisPass);
+        // TODO - replace this with fixed binary type in drill
+        // for now we need to write the lengths of each value
+        int byteLength = dataTypeLengthInBits / 8;
+        for (int i = 0; i < recordsToReadInThisPass; i++) {
+          castedVector.getMutator().setValueLengthSafe(valuesReadInCurrentPass + i, byteLength);
+        }
+      }
+    }
+  }
+
   static class NullableDictionaryIntReader extends NullableColumnReader<NullableIntVector> {
 
     NullableDictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,

http://git-wip-us.apache.org/repos/asf/drill/blob/657fe5bb/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
index b3893c1..ea75cad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
@@ -17,6 +17,7 @@
  ******************************************************************************/
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.Decimal18Vector;
@@ -24,12 +25,15 @@ import org.apache.drill.exec.vector.Decimal9Vector;
 import org.apache.drill.exec.vector.Float4Vector;
 import org.apache.drill.exec.vector.Float8Vector;
 import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.TimeStampVector;
 import org.apache.drill.exec.vector.TimeVector;
 
+import org.apache.drill.exec.vector.VarBinaryVector;
 import parquet.column.ColumnDescriptor;
 import parquet.format.SchemaElement;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.io.api.Binary;
 
 public class ParquetFixedWidthDictionaryReaders {
 
@@ -59,6 +63,45 @@ public class ParquetFixedWidthDictionaryReaders {
     }
   }
 
+  static class DictionaryFixedBinaryReader extends FixedByteAlignedReader {
+
+    VarBinaryVector castedVector;
+
+    DictionaryFixedBinaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                        ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v,
+                        SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      castedVector = v;
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+
+      recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+          - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+      if (usingDictionary) {
+        VarBinaryVector.Mutator mutator =  castedVector.getMutator();
+        Binary currDictValToWrite = null;
+        for (int i = 0; i < recordsReadInThisIteration; i++){
+          currDictValToWrite = pageReader.dictionaryValueReader.readBytes();
+          mutator.setSafe(i, currDictValToWrite.toByteBuffer(), 0,
+              currDictValToWrite.length());
+        }
+      } else {
+        super.readField(recordsToReadInThisPass);
+      }
+
+      // TODO - replace this with fixed binary type in drill
+      // now we need to write the lengths of each value
+      int byteLength = dataTypeLengthInBits / 8;
+      for (int i = 0; i < recordsToReadInThisPass; i++) {
+        castedVector.getMutator().setValueLengthSafe(i, byteLength);
+      }
+    }
+  }
+
   static class DictionaryDecimal9Reader extends FixedByteAlignedReader {
 
     Decimal9Vector castedVector;

http://git-wip-us.apache.org/repos/asf/drill/blob/657fe5bb/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index a4f5cac..3a00a4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.parquet.columnreaders;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -51,6 +52,7 @@ import parquet.format.FileMetaData;
 import parquet.format.SchemaElement;
 import parquet.format.converter.ParquetMetadataConverter;
 import parquet.hadoop.ParquetFileWriter;
+import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.schema.PrimitiveType;
@@ -273,9 +275,20 @@ public class ParquetRecordReader extends AbstractRecordReader {
       final ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>();
       // initialize all of the column read status objects
       boolean fieldFixedLength;
+      // the column chunk meta-data is not guaranteed to be in the same order as the columns in the schema
+      // a map is constructed for fast access to the correct columnChunkMetadata to correspond
+      // to an element in the schema
+      Map<String, Integer> columnChunkMetadataPositionsInList = new HashMap<>();
+      BlockMetaData rowGroupMetadata = footer.getBlocks().get(rowGroupIndex);
+
+      int colChunkIndex = 0;
+      for (ColumnChunkMetaData colChunk : rowGroupMetadata.getColumns()) {
+        columnChunkMetadataPositionsInList.put(Arrays.toString(colChunk.getPath().toArray()), colChunkIndex);
+        colChunkIndex++;
+      }
       for (int i = 0; i < columns.size(); ++i) {
         column = columns.get(i);
-        columnChunkMetaData = footer.getBlocks().get(rowGroupIndex).getColumns().get(i);
+        columnChunkMetaData = rowGroupMetadata.getColumns().get(columnChunkMetadataPositionsInList.get(Arrays.toString(column.getPath())));
         schemaElement = schemaElements.get(column.getPath()[0]);
         MajorType type = ParquetToDrillTypeConverter.toMajorType(column.getType(), schemaElement.getType_length(),
             getDataMode(column), schemaElement, fragmentContext.getOptions());

http://git-wip-us.apache.org/repos/asf/drill/blob/657fe5bb/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
index a120f57..a33e616 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
@@ -98,7 +98,7 @@ public class ParquetToDrillTypeConverter {
       // TODO - Both of these are not supported by the parquet library yet (7/3/13),
       // but they are declared here for when they are implemented
       case INT96:
-        return TypeProtos.MinorType.FIXEDBINARY;
+        return TypeProtos.MinorType.VARBINARY;
       case FIXED_LEN_BYTE_ARRAY:
         if (convertedType == null) {
           checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");

http://git-wip-us.apache.org/repos/asf/drill/blob/657fe5bb/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index 825dc54..6b8154a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -25,7 +25,6 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
@@ -44,10 +43,10 @@ import org.apache.drill.exec.expr.holders.TimeStampHolder;
 import org.apache.drill.exec.expr.holders.VarBinaryHolder;
 import org.apache.drill.exec.expr.holders.VarCharHolder;
 import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.ParquetOutputRecordWriter;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.util.DecimalUtility;
 import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
@@ -66,7 +65,6 @@ import org.apache.drill.exec.vector.complex.writer.TimeStampWriter;
 import org.apache.drill.exec.vector.complex.writer.TimeWriter;
 import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
 import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
-import org.apache.drill.exec.work.ExecErrorConstants;
 import org.joda.time.DateTimeUtils;
 
 import parquet.io.api.Binary;
@@ -206,6 +204,13 @@ public class DrillParquetGroupConverter extends GroupConverter {
           }
         }
       }
+      case INT96: {
+        if (type.getOriginalType() == null) {
+          VarBinaryWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varBinary() : mapWriter.varBinary(name);
+          return new DrillFixedBinaryToVarbinaryConverter(writer, ParquetRecordReader.getTypeLengthInBits(type.getPrimitiveTypeName()) / 8, mutator.getManagedBuffer());
+        }
+
+      }
       case FLOAT: {
         Float4Writer writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).float4() : mapWriter.float4(name);
         return new DrillFloat4Converter(writer);
@@ -261,9 +266,9 @@ public class DrillParquetGroupConverter extends GroupConverter {
               : mapWriter.interval(name);
           return new DrillFixedLengthByteArrayToInterval(writer);
 
-        }
-        else {
-          throw new UnsupportedOperationException("Unsupported type " + type.getOriginalType());
+        } else {
+          VarBinaryWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varBinary() : mapWriter.varBinary(name);
+          return new DrillFixedBinaryToVarbinaryConverter(writer, type.getTypeLength(), mutator.getManagedBuffer());
         }
       default:
         throw new UnsupportedOperationException("Unsupported type: " + type.getPrimitiveTypeName());
@@ -541,4 +546,25 @@ public class DrillParquetGroupConverter extends GroupConverter {
       writer.write(holder);
     }
   }
+  /**
+   * Parquet currently supports a fixed binary type, which is not implemented in Drill. For now this
+   * data will be read in as a varbinary and the same length will be recorded for each value.
+   */
+  public static class DrillFixedBinaryToVarbinaryConverter extends PrimitiveConverter {
+    private VarBinaryWriter writer;
+    private VarBinaryHolder holder = new VarBinaryHolder();
+
+    public DrillFixedBinaryToVarbinaryConverter(VarBinaryWriter writer, int length, DrillBuf buf) {
+      this.writer = writer;
+      holder.buffer = buf = buf.reallocIfNeeded(length);
+      holder.start = 0;
+      holder.end = length;
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      holder.buffer.setBytes(0, value.toByteBuffer());
+      writer.write(holder);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/657fe5bb/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
index d8bf2fd..4559083 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -38,9 +38,11 @@ import parquet.bytes.BytesInput;
 import parquet.column.ColumnDescriptor;
 import parquet.column.page.DataPage;
 import parquet.column.page.DataPageV1;
+import parquet.column.page.DataPageV2;
 import parquet.column.page.DictionaryPage;
 import parquet.column.page.PageReadStore;
 import parquet.column.page.PageReader;
+import parquet.format.DataPageHeaderV2;
 import parquet.format.PageHeader;
 import parquet.format.Util;
 import parquet.format.converter.ParquetMetadataConverter;
@@ -48,6 +50,8 @@ import parquet.hadoop.CodecFactory.BytesDecompressor;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.util.CompatibilityUtil;
 
+import static parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
+
 
 public class ColumnChunkIncReadStore implements PageReadStore {
 
@@ -141,6 +145,8 @@ public class ColumnChunkIncReadStore implements PageReadStore {
         }
         while(valueReadSoFar < metaData.getValueCount()) {
           pageHeader = Util.readPageHeader(in);
+          int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+          int compressedPageSize = pageHeader.getCompressed_page_size();
           switch (pageHeader.type) {
             case DICTIONARY_PAGE:
               if (dictionaryPage == null) {
@@ -166,11 +172,42 @@ public class ColumnChunkIncReadStore implements PageReadStore {
                       decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
                       pageHeader.data_page_header.num_values,
                       pageHeader.uncompressed_page_size,
-                      ParquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
+                      fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
                       parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
                       parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
                       parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
               );
+            // TODO - finish testing this with more files
+            case DATA_PAGE_V2:
+              valueReadSoFar += pageHeader.data_page_header_v2.getNum_values();
+              buf = allocator.buffer(pageHeader.compressed_page_size);
+              lastPage = buf;
+              buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
+              while (buffer.remaining() > 0) {
+                CompatibilityUtil.getBuf(in, buffer, pageHeader.compressed_page_size);
+              }
+              DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
+              int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
+              BytesInput decompressedPageData =
+                  decompressor.decompress(
+                      BytesInput.from(buffer, 0, pageHeader.compressed_page_size),
+                      pageHeader.uncompressed_page_size);
+              return new DataPageV2(
+                      dataHeaderV2.getNum_rows(),
+                      dataHeaderV2.getNum_nulls(),
+                      dataHeaderV2.getNum_values(),
+                      BytesInput.from(decompressedPageData.toByteBuffer(), 0, dataHeaderV2.getRepetition_levels_byte_length()),
+                      BytesInput.from(decompressedPageData.toByteBuffer(),
+                          dataHeaderV2.getRepetition_levels_byte_length(),
+                          dataHeaderV2.getDefinition_levels_byte_length()),
+                      parquetMetadataConverter.getEncoding(dataHeaderV2.getEncoding()),
+                      BytesInput.from(decompressedPageData.toByteBuffer(),
+                          dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length(),
+                          dataSize),
+                      uncompressedPageSize,
+                      fromParquetStatistics(dataHeaderV2.getStatistics(), columnDescriptor.getType()),
+                      dataHeaderV2.isIs_compressed()
+                  );
             default:
               in.skip(pageHeader.compressed_page_size);
               break;