Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/06 23:42:08 UTC

[08/15] git commit: Drill-400 change parquet reader to place varbinary fields into VarCharVectors, allowing them to be returned by default as UTF-8 Strings. Note that this will only be the case with newer parquet files that were produced after Converted

Drill-400 change parquet reader to place varbinary fields into VarCharVectors, allowing them to be returned by default as UTF-8 Strings. Note that this will only be the case with newer parquet files that were produced after Converted Types were added to the format. This metadata stores the desired interpretation of a column, but was not originally in the format. For older files you will still need to cast binary data to Varchar.
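
In short: when mapping a Parquet BINARY column to a Drill type, the reader now consults the ConvertedType annotation recorded in the file footer. A condensed sketch of the rule, using names that appear in the diff below (a hypothetical helper, not the verbatim commit code):

    // BINARY columns annotated as UTF8 surface as VARCHAR (strings);
    // unannotated BINARY columns continue to surface as VARBINARY.
    static TypeProtos.MinorType minorTypeForBinary(ConvertedType convertedType) {
      return convertedType == ConvertedType.UTF8
          ? TypeProtos.MinorType.VARCHAR
          : TypeProtos.MinorType.VARBINARY;
    }

Files written before the annotation existed carry no ConvertedType, so their binary columns still come back as VARBINARY and need an explicit cast in the query.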


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f071aca7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f071aca7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f071aca7

Branch: refs/heads/master
Commit: f071aca7f8c85eace6f96d931068bceabdb2c419
Parents: a2355d4
Author: Jason Altekruse <al...@gmail.com>
Authored: Tue Apr 22 21:31:14 2014 -0500
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Mon May 5 18:51:02 2014 -0700

----------------------------------------------------------------------
 exec/java-exec/pom.xml                          |   4 +-
 .../drill/exec/store/parquet/BitReader.java     |   9 +-
 .../drill/exec/store/parquet/ColumnReader.java  |  20 ++-
 .../store/parquet/FixedByteAlignedReader.java   |   5 +-
 .../exec/store/parquet/NullableBitReader.java   |   7 +-
 .../store/parquet/NullableColumnReader.java     |  19 +-
 .../parquet/NullableFixedByteAlignedReader.java |   5 +-
 .../exec/store/parquet/PageReadStatus.java      |   6 +-
 .../exec/store/parquet/ParquetRecordReader.java | 106 ++++++++---
 .../exec/store/parquet/VarLenBinaryReader.java  | 176 ++++++++++++++++---
 .../store/parquet/ParquetRecordReaderTest.java  |  21 ++-
 .../store/parquet/ParquetResultListener.java    |  31 +---
 12 files changed, 305 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 196b095..3e26662 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -95,7 +95,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-column</artifactId>
-      <version>1.2.8</version>
+      <version>1.4.0</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
@@ -110,7 +110,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-hadoop</artifactId>
-      <version>1.2.8</version>
+      <version>1.4.0</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
index c489d5b..c323222 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
@@ -21,6 +21,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.ValueVector;
 import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 
 final class BitReader extends ColumnReader {
@@ -30,8 +31,8 @@ final class BitReader extends ColumnReader {
   private byte[] bytes;
   
   BitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-            boolean fixedLength, ValueVector v) throws ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+            boolean fixedLength, ValueVector v, ConvertedType convertedType) throws ExecutionSetupException {
+    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
   }
 
   @Override
@@ -47,11 +48,11 @@ final class BitReader extends ColumnReader {
     bytes = pageReadStatus.pageDataByteArray;
     // standard read, using memory mapping
     if (pageReadStatus.bitShift == 0) {
-      ((BaseDataValueVector) valueVecHolder.getValueVector()).getData().writeBytes(bytes,
+      ((BaseDataValueVector) valueVec).getData().writeBytes(bytes,
           (int) readStartInBytes, (int) readLength);
     } else { // read in individual values, because a bitshift is necessary with where the last page or batch ended
 
-      vectorData = ((BaseDataValueVector) valueVecHolder.getValueVector()).getData();
+      vectorData = ((BaseDataValueVector) valueVec).getData();
       nextByte = bytes[(int) Math.max(0, Math.ceil(pageReadStatus.valuesRead / 8.0) - 1)];
       readLengthInBits = recordsReadInThisIteration + pageReadStatus.bitShift;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
index d5c88ef..97ecfb8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
@@ -24,17 +24,18 @@ import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.fs.FSDataInputStream;
 import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.schema.PrimitiveType;
 
 import java.io.IOException;
 
-abstract class ColumnReader {
+abstract class ColumnReader<V extends ValueVector> {
   
   final ParquetRecordReader parentReader;
   
   // Value Vector for this column
-  final VectorHolder valueVecHolder;
+  final V valueVec;
   // column description from the parquet library
   final ColumnDescriptor columnDescriptor;
   // metadata of the column, from the parquet library
@@ -42,6 +43,8 @@ abstract class ColumnReader {
   // status information on the current page
   final PageReadStatus pageReadStatus;
 
+  final ConvertedType convertedType;
+
   // quick reference to see if the field is fixed length (as this requires an instanceof)
   final boolean isFixedLength;
 
@@ -62,16 +65,17 @@ abstract class ColumnReader {
   long readStartInBytes = 0, readLength = 0, readLengthInBits = 0, recordsReadInThisIteration = 0;
 
   protected ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-      ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) throws ExecutionSetupException {
+      ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, ConvertedType convertedType) throws ExecutionSetupException {
     this.parentReader = parentReader;
     this.columnDescriptor = descriptor;
     this.columnChunkMetaData = columnChunkMetaData;
     this.isFixedLength = fixedLength;
+    this.convertedType = convertedType;
 
     if (allocateSize > 1) {
-      valueVecHolder = new VectorHolder(allocateSize, v);
+      valueVec =  v;
     } else {
-      valueVecHolder = new VectorHolder(5000, v);
+      valueVec =  v;
     }
 
 
@@ -88,7 +92,7 @@ abstract class ColumnReader {
     readLength = 0;
     readLengthInBits = 0;
     recordsReadInThisIteration = 0;
-    vectorData = ((BaseValueVector) valueVecHolder.getValueVector()).getData();
+    vectorData = ((BaseValueVector) valueVec).getData();
     do {
       // if no page has been read, or all of the records have been read out of a page, read the next one
       if (pageReadStatus.currentPage == null || pageReadStatus.valuesRead == pageReadStatus.currentPage.getValueCount()) {
@@ -108,11 +112,11 @@ abstract class ColumnReader {
         pageReadStatus.readPosInBytes = readStartInBytes + readLength;
       }
     } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReadStatus.currentPage != null);
-    valueVecHolder.getValueVector().getMutator().setValueCount(valuesReadInCurrentPass);
+    valueVec.getMutator().setValueCount(valuesReadInCurrentPass);
   }
 
   public void clear() {
-    this.valueVecHolder.reset();
+    valueVec.clear();
     this.pageReadStatus.clear();
   }
 

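A note on the ColumnReader change above, since it underpins the rest of the patch: parameterizing the reader on its vector type lets a subclass hold a strongly typed vector and drop the repeated getValueVector() casts that VectorHolder forced on every access. A minimal sketch of the pattern (hypothetical class names, not the commit code):

    abstract class Reader<V extends ValueVector> {
      final V valueVec;                   // concrete vector type, no holder indirection
      Reader(V v) { this.valueVec = v; }
    }

    final class VarCharReader extends Reader<VarCharVector> {
      VarCharReader(VarCharVector v) { super(v); }
      // valueVec is statically a VarCharVector here, so calls such as
      // valueVec.getMutator().setSafe(...) need no cast.
    }
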
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
index 4f14f60..0aa18cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
@@ -21,6 +21,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.ValueVector;
 import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 
 class FixedByteAlignedReader extends ColumnReader {
@@ -29,8 +30,8 @@ class FixedByteAlignedReader extends ColumnReader {
 
   
   FixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-                         boolean fixedLength, ValueVector v) throws ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+                         boolean fixedLength, ValueVector v, ConvertedType convertedType) throws ExecutionSetupException {
+    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
   }
 
   // this method is called by its superclass during a read loop

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
index 16c2715..22933ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.drill.exec.vector.ValueVector;
 import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 
 import java.io.IOException;
@@ -38,8 +39,8 @@ import java.io.IOException;
 final class NullableBitReader extends ColumnReader {
 
   NullableBitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-                    boolean fixedLength, ValueVector v) throws ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+                    boolean fixedLength, ValueVector v, ConvertedType convertedType) throws ExecutionSetupException {
+    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
   }
 
   @Override
@@ -52,7 +53,7 @@ final class NullableBitReader extends ColumnReader {
       defLevel = pageReadStatus.definitionLevels.readInteger();
       // if the value is defined
       if (defLevel == columnDescriptor.getMaxDefinitionLevel()){
-        if (!((NullableBitVector)valueVecHolder.getValueVector()).getMutator().setSafe(i + valuesReadInCurrentPass,
+        if (!((NullableBitVector)valueVec).getMutator().setSafe(i + valuesReadInCurrentPass,
             pageReadStatus.valueReader.readBoolean() ? 1 : 0 )) {
           throw new RuntimeException();
         }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
index b6ae715..8faf756 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
@@ -22,6 +22,7 @@ import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.NullableVectorDefinitionSetter;
 import org.apache.drill.exec.vector.ValueVector;
 import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 
 import java.io.IOException;
@@ -35,8 +36,8 @@ abstract class NullableColumnReader extends ColumnReader{
   int bitsUsed;
 
   NullableColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-               boolean fixedLength, ValueVector v) throws ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+               boolean fixedLength, ValueVector v, ConvertedType convertedType) throws ExecutionSetupException {
+    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
   }
 
   public void readAllFixedFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException {
@@ -44,7 +45,7 @@ abstract class NullableColumnReader extends ColumnReader{
     readLength = 0;
     readLengthInBits = 0;
     recordsReadInThisIteration = 0;
-    vectorData = ((BaseValueVector)valueVecHolder.getValueVector()).getData();
+    vectorData = ((BaseValueVector)valueVec).getData();
 
     do {
       // if no page has been read, or all of the records have been read out of a page, read the next one
@@ -72,11 +73,11 @@ abstract class NullableColumnReader extends ColumnReader{
         lastValueWasNull = true;
         nullsFound = 0;
         if (currentValueIndexInVector - totalValuesRead == recordsToReadInThisPass
-            || currentValueIndexInVector >= valueVecHolder.getValueVector().getValueCapacity()){
+            || currentValueIndexInVector >= valueVec.getValueCapacity()){
           break;
         }
         while(currentValueIndexInVector - totalValuesRead < recordsToReadInThisPass
-            && currentValueIndexInVector < valueVecHolder.getValueVector().getValueCapacity()
+            && currentValueIndexInVector < valueVec.getValueCapacity()
             && pageReadStatus.valuesRead + definitionLevelsRead < pageReadStatus.currentPage.getValueCount()){
           currentDefinitionLevel = pageReadStatus.definitionLevels.readInteger();
           definitionLevelsRead++;
@@ -96,7 +97,7 @@ abstract class NullableColumnReader extends ColumnReader{
               lastValueWasNull = false;
             }
             runLength++;
-            ((NullableVectorDefinitionSetter)valueVecHolder.getValueVector().getMutator()).setIndexDefined(currentValueIndexInVector);
+            ((NullableVectorDefinitionSetter)valueVec.getMutator()).setIndexDefined(currentValueIndexInVector);
           }
           currentValueIndexInVector++;
         }
@@ -104,9 +105,9 @@ abstract class NullableColumnReader extends ColumnReader{
         recordsReadInThisIteration = runLength;
 
         readField( runLength, firstColumnStatus);
-        int writerIndex = ((BaseValueVector) valueVecHolder.getValueVector()).getData().writerIndex();
+        int writerIndex = ((BaseValueVector) valueVec).getData().writerIndex();
         if ( dataTypeLengthInBits > 8  || (dataTypeLengthInBits < 8 && totalValuesRead + runLength % 8 == 0)){
-          ((BaseValueVector) valueVecHolder.getValueVector()).getData().setIndex(0, writerIndex + (int) Math.ceil( nullsFound * dataTypeLengthInBits / 8.0));
+          ((BaseValueVector) valueVec).getData().setIndex(0, writerIndex + (int) Math.ceil( nullsFound * dataTypeLengthInBits / 8.0));
         }
         else if (dataTypeLengthInBits < 8){
           rightBitShift += dataTypeLengthInBits * nullsFound;
@@ -125,7 +126,7 @@ abstract class NullableColumnReader extends ColumnReader{
       }
     }
     while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReadStatus.currentPage != null);
-    valueVecHolder.getValueVector().getMutator().setValueCount(
+    valueVec.getMutator().setValueCount(
         valuesReadInCurrentPass);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
index c2fc606..038f2d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
@@ -21,6 +21,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.vector.ValueVector;
 
 import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 
 class NullableFixedByteAlignedReader extends NullableColumnReader {
@@ -28,8 +29,8 @@ class NullableFixedByteAlignedReader extends NullableColumnReader {
   private byte[] bytes;
 
   NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
-      ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) throws ExecutionSetupException {
-    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+      ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, ConvertedType convertedType) throws ExecutionSetupException {
+    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
   }
 
   // this method is called by its superclass during a read loop

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
index 67262f6..fe83159 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
@@ -115,9 +115,9 @@ final class PageReadStatus {
     if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){
       definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
       valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
-      int endOfDefinitionLevels = definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0);
-      valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, endOfDefinitionLevels);
-      readPosInBytes = endOfDefinitionLevels;
+      definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0);
+      readPosInBytes = definitionLevels.getNextOffset();
+      valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
     }
 
     return true;

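This hunk tracks a parquet-column API change picked up with the 1.4.0 bump: the old code took the end offset of the definition-level data from initFromPage()'s return value, while the new code obtains it from getNextOffset() afterwards. The essential sequence, condensed from the hunk above:

    definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0);
    int valuesStart = definitionLevels.getNextOffset();  // where the definition levels end
    valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, valuesStart);
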
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 9acb557..463f3ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -38,12 +39,21 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.vector.VarCharVector;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
+import parquet.format.FileMetaData;
+import parquet.format.SchemaElement;
+import parquet.format.converter.ParquetMetadataConverter;
 import parquet.hadoop.CodecFactoryExposer;
+import parquet.hadoop.ParquetFileWriter;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.schema.PrimitiveType;
@@ -173,11 +183,23 @@ class ParquetRecordReader implements RecordReader {
     int columnsToScan = 0;
 
     MaterializedField field;
+    ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
+    FileMetaData fileMetaData;
+
+    // TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below
+    // store a map from column name to converted types if they are non-null
+    HashMap<String, ConvertedType> convertedTypes = new HashMap<>();
+    fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);
+    for (SchemaElement se : fileMetaData.getSchema()) {
+      convertedTypes.put(se.getName(), se.getConverted_type());
+    }
+
     // loop to add up the length of the fixed width columns and build the schema
     for (int i = 0; i < columns.size(); ++i) {
       column = columns.get(i);
+      logger.debug("name: " + fileMetaData.getSchema().get(i).name);
       field = MaterializedField.create(toFieldName(column.getPath()),
-          toMajorType(column.getType(), getDataMode(column)));
+          toMajorType(column.getType(), getDataMode(column), convertedTypes.get(column.getPath()[0])));
       if ( ! fieldSelected(field)){
         continue;
       }
@@ -203,9 +225,22 @@ class ParquetRecordReader implements RecordReader {
       return;
     }
     if (allFieldsFixedLength) {
-      recordsPerBatch = (int) Math.min(batchSize / bitWidthAllFixedFields, footer.getBlocks().get(0).getColumns().get(0).getValueCount());
+      recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
+          footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 65535);
+    }
+    else {
+      recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
     }
+//    for (SchemaElement se : fileMetaData.getSchema()) {
+//      if (fieldSelected())
+//      System.out.println("convertedtype :" + se.getConverted_type());
+//      System.out.println("name:" + se.getName());
+//      System.out.println();
+//
+//    }
     try {
+      ValueVector v;
+      ConvertedType convertedType;
       ArrayList<VarLenBinaryReader.VarLengthColumn> varLengthColumns = new ArrayList<>();
       ArrayList<VarLenBinaryReader.NullableVarLengthColumn> nullableVarLengthColumns = new ArrayList<>();
       // initialize all of the column read status objects
@@ -213,21 +248,38 @@ class ParquetRecordReader implements RecordReader {
       for (int i = 0; i < columns.size(); ++i) {
         column = columns.get(i);
         columnChunkMetaData = footer.getBlocks().get(0).getColumns().get(i);
-        MajorType type = toMajorType(column.getType(), getDataMode(column));
+        convertedType = convertedTypes.get(column.getPath()[0]);
+        MajorType type = toMajorType(column.getType(), getDataMode(column), convertedType);
         field = MaterializedField.create(toFieldName(column.getPath()), type);
         // the field was not requested to be read
         if ( ! fieldSelected(field)) continue;
 
+        //convertedTypes.put()
         fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
-        ValueVector v = output.addField(field, TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+        v = output.addField(field, TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
         if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-          createFixedColumnReader(fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v);
+          createFixedColumnReader(fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v,
+            convertedType);
         } else {
           if (column.getMaxDefinitionLevel() == 0){// column is required
-            varLengthColumns.add(new VarLenBinaryReader.VarLengthColumn(this, -1, column, columnChunkMetaData, false, v));
+            if (convertedType == ConvertedType.UTF8) {
+              varLengthColumns.add(
+                new VarLenBinaryReader.VarCharColumn(this, -1, column, columnChunkMetaData, false, (VarCharVector) v, convertedType));
+            } else {
+              varLengthColumns.add(
+                  new VarLenBinaryReader.VarBinaryColumn(this, -1, column, columnChunkMetaData, false, (VarBinaryVector) v, convertedType));
+            }
           }
           else{
-            nullableVarLengthColumns.add(new VarLenBinaryReader.NullableVarLengthColumn(this, -1, column, columnChunkMetaData, false, v));
+            if (convertedType == ConvertedType.UTF8) {
+              nullableVarLengthColumns.add(
+                new VarLenBinaryReader.NullableVarCharColumn(this, -1, column, columnChunkMetaData, false,
+                    (NullableVarCharVector) v, convertedType));
+            } else {
+              nullableVarLengthColumns.add(
+                new VarLenBinaryReader.NullableVarBinaryColumn(this, -1, column, columnChunkMetaData, false,
+                  (NullableVarBinaryVector) v, convertedType));
+            }
           }
         }
       }
@@ -259,15 +311,15 @@ class ParquetRecordReader implements RecordReader {
 
   private void resetBatch() {
     for (ColumnReader column : columnStatuses) {
-      column.valueVecHolder.reset();
+      AllocationHelper.allocate(column.valueVec, recordsPerBatch, 10, 5);
       column.valuesReadInCurrentPass = 0;
     }
     for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns){
-      r.valueVecHolder.reset();
+      AllocationHelper.allocate(r.valueVec, recordsPerBatch, 10, 5);
       r.valuesReadInCurrentPass = 0;
     }
     for (VarLenBinaryReader.NullableVarLengthColumn r : varLengthReader.nullableColumns){
-      r.valueVecHolder.reset();
+      AllocationHelper.allocate(r.valueVec, recordsPerBatch, 10, 5);
       r.valuesReadInCurrentPass = 0;
     }
   }
@@ -281,28 +333,29 @@ class ParquetRecordReader implements RecordReader {
    * @throws SchemaChangeException
    */
   private boolean createFixedColumnReader(boolean fixedLength, ColumnDescriptor descriptor,
-                                          ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v)
+                                          ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v,
+                                          ConvertedType convertedType)
       throws SchemaChangeException, ExecutionSetupException {
     // if the column is required
     if (descriptor.getMaxDefinitionLevel() == 0){
       if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
         columnStatuses.add(new BitReader(this, allocateSize, descriptor, columnChunkMetaData,
-            fixedLength, v));
+            fixedLength, v, convertedType));
       }
       else{
         columnStatuses.add(new FixedByteAlignedReader(this, allocateSize, descriptor, columnChunkMetaData,
-            fixedLength, v));
+            fixedLength, v, convertedType));
       }
       return true;
     }
     else { // if the column is nullable
       if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
         columnStatuses.add(new NullableBitReader(this, allocateSize, descriptor, columnChunkMetaData,
-            fixedLength, v));
+            fixedLength, v, convertedType));
       }
       else{
         columnStatuses.add(new NullableFixedByteAlignedReader(this, allocateSize, descriptor, columnChunkMetaData,
-            fixedLength, v));
+            fixedLength, v, convertedType));
       }
       return true;
     }
@@ -363,18 +416,21 @@ class ParquetRecordReader implements RecordReader {
   }
 
   static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName,
-                                               TypeProtos.DataMode mode) {
-    return toMajorType(primitiveTypeName, 0, mode);
+                                               TypeProtos.DataMode mode, ConvertedType convertedType) {
+    return toMajorType(primitiveTypeName, 0, mode, convertedType);
   }
 
   static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
-                                               TypeProtos.DataMode mode) {
+                                               TypeProtos.DataMode mode, ConvertedType convertedType) {
     switch (mode) {
 
       case OPTIONAL:
         switch (primitiveTypeName) {
           case BINARY:
-            return Types.optional(TypeProtos.MinorType.VARBINARY);
+            if (convertedType == ConvertedType.UTF8)
+              return Types.optional(TypeProtos.MinorType.VARCHAR);
+            else
+              return Types.optional(TypeProtos.MinorType.VARBINARY);
           case INT64:
             return Types.optional(TypeProtos.MinorType.BIGINT);
           case INT32:
@@ -400,7 +456,10 @@ class ParquetRecordReader implements RecordReader {
       case REQUIRED:
         switch (primitiveTypeName) {
           case BINARY:
-            return Types.required(TypeProtos.MinorType.VARBINARY);
+            if (convertedType == ConvertedType.UTF8)
+              return Types.required(TypeProtos.MinorType.VARCHAR);
+            else
+              return Types.required(TypeProtos.MinorType.VARBINARY);
           case INT64:
             return Types.required(TypeProtos.MinorType.BIGINT);
           case INT32:
@@ -426,7 +485,10 @@ class ParquetRecordReader implements RecordReader {
       case REPEATED:
         switch (primitiveTypeName) {
           case BINARY:
-            return Types.repeated(TypeProtos.MinorType.VARBINARY);
+            if (convertedType == ConvertedType.UTF8)
+              return Types.repeated(TypeProtos.MinorType.VARCHAR);
+            else
+              return Types.repeated(TypeProtos.MinorType.VARBINARY);
           case INT64:
             return Types.repeated(TypeProtos.MinorType.BIGINT);
           case INT32:

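Pulling the ParquetRecordReader hunks together: the reader converts the parsed footer back into a thrift FileMetaData, builds a map from column name to ConvertedType, and threads the looked-up value through both schema construction and reader creation. Condensed from the hunks above (top-level columns only, per the TODO about nested reading):

    FileMetaData fileMetaData = new ParquetMetadataConverter()
        .toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);
    HashMap<String, ConvertedType> convertedTypes = new HashMap<>();
    for (SchemaElement se : fileMetaData.getSchema()) {
      convertedTypes.put(se.getName(), se.getConverted_type());  // null when unannotated
    }

    // per column: resolve the Drill type, then create the matching reader
    ConvertedType convertedType = convertedTypes.get(column.getPath()[0]);
    MajorType type = toMajorType(column.getType(), getDataMode(column), convertedType);
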
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
index 09d19a8..ae01104 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
@@ -18,11 +18,14 @@
 package org.apache.drill.exec.store.parquet;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.*;
 import org.apache.drill.exec.vector.NullableVarBinaryVector;
-import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.vector.VarCharVector;
 import parquet.bytes.BytesUtils;
 import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 
 import java.io.IOException;
@@ -42,25 +45,156 @@ public class VarLenBinaryReader {
     this.columns = columns;
   }
 
-  public static class VarLengthColumn extends ColumnReader {
+  public static abstract class VarLengthColumn<V extends ValueVector> extends ColumnReader {
 
-    VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+    VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
+                    ConvertedType convertedType) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
     }
 
     @Override
     protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
       throw new UnsupportedOperationException();
     }
+
+    public abstract boolean setSafe(int index, byte[] bytes, int start, int length);
+
+    public abstract int capacity();
+
+  }
+
+  public static abstract class NullableVarLengthColumn<V extends ValueVector> extends ColumnReader {
+
+    int nullsRead;
+    boolean currentValNull = false;
+
+    NullableVarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                            ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
+                            ConvertedType convertedType ) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
+    }
+
+    public abstract boolean setSafe(int index, byte[] value, int start, int length);
+
+    public abstract int capacity();
+
+    @Override
+    protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  public static class VarCharColumn extends VarLengthColumn <VarCharVector> {
+
+    // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
+    protected VarCharVector varCharVector;
+
+    VarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarCharVector v,
+                    ConvertedType convertedType) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
+      varCharVector = v;
+    }
+
+    @Override
+    protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+      return varCharVector.getMutator().setSafe(valuesReadInCurrentPass, bytes,
+          (int) (pageReadStatus.readPosInBytes + 4), dataTypeLengthInBits);
+    }
+
+    @Override
+    public int capacity() {
+      return varCharVector.getData().capacity();
+    }
   }
 
-  public static class NullableVarLengthColumn extends ColumnReader {
+  public static class NullableVarCharColumn extends NullableVarLengthColumn <NullableVarCharVector> {
 
     int nullsRead;
     boolean currentValNull = false;
+    // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
+    protected NullableVarCharVector nullableVarCharVector;
+
+    NullableVarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                            ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarCharVector v,
+                            ConvertedType convertedType ) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
+      nullableVarCharVector = v;
+    }
 
-    NullableVarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) throws ExecutionSetupException {
-      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+    public boolean setSafe(int index, byte[] value, int start, int length) {
+      return nullableVarCharVector.getMutator().setSafe(index, value,
+          start, length);
+    }
+
+    @Override
+    public int capacity() {
+      return nullableVarCharVector.getData().capacity();
+    }
+
+    @Override
+    protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  public static class VarBinaryColumn extends VarLengthColumn <VarBinaryVector> {
+
+    // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
+    protected VarBinaryVector varBinaryVector;
+
+    VarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                  ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v,
+                  ConvertedType convertedType) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
+      varBinaryVector = v;
+    }
+
+    @Override
+    protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+      return varBinaryVector.getMutator().setSafe(valuesReadInCurrentPass, bytes,
+          (int) (pageReadStatus.readPosInBytes + 4), dataTypeLengthInBits);
+    }
+
+    @Override
+    public int capacity() {
+      return varBinaryVector.getData().capacity();
+    }
+  }
+
+  public static class NullableVarBinaryColumn extends NullableVarLengthColumn <NullableVarBinaryVector> {
+
+    int nullsRead;
+    boolean currentValNull = false;
+    // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
+    protected NullableVarBinaryVector nullableVarBinaryVector;
+
+    NullableVarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                          ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarBinaryVector v,
+                          ConvertedType convertedType ) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType);
+      nullableVarBinaryVector = v;
+    }
+
+    public boolean setSafe(int index, byte[] value, int start, int length) {
+      return nullableVarBinaryVector.getMutator().setSafe(index, value,
+          start, length);
+    }
+
+    @Override
+    public int capacity() {
+      return nullableVarBinaryVector.getData().capacity();
     }
 
     @Override
@@ -83,8 +217,6 @@ public class VarLenBinaryReader {
     int lengthVarFieldsInCurrentRecord;
     boolean rowGroupFinished = false;
     byte[] bytes;
-    VarBinaryVector currVec;
-    NullableVarBinaryVector currNullVec;
     // write the first 0 offset
     for (ColumnReader columnReader : columns) {
       columnReader.bytesReadInCurrentPass = 0;
@@ -98,8 +230,8 @@ public class VarLenBinaryReader {
     }
     outer: do {
       lengthVarFieldsInCurrentRecord = 0;
-      for (ColumnReader columnReader : columns) {
-        if (recordsReadInCurrentPass == columnReader.valueVecHolder.getValueVector().getValueCapacity()){
+      for (VarLengthColumn columnReader : columns) {
+        if (recordsReadInCurrentPass == columnReader.valueVec.getValueCapacity()){
           rowGroupFinished = true;
           break;
         }
@@ -118,7 +250,7 @@ public class VarLenBinaryReader {
             (int) columnReader.pageReadStatus.readPosInBytes);
         lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits;
 
-        if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > ((VarBinaryVector) columnReader.valueVecHolder.getValueVector()).getData().capacity()) {
+        if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > columnReader.capacity()) {
           break outer;
         }
 
@@ -126,7 +258,7 @@ public class VarLenBinaryReader {
       for (NullableVarLengthColumn columnReader : nullableColumns) {
         // check to make sure there is capacity for the next value (for nullables this is a check to see if there is
         // still space in the nullability recording vector)
-        if (recordsReadInCurrentPass == columnReader.valueVecHolder.getValueVector().getValueCapacity()){
+        if (recordsReadInCurrentPass == columnReader.valueVec.getValueCapacity()){
           rowGroupFinished = true;
           break;
         }
@@ -151,7 +283,7 @@ public class VarLenBinaryReader {
             (int) columnReader.pageReadStatus.readPosInBytes);
         lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits;
 
-        if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > ((NullableVarBinaryVector) columnReader.valueVecHolder.getValueVector()).getData().capacity()) {
+        if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > columnReader.capacity()) {
           break outer;
         }
       }
@@ -160,12 +292,11 @@ public class VarLenBinaryReader {
           > parentReader.getBatchSize()){
         break outer;
       }
-      for (ColumnReader columnReader : columns) {
+      for (VarLengthColumn columnReader : columns) {
         bytes = columnReader.pageReadStatus.pageDataByteArray;
-        currVec = (VarBinaryVector) columnReader.valueVecHolder.getValueVector();
         // again, I am re-purposing the unused field here, it is a length n BYTES, not bits
-        boolean success = currVec.getMutator().setSafe(columnReader.valuesReadInCurrentPass, bytes,
-                (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits);
+        boolean success = columnReader.setSafe(columnReader.valuesReadInCurrentPass, bytes,
+            (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits);
         assert success;
         columnReader.pageReadStatus.readPosInBytes += columnReader.dataTypeLengthInBits + 4;
         columnReader.bytesReadInCurrentPass += columnReader.dataTypeLengthInBits + 4;
@@ -174,11 +305,10 @@ public class VarLenBinaryReader {
       }
       for (NullableVarLengthColumn columnReader : nullableColumns) {
         bytes = columnReader.pageReadStatus.pageDataByteArray;
-        currNullVec = (NullableVarBinaryVector) columnReader.valueVecHolder.getValueVector();
         // again, I am re-purposing the unused field here, it is a length n BYTES, not bits
         if (!columnReader.currentValNull && columnReader.dataTypeLengthInBits > 0){
-          boolean success = currNullVec.getMutator().setSafe(columnReader.valuesReadInCurrentPass, bytes,
-                  (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits);
+          boolean success = columnReader.setSafe(columnReader.valuesReadInCurrentPass, bytes,
+              (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits);
           assert success;
         }
         columnReader.currentValNull = false;
@@ -195,10 +325,10 @@ public class VarLenBinaryReader {
       recordsReadInCurrentPass++;
     } while (recordsReadInCurrentPass < recordsToReadInThisPass);
     for (VarLengthColumn columnReader : columns) {
-      columnReader.valueVecHolder.getValueVector().getMutator().setValueCount((int) recordsReadInCurrentPass);
+      columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass);
     }
     for (NullableVarLengthColumn columnReader : nullableColumns) {
-      columnReader.valueVecHolder.getValueVector().getMutator().setValueCount((int) recordsReadInCurrentPass);
+      columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass);
     }
     return recordsReadInCurrentPass;
   }

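The effect of the new column classes on the shared read loop above: rather than casting the held vector to VarBinaryVector (previously the only possibility), the loop calls each column's own setSafe() and capacity(), and the concrete class writes into its strongly typed vector. The write step, condensed from the hunk above:

    for (VarLengthColumn columnReader : columns) {
      byte[] bytes = columnReader.pageReadStatus.pageDataByteArray;
      // each variable-length value in the page is preceded by a 4-byte length prefix
      boolean success = columnReader.setSafe(columnReader.valuesReadInCurrentPass, bytes,
          (int) columnReader.pageReadStatus.readPosInBytes + 4,
          columnReader.dataTypeLengthInBits);
      assert success;
    }
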
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 9ba94fa..67b5394 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -103,6 +103,11 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
     testParquetFullEngineLocalPath(planName, fileName, 2, numberRowGroups, recordsPerRowGroup);
   }
 
+  public String getPlanForFile(String pathFileName, String parquetFileName) throws IOException {
+    return Files.toString(FileUtils.getResourceAsFile(pathFileName), Charsets.UTF_8)
+        .replaceFirst("&REPLACED_IN_PARQUET_TEST&", parquetFileName);
+  }
+
   @Test
   public void testMultipleRowGroupsAndReads2() throws Exception {
     String readEntries;
@@ -273,15 +278,27 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
 
   @Ignore
   @Test
+  /**
+   * Tests the reading of nullable var length columns. Runs the test twice; the second run uses a
+   * file whose column has a converted type of UTF-8, to make sure it can be read.
+   */
   public void testNullableColumnsVarLen() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
     ParquetTestProperties props = new ParquetTestProperties(1, 300000, DEFAULT_BYTES_PER_PAGE, fields);
     byte[] val = {'b'};
     byte[] val2 = {'b', '2'};
-    byte[] val3 = { 'l','o','n','g','e','r',' ','s','t','r','i','n','g'};
-    Object[] boolVals = { val, val2, val3};
+    byte[] val3 = {'b', '3'};
+    byte[] val4 = { 'l','o','n','g','e','r',' ','s','t','r','i','n','g'};
+    Object[] boolVals = { val, val2, val4};
     props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals, TypeProtos.MinorType.BIT, props));
+    //
     testParquetFullEngineEventBased(false, "/parquet/parquet_nullable_varlen.json", "/tmp/nullable_varlen.parquet", 1, props);
+    fields.clear();
+    // pass strings instead of byte arrays
+    Object[] boolVals2 = { "b", "b2", "b3"};
+    props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals2, TypeProtos.MinorType.BIT, props));
+    testParquetFullEngineEventBased(false, "/parquet/parquet_scan_screen_read_entry_replace.json",
+        "\"/tmp/varLen.parquet/a\"", "unused", 1, props);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
index 73af98c..257a49e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
@@ -78,7 +78,10 @@ public class ParquetResultListener implements UserResultsListener {
 
     T val = (T) valueVector.getAccessor().getObject(index);
     if (val instanceof byte[]) {
-      assertEquals(true, Arrays.equals((byte[]) value, (byte[]) val));
+      assert(Arrays.equals((byte[]) value, (byte[]) val));
+    }
+    else if (val instanceof String) {
+      assert(val.equals(value));
     } else {
       assertEquals(value, val);
     }
@@ -120,16 +123,7 @@ public class ParquetResultListener implements UserResultsListener {
       }
       for (int j = 0; j < vv.getAccessor().getValueCount(); j++) {
         if (ParquetRecordReaderTest.VERBOSE_DEBUG){
-          if (vv.getAccessor().getObject(j) instanceof byte[]){
-            System.out.print("[len:" + ((byte[]) vv.getAccessor().getObject(j)).length + " - (");
-            for (int k = 0; k <  ((byte[]) vv.getAccessor().getObject(j)).length; k++){
-              System.out.print((char)((byte[])vv.getAccessor().getObject(j))[k] + ",");
-            }
-            System.out.print(") ]");
-          }
-          else{
-            System.out.print(Strings.padStart(vv.getAccessor().getObject(j) + "", 20, ' ') + " ");
-          }
+          System.out.print(Strings.padStart(vv.getAccessor().getObject(j) + "", 20, ' ') + " ");
           System.out.print(", " + (j % 25 == 0 ? "\n batch:" + batchCounter + " v:" + j + " - " : ""));
         }
         if (testValues){
@@ -161,20 +155,9 @@ public class ParquetResultListener implements UserResultsListener {
 
         for (VectorWrapper vw : batchLoader) {
           ValueVector v = vw.getValueVector();
-          if (v.getAccessor().getObject(i) instanceof byte[]){
-            System.out.print("[len:" + ((byte[]) v.getAccessor().getObject(i)).length + " - (");
-            for (int j = 0; j <  ((byte[]) v.getAccessor().getObject(i)).length; j++){
-              System.out.print(((byte[])v.getAccessor().getObject(i))[j] + ",");
-            }
-            System.out.print(") ]");
-          }
-          else{
-            System.out.print(Strings.padStart(v.getAccessor().getObject(i) + "", 20, ' ') + " ");
-          }
+          System.out.print(Strings.padStart(v.getAccessor().getObject(i) + "", 20, ' ') + " ");
         }
-        System.out.println(
-
-        );
+        System.out.println();
       }
     }
     batchCounter++;