You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/11 05:52:36 UTC

[51/61] [abbrv] git commit: DRILL-930: Support date read/write in parquet.

DRILL-930: Support date read/write in parquet.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f1c6b984
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f1c6b984
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f1c6b984

Branch: refs/heads/master
Commit: f1c6b9849ebffff592f940e6c03e0e48b2bf7a85
Parents: 8404068
Author: Jason Altekruse <al...@gmail.com>
Authored: Sun Jun 8 16:50:55 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Jun 10 18:59:39 2014 -0700

----------------------------------------------------------------------
 .../templates/ParquetOutputRecordWriter.java    | 23 ++++++++++++++-
 .../codegen/templates/ParquetTypeHelper.java    |  2 +-
 .../store/parquet/FixedByteAlignedReader.java   | 22 ++++++++++++++
 .../store/parquet/NullableColumnReader.java     |  3 +-
 .../parquet/NullableFixedByteAlignedReader.java | 30 ++++++++++++++++++++
 .../exec/store/parquet/ParquetRecordReader.java |  8 ++++--
 .../physical/impl/writer/TestParquetWriter.java | 22 +++++++++++++-
 7 files changed, 103 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
index 5f75c1c..07bd449 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 
+import org.joda.time.DateTimeUtils;
 import parquet.io.api.Binary;
 
 import java.lang.Override;
@@ -38,6 +39,14 @@ import parquet.io.api.RecordConsumer;
 import parquet.schema.MessageType;
 import parquet.io.api.Binary;
 import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+
+
+import org.apache.drill.common.types.TypeProtos;
+
+import org.joda.time.DateTimeUtils;
 
 import java.io.IOException;
 import java.lang.UnsupportedOperationException;
@@ -57,6 +66,7 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
 
   private RecordConsumer consumer;
   private MessageType schema;
+  public static final long JULIAN_DAY_EPOC = DateTimeUtils.toJulianDayNumber(0);
 
   public void setUp(MessageType schema, RecordConsumer consumer) {
     this.schema = schema;
@@ -106,7 +116,6 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
         minor.class == "BigInt" ||
         minor.class == "Decimal18" ||
         minor.class == "TimeStamp" ||
-        minor.class == "Date" ||
         minor.class == "UInt8">
       <#if mode.prefix == "Repeated" >
               consumer.addLong(valueHolder.vector.getAccessor().get(i));
@@ -115,6 +124,15 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
     consumer.addLong(valueHolder.value);
     consumer.endField(schema.getFieldName(fieldId), fieldId);
       </#if>
+  <#elseif minor.class == "Date">
+    <#if mode.prefix == "Repeated" >
+      consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(valueHolder.vector.getAccessor().get(i)) + JULIAN_DAY_EPOC));
+    <#else>
+      consumer.startField(schema.getFieldName(fieldId), fieldId);
+      // convert from internal Drill date format to Julian Day centered around Unix Epoch
+      consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(valueHolder.value) + JULIAN_DAY_EPOC));
+      consumer.endField(schema.getFieldName(fieldId), fieldId);
+    </#if>
   <#elseif
         minor.class == "Float8">
       <#if mode.prefix == "Repeated" >
@@ -139,6 +157,9 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
       <#if mode.prefix == "Repeated" >
       <#else>
       consumer.startField(schema.getFieldName(fieldId), fieldId);
