You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by me...@apache.org on 2015/05/12 18:08:28 UTC

drill git commit: DRILL-1980: Add support for CTAS with interval data type. Modify ParquetOutputRecordWriter to be able to write interval day, interval year and interval data type into parquet. Modify the two parquet readers to be able to read interval data type from parquet file

Repository: drill
Updated Branches:
  refs/heads/master 71fe8f02f -> d10769f47


DRILL-1980: Add support for CTAS with interval data type.
Modify ParquetOutputRecordWriter to be able to write interval day, interval year and interval data type into parquet.
Modify the two parquet readers to be able to read interval data type from parquet file

Add unit tests


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d10769f4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d10769f4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d10769f4

Branch: refs/heads/master
Commit: d10769f478900ff1868d206086874bdd67a45e7d
Parents: 71fe8f0
Author: Mehant Baid <me...@gmail.com>
Authored: Mon May 11 16:31:42 2015 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Tue May 12 07:46:48 2015 -0700

----------------------------------------------------------------------
 .../templates/ParquetOutputRecordWriter.java    | 37 ++++++--
 .../codegen/templates/ParquetTypeHelper.java    | 10 ++-
 .../store/parquet/ParquetReaderUtility.java     | 10 +++
 .../columnreaders/ColumnReaderFactory.java      | 22 +++--
 .../columnreaders/FixedByteAlignedReader.java   | 26 ++++++
 .../NullableFixedByteAlignedReaders.java        | 26 ++++++
 .../ParquetToDrillTypeConverter.java            |  2 +
 .../parquet2/DrillParquetGroupConverter.java    | 30 ++++++-
 .../physical/impl/writer/TestParquetWriter.java | 92 ++++++++++++++++++++
 9 files changed, 240 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/d10769f4/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
index 0d24041..35777b0 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
@@ -21,6 +21,7 @@ import parquet.io.api.Binary;
 
 import java.lang.Override;
 import java.lang.RuntimeException;
+import java.util.Arrays;
 
 <@pp.dropOutputFile />
 <@pp.changeOutputFile name="org/apache/drill/exec/store/ParquetOutputRecordWriter.java" />