+      ${minor.class}Vector tempVec = new ${minor.class}Vector(MaterializedField.create("", TypeProtos.MajorType.getDefaultInstance()), new TopLevelAllocator());
+      tempVec.allocateNew(10);
+      tempVec.getMutator().setSafe(0, valueHolder);
       byte[] bytes = DecimalUtility.getBigDecimalFromSparse(
               valueHolder.buffer, valueHolder.start, ${minor.class}Holder.nDecimalDigits, valueHolder.scale).unscaledValue().toByteArray();
       byte[] output = new byte[ParquetTypeHelper.getLengthForMinorType(MinorType.${minor.class?upper_case})];

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java b/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java
index b268d33..15830f6 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java
@@ -56,6 +56,7 @@ public class ParquetTypeHelper {
             minor.class == "Time" ||
             minor.class == "IntervalYear" ||
             minor.class == "Decimal9" ||
+            minor.class == "Date" ||
             minor.class == "UInt4">
                     typeMap.put(MinorType.${minor.class?upper_case}, PrimitiveTypeName.INT32);
     <#elseif
@@ -65,7 +66,6 @@ public class ParquetTypeHelper {
             minor.class == "BigInt" ||
             minor.class == "Decimal18" ||
             minor.class == "TimeStamp" ||
-            minor.class == "Date" ||
             minor.class == "UInt8">
                     typeMap.put(MinorType.${minor.class?upper_case}, PrimitiveTypeName.INT64);
     <#elseif

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
index 574b0cb..26e1f09 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
@@ -19,11 +19,15 @@ package org.apache.drill.exec.store.parquet;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.util.DecimalUtility;
+import org.apache.drill.exec.expr.holders.DateHolder;
 import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
 import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
+import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.vector.DateVector;
 import org.apache.drill.exec.vector.Decimal28SparseVector;
 import org.apache.drill.exec.vector.Decimal38SparseVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.joda.time.DateTimeUtils;
 import parquet.column.ColumnDescriptor;
 import parquet.format.ConvertedType;
 import parquet.format.SchemaElement;
@@ -92,6 +96,24 @@ class FixedByteAlignedReader extends ColumnReader {
     abstract void addNext(int start, int index);
   }
 
+  public static class DateReader extends ConvertedReader {
+
+    DateVector dateVector;
+
+    DateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                    boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      dateVector = (DateVector) v;
+    }
+
+    @Override
+    void addNext(int start, int index) {
+      dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(
+          NullableFixedByteAlignedReader.NullableDateReader.readIntLittleEndian(bytes, start)
+              - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+    }
+  }
+
   public static class Decimal28Reader extends ConvertedReader {
 
     Decimal28SparseVector decimal28Vector;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
index 88a382a..eeb0344 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
@@ -74,8 +74,7 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
         lastValueWasNull = true;
         nullsFound = 0;
         if (currentValueIndexInVector == recordsToReadInThisPass
-            || currentValueIndexInVector >= valueVec.getValueCapacity()
-            || pageReadStatus.readPosInBytes >= pageReadStatus.byteLength){
+            || currentValueIndexInVector >= valueVec.getValueCapacity()) {
           break;
         }
         while(currentValueIndexInVector < recordsToReadInThisPass

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
index d4416c8..17759d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
@@ -21,10 +21,14 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.util.DecimalUtility;
 import org.apache.drill.exec.expr.holders.NullableDecimal28SparseHolder;
 import org.apache.drill.exec.expr.holders.NullableDecimal38SparseHolder;
+import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.vector.NullableDateVector;
 import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
 import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
 import org.apache.drill.exec.vector.ValueVector;
 
+import org.joda.time.DateTimeUtils;
+import parquet.bytes.BytesUtils;
 import parquet.column.ColumnDescriptor;
 import parquet.format.ConvertedType;
 import parquet.format.SchemaElement;
@@ -86,6 +90,32 @@ class NullableFixedByteAlignedReader extends NullableColumnReader {
     abstract void addNext(int start, int index);
   }
 
+  public static class NullableDateReader extends NullableConvertedReader {
+
+    NullableDateVector dateVector;
+
+    NullableDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+               boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      dateVector = (NullableDateVector) v;
+    }
+
+    @Override
+    void addNext(int start, int index) {
+      dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(readIntLittleEndian(bytes, start) - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+    }
+
+    // copied out of parquet library, didn't want to deal with the unneeded throws statement they had declared
+    public static int readIntLittleEndian(byte[] in, int offset) {
+      int ch4 = in[offset] & 0xff;
+      int ch3 = in[offset + 1] & 0xff;
+      int ch2 = in[offset + 2] & 0xff;
+      int ch1 = in[offset + 3] & 0xff;
+      return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+    }
+
+  }
+
   public static class NullableDecimal28Reader extends NullableConvertedReader {
 
     NullableDecimal28SparseVector decimal28Vector;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 4c5f4bb..70560e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -321,8 +321,9 @@ public class ParquetRecordReader implements RecordReader {
         } else if (length <= 16) {
           columnStatuses.add(new Decimal38Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
         }
-      }
-      else{
+      } else if (columnChunkMetaData.getType() == PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
+        columnStatuses.add(new FixedByteAlignedReader.DateReader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
+      } else{
         if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
           columnStatuses.add(new ParquetFixedWidthDictionaryReader(this, allocateSize, descriptor, columnChunkMetaData,
               fixedLength, v, schemaElement));
@@ -337,6 +338,8 @@ public class ParquetRecordReader implements RecordReader {
       if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
         columnStatuses.add(new NullableBitReader(this, allocateSize, descriptor, columnChunkMetaData,
             fixedLength, v, schemaElement));
+      } else if (columnChunkMetaData.getType() == PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
+        columnStatuses.add(new NullableFixedByteAlignedReader.NullableDateReader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
       } else if (columnChunkMetaData.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){
         int length = schemaElement.type_length;
         if (length <= 12) {
@@ -411,6 +414,7 @@ public class ParquetRecordReader implements RecordReader {
     return toMajorType(primitiveTypeName, 0, mode, schemaElement);
   }
 
+  // TODO - move this into ParquetTypeHelper and use code generation to create the list
   static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
                                                TypeProtos.DataMode mode, SchemaElement schemaElement) {
     ConvertedType convertedType = schemaElement.getConverted_type();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 8afcd7e..9febc58 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -83,6 +83,27 @@ public class TestParquetWriter extends BaseTestQuery {
   }
 
   @Test
+  public void testTPCHReadWrite1_date_convertedType() throws Exception {
+    String selection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
+        "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, cast(L_COMMITDATE as DATE) as COMMITDATE, cast(L_RECEIPTDATE as DATE) AS RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
+    String validationSelection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
+        "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE,COMMITDATE ,RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
+    String inputTable = "cp.`tpch/lineitem.parquet`";
+    runTestAndValidate(selection, validationSelection, inputTable, "lineitem_parquet");
+  }
+
+  // TODO file a JIRA for running this query with the projected column names the same as the originals; it failed with a deadbuf
+  // on the client — it appeared that the projection was sending batches out with a record count but a deadbuf
+  /*
+  String selection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
+      "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, cast(L_COMMITDATE as DATE) as L_COMMITDATE, cast(L_RECEIPTDATE as DATE) AS L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
+  String validationSelection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
+      "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE,COMMITDATE ,RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
+      */
+  // this is rather odd, I can select the data out of parquet and project it to cast the date fields
+  // this stores all of the data correctly, but when I go to read it out again with the query that created it (with redundant casts I believe) it has
+  // everything but the cast date columns as nulls
+  @Test
   public void testTPCHReadWrite2() throws Exception {
     String inputTable = "cp.`tpch/customer.parquet`";
     runTestAndValidate("*", "*", inputTable, "customer_parquet");
@@ -152,7 +173,6 @@ public class TestParquetWriter extends BaseTestQuery {
 
 
   @Test
-  @Ignore //enable once Date is enabled
   public void testDate() throws Exception {
     String selection = "cast(hire_date as DATE) as hire_date";
     String validateSelection = "hire_date";