@@ -86,6 +87,9 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
 
   public class ${mode.prefix}${minor.class}ParquetConverter extends FieldConverter {
     private Nullable${minor.class}Holder holder = new Nullable${minor.class}Holder();
+    <#if minor.class?contains("Interval")>
+      private final byte[] output = new byte[12];
+    </#if>
 
     public ${mode.prefix}${minor.class}ParquetConverter(int fieldId, String fieldName, FieldReader reader) {
       super(fieldId, fieldName, reader);
@@ -112,7 +116,6 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
         minor.class == "SmallInt" ||
         minor.class == "Int" ||
         minor.class == "Time" ||
-        minor.class == "IntervalYear" ||
         minor.class == "Decimal9" ||
         minor.class == "UInt4">
     <#if mode.prefix == "Repeated" >
@@ -201,13 +204,29 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
       consumer.addBinary(Binary.fromByteArray(output));
       consumer.endField(fieldName, fieldId);
       </#if>
+  <#elseif minor.class?contains("Interval")>
+      consumer.startField(fieldName, fieldId);
+      reader.read(holder);
+      <#if minor.class == "IntervalDay">
+        Arrays.fill(output, 0, 4, (byte) 0);
+        IntervalUtility.intToLEByteArray(holder.days, output, 4);
+        IntervalUtility.intToLEByteArray(holder.milliseconds, output, 8);
+      <#elseif minor.class == "IntervalYear">
+        IntervalUtility.intToLEByteArray(holder.value, output, 0);
+        Arrays.fill(output, 4, 8, (byte) 0);
+        Arrays.fill(output, 8, 12, (byte) 0);
+      <#elseif minor.class == "Interval">
+        IntervalUtility.intToLEByteArray(holder.months, output, 0);
+        IntervalUtility.intToLEByteArray(holder.days, output, 4);
+        IntervalUtility.intToLEByteArray(holder.milliseconds, output, 8);
+      </#if>
+      consumer.addBinary(Binary.fromByteArray(output));
+      consumer.endField(fieldName, fieldId);
+
   <#elseif
         minor.class == "TimeTZ" ||
-        minor.class == "IntervalDay" ||
-        minor.class == "Interval" ||
         minor.class == "Decimal28Dense" ||
         minor.class == "Decimal38Dense">
-
       <#if mode.prefix == "Repeated" >
       <#else>
 
@@ -235,5 +254,13 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
     </#list>
   </#list>
 </#list>
-
+  private static class IntervalUtility {
+    private static void intToLEByteArray(final int value, final byte[] output, final int outputIndex) {
+      int shiftOrder = 0;
+      for (int i = outputIndex; i < outputIndex + 4; i++) {
+        output[i] = (byte) (value >> shiftOrder);
+        shiftOrder += 8;
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/d10769f4/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java b/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java
index 6ac488d..47881bc 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 
+import org.apache.drill.common.types.MinorType;
 import parquet.format.ConvertedType;
 import parquet.schema.DecimalMetadata;
 import parquet.schema.OriginalType;
@@ -54,7 +55,6 @@ public class ParquetTypeHelper {
             minor.class == "SmallInt" ||
             minor.class == "Int" ||
             minor.class == "Time" ||
-            minor.class == "IntervalYear" ||
             minor.class == "Decimal9" ||
             minor.class == "Date" ||
             minor.class == "UInt4">
@@ -77,6 +77,7 @@ public class ParquetTypeHelper {
     <#elseif
             minor.class == "TimeTZ" ||
             minor.class == "IntervalDay" ||
+            minor.class == "IntervalYear" ||
             minor.class == "Interval" ||
             minor.class == "Decimal28Dense" ||
             minor.class == "Decimal38Dense" ||
@@ -111,6 +112,9 @@ public class ParquetTypeHelper {
             originalTypeMap.put(MinorType.DATE, OriginalType.DATE);
             originalTypeMap.put(MinorType.TIME, OriginalType.TIME_MILLIS);
             originalTypeMap.put(MinorType.TIMESTAMP, OriginalType.TIMESTAMP_MILLIS);
+            originalTypeMap.put(MinorType.INTERVALDAY, OriginalType.INTERVAL);
+            originalTypeMap.put(MinorType.INTERVALYEAR, OriginalType.INTERVAL);
+            originalTypeMap.put(MinorType.INTERVAL, OriginalType.INTERVAL);
 //            originalTypeMap.put(MinorType.TIMESTAMPTZ, OriginalType.TIMESTAMPTZ);
   }
 
@@ -142,6 +146,10 @@ public class ParquetTypeHelper {
 
   public static int getLengthForMinorType(MinorType minorType) {
     switch(minorType) {
+      case INTERVALDAY:
+      case INTERVALYEAR:
+      case INTERVAL:
+        return 12;
       case DECIMAL28SPARSE:
         return 12;
       case DECIMAL38SPARSE:

http://git-wip-us.apache.org/repos/asf/drill/blob/d10769f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 5291855..da480d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -34,4 +34,14 @@ public class ParquetReaderUtility {
           .build();
     }
   }
+
+  public static int getIntFromLEBytes(byte[] input, int start) {
+    int out = 0;
+    int shiftOrder = 0;
+    for (int i = start; i < start + 4; i++) {
+      out |= (((input[i]) & 0xFF) << shiftOrder);
+      shiftOrder += 8;
+    }
+    return out;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/d10769f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index 70b2342..650163e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -82,7 +82,10 @@ public class ColumnReaderFactory {
           } else if (length <= 16) {
             return new FixedByteAlignedReader.Decimal38Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
           }
-        } else {
+        } else if (convertedType == ConvertedType.INTERVAL) {
+          return new FixedByteAlignedReader.IntervalReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+        }
+        else {
           return new FixedByteAlignedReader.FixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, (VariableWidthVector) v, schemaElement);
         }
       } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
@@ -134,12 +137,17 @@ public class ColumnReaderFactory {
             fixedLength, v, schemaElement);
       } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
         return new NullableFixedByteAlignedReaders.NullableDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-      } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){
-        int length = schemaElement.type_length;
-        if (length <= 12) {
-          return new NullableFixedByteAlignedReaders.NullableDecimal28Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-        } else if (length <= 16) {
-          return new NullableFixedByteAlignedReaders.NullableDecimal38Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+        if (convertedType == ConvertedType.DECIMAL) {
+          int length = schemaElement.type_length;
+          if (length <= 12) {
+            return new NullableFixedByteAlignedReaders.NullableDecimal28Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+          } else if (length <= 16) {
+            return new NullableFixedByteAlignedReaders.NullableDecimal38Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+          }
+        } else if (convertedType == ConvertedType.INTERVAL) {
+          return new NullableFixedByteAlignedReaders.NullableIntervalReader(recordReader, allocateSize, descriptor,
+              columnChunkMetaData, fixedLength, v, schemaElement);
         }
       } else {
         return getNullableColumnReader(recordReader, allocateSize, descriptor,

http://git-wip-us.apache.org/repos/asf/drill/blob/d10769f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
index fe0234b..07b78e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -24,11 +24,14 @@ import java.math.BigDecimal;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
 import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
+import org.apache.drill.exec.expr.holders.IntervalHolder;
 import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.util.DecimalUtility;
 import org.apache.drill.exec.vector.DateVector;
 import org.apache.drill.exec.vector.Decimal28SparseVector;
 import org.apache.drill.exec.vector.Decimal38SparseVector;
+import org.apache.drill.exec.vector.IntervalVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 import org.joda.time.DateTimeUtils;
@@ -36,6 +39,7 @@ import org.joda.time.DateTimeUtils;
 import parquet.column.ColumnDescriptor;
 import parquet.format.SchemaElement;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.io.api.Binary;
 
 class FixedByteAlignedReader extends ColumnReader {
 
@@ -179,4 +183,26 @@ class FixedByteAlignedReader extends ColumnReader {
               schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits);
     }
   }
+
+  public static class IntervalReader extends ConvertedReader {
+    IntervalVector intervalVector;
+
+    IntervalReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                   boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      intervalVector = (IntervalVector) v;
+    }
+
+    @Override
+    void addNext(int start, int index) {
+      if (usingDictionary) {
+        byte[] input = pageReader.dictionaryValueReader.readBytes().getBytes();
+        intervalVector.getMutator().setSafe(index * 12,
+            ParquetReaderUtility.getIntFromLEBytes(input, 0),
+            ParquetReaderUtility.getIntFromLEBytes(input, 4),
+            ParquetReaderUtility.getIntFromLEBytes(input, 8));
+      }
+      intervalVector.getMutator().setSafe(index, bytebuf.getInt(start), bytebuf.getInt(start + 4), bytebuf.getInt(start + 8));
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/d10769f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index c2221d6..e2388ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -25,7 +25,9 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.expr.holders.NullableDecimal28SparseHolder;
 import org.apache.drill.exec.expr.holders.NullableDecimal38SparseHolder;
 import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.IntervalVector;
 import org.apache.drill.exec.vector.NullableBigIntVector;
 import org.apache.drill.exec.vector.NullableDateVector;
 import org.apache.drill.exec.vector.NullableDecimal18Vector;
@@ -35,6 +37,7 @@ import org.apache.drill.exec.vector.NullableDecimal9Vector;
 import org.apache.drill.exec.vector.NullableFloat4Vector;
 import org.apache.drill.exec.vector.NullableFloat8Vector;
 import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableIntervalVector;
 import org.apache.drill.exec.vector.NullableTimeStampVector;
 import org.apache.drill.exec.vector.NullableTimeVector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -43,6 +46,7 @@ import org.joda.time.DateTimeUtils;
 import parquet.column.ColumnDescriptor;
 import parquet.format.SchemaElement;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.io.api.Binary;
 
 public class NullableFixedByteAlignedReaders {
 
@@ -333,4 +337,26 @@ public class NullableFixedByteAlignedReaders {
     }
   }
 
+  public static class NullableIntervalReader extends NullableConvertedReader {
+    NullableIntervalVector nullableIntervalVector;
+
+    NullableIntervalReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                   boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      nullableIntervalVector = (NullableIntervalVector) v;
+    }
+
+    @Override
+    void addNext(int start, int index) {
+      if (usingDictionary) {
+        byte[] input = pageReader.dictionaryValueReader.readBytes().getBytes();
+        nullableIntervalVector.getMutator().setSafe(index * 12, 1,
+            ParquetReaderUtility.getIntFromLEBytes(input, 0),
+            ParquetReaderUtility.getIntFromLEBytes(input, 4),
+            ParquetReaderUtility.getIntFromLEBytes(input, 8));
+      }
+      nullableIntervalVector.getMutator().set(index, 1, bytebuf.getInt(start), bytebuf.getInt(start + 4), bytebuf.getInt(start + 8));
+    }
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/d10769f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
index 8ab5fea..a120f57 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
@@ -106,6 +106,8 @@ public class ParquetToDrillTypeConverter {
         } else if (convertedType == ConvertedType.DECIMAL) {
           ParquetReaderUtility.checkDecimalTypeEnabled(options);
           return getDecimalType(schemaElement);
+        } else if (convertedType == ConvertedType.INTERVAL) {
+          return TypeProtos.MinorType.INTERVAL;
         }
       default:
         throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName);

http://git-wip-us.apache.org/repos/asf/drill/blob/d10769f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index c6367ae..825dc54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.expr.holders.Decimal9Holder;
 import org.apache.drill.exec.expr.holders.Float4Holder;
 import org.apache.drill.exec.expr.holders.Float8Holder;
 import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.IntervalHolder;
 import org.apache.drill.exec.expr.holders.TimeHolder;
 import org.apache.drill.exec.expr.holders.TimeStampHolder;
 import org.apache.drill.exec.expr.holders.VarBinaryHolder;
@@ -60,6 +61,7 @@ import org.apache.drill.exec.vector.complex.writer.Decimal9Writer;
 import org.apache.drill.exec.vector.complex.writer.Float4Writer;
 import org.apache.drill.exec.vector.complex.writer.Float8Writer;
 import org.apache.drill.exec.vector.complex.writer.IntWriter;
+import org.apache.drill.exec.vector.complex.writer.IntervalWriter;
 import org.apache.drill.exec.vector.complex.writer.TimeStampWriter;
 import org.apache.drill.exec.vector.complex.writer.TimeWriter;
 import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
@@ -254,7 +256,13 @@ public class DrillParquetGroupConverter extends GroupConverter {
             Decimal38SparseWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal38Sparse() : mapWriter.decimal38Sparse(name, metadata.getScale(), metadata.getPrecision());
             return new DrillBinaryToDecimal38Converter(writer, metadata.getPrecision(), metadata.getScale(), mutator.getManagedBuffer());
           }
-        } else {
+        } else if (type.getOriginalType() == OriginalType.INTERVAL) {
+          IntervalWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).interval()
+              : mapWriter.interval(name);
+          return new DrillFixedLengthByteArrayToInterval(writer);
+
+        }
+        else {
           throw new UnsupportedOperationException("Unsupported type " + type.getOriginalType());
         }
       default:
@@ -509,10 +517,28 @@ public class DrillParquetGroupConverter extends GroupConverter {
 
     @Override
     public void addBinary(Binary value) {
-      BigDecimal bigDecimal = DecimalUtility.getBigDecimalFromByteArray(value.getBytes(), 0, value.length(), holder.scale);
+       BigDecimal bigDecimal = DecimalUtility.getBigDecimalFromByteArray(value.getBytes(), 0, value.length(), holder.scale);
       DecimalUtility.getSparseFromBigDecimal(bigDecimal, buf, 0, holder.scale, holder.precision, Decimal38SparseHolder.nDecimalDigits);
       holder.buffer = buf;
       writer.write(holder);
     }
   }
+
+  public static class DrillFixedLengthByteArrayToInterval extends PrimitiveConverter {
+    final private IntervalWriter writer;
+    final private IntervalHolder holder = new IntervalHolder();
+
+    public DrillFixedLengthByteArrayToInterval(IntervalWriter writer) {
+      this.writer = writer;
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      final byte[] input = value.getBytes();
+      holder.months = ParquetReaderUtility.getIntFromLEBytes(input, 0);
+      holder.days = ParquetReaderUtility.getIntFromLEBytes(input, 4);
+      holder.milliseconds = ParquetReaderUtility.getIntFromLEBytes(input, 8);
+      writer.write(holder);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/d10769f4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 5f57567..5dc7bec 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.joda.time.DateTime;
+import org.joda.time.Period;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -477,6 +478,97 @@ public class TestParquetWriter extends BaseTestQuery {
     }
   }
 
+  /*
+ * Method tests CTAS with interval data type. We also verify reading back the data to ensure we
+ * have written the correct type. For every CTAS operation we use both the readers to verify results.
+ */
+  @Test
+  public void testCTASWithIntervalTypes() throws Exception {
+    test("use dfs_test.tmp");
+
+    String tableName = "drill_1980_t1";
+    // test required interval day type
+    test(String.format("create table %s as " +
+        "select " +
+        "interval '10 20:30:40.123' day to second col1, " +
+        "interval '-1000000000 20:12:23.999' day(10) to second col2 " +
+        "from cp.`employee.json` limit 2", tableName));
+
+    Period row1Col1 = new Period(0, 0, 0, 10, 0, 0, 0, 73840123);
+    Period row1Col2 = new Period(0, 0, 0, -1000000000, 0, 0, 0, -72743999);
+    testParquetReaderHelper(tableName, row1Col1, row1Col2, row1Col1, row1Col2);
+
+    tableName = "drill_1980_2";
+
+    // test required interval year type
+    test(String.format("create table %s as " +
+        "select " +
+        "interval '10-2' year to month col1, " +
+        "interval '-100-8' year(3) to month col2 " +
+        "from cp.`employee.json` limit 2", tableName));
+
+    row1Col1 = new Period(0, 122, 0, 0, 0, 0, 0, 0);
+    row1Col2 = new Period(0, -1208, 0, 0, 0, 0, 0, 0);
+
+    testParquetReaderHelper(tableName, row1Col1, row1Col2, row1Col1, row1Col2);
+    // test nullable interval year type
+    tableName = "drill_1980_t3";
+    test(String.format("create table %s as " +
+        "select " +
+        "cast (intervalyear_col as interval year) col1," +
+        "cast(intervalyear_col as interval year) + interval '2' year col2 " +
+        "from cp.`parquet/alltypes.json` where tinyint_col = 1 or tinyint_col = 2", tableName));
+
+    row1Col1 = new Period(0, 12, 0, 0, 0, 0, 0, 0);
+    row1Col2 = new Period(0, 36, 0, 0, 0, 0, 0, 0);
+    Period row2Col1 = new Period(0, 24, 0, 0, 0, 0, 0, 0);
+    Period row2Col2 = new Period(0, 48, 0, 0, 0, 0, 0, 0);
+
+    testParquetReaderHelper(tableName, row1Col1, row1Col2, row2Col1, row2Col2);
+
+    // test nullable interval day type
+    tableName = "drill_1980_t4";
+    test(String.format("create table %s as " +
+        "select " +
+        "cast(intervalday_col as interval day) col1, " +
+        "cast(intervalday_col as interval day) + interval '1' day col2 " +
+        "from cp.`parquet/alltypes.json` where tinyint_col = 1 or tinyint_col = 2", tableName));
+
+    row1Col1 = new Period(0, 0, 0, 1, 0, 0, 0, 0);
+    row1Col2 = new Period(0, 0, 0, 2, 0, 0, 0, 0);
+    row2Col1 = new Period(0, 0, 0, 2, 0, 0, 0, 0);
+    row2Col2 = new Period(0, 0, 0, 3, 0, 0, 0, 0);
+
+    testParquetReaderHelper(tableName, row1Col1, row1Col2, row2Col1, row2Col2);
+  }
+
+  private void testParquetReaderHelper(String tableName, Period row1Col1, Period row1Col2,
+                                       Period row2Col1, Period row2Col2) throws Exception {
+
+    final String switchReader = "alter session set `store.parquet.use_new_reader` = %s; ";
+    final String enableVectorizedReader = String.format(switchReader, true);
+    final String disableVectorizedReader = String.format(switchReader, false);
+    String query = String.format("select * from %s", tableName);
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .optionSettingQueriesForTestQuery(enableVectorizedReader)
+        .baselineColumns("col1", "col2")
+        .baselineValues(row1Col1, row1Col2)
+        .baselineValues(row2Col1, row2Col2)
+        .go();
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .optionSettingQueriesForTestQuery(disableVectorizedReader)
+        .baselineColumns("col1", "col2")
+        .baselineValues(row1Col1, row1Col2)
+        .baselineValues(row2Col1, row2Col2)
+        .go();
+  }
+
   private static void deleteTableIfExists(String tableName) {
     try {
       Path path = new Path(getDfsTestTmpSchemaLocation(), tableName);