You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/11 05:52:36 UTC
[51/61] [abbrv] git commit: DRILL-930: Support date read/write in
parquet.
DRILL-930: Support date read/write in parquet.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f1c6b984
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f1c6b984
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f1c6b984
Branch: refs/heads/master
Commit: f1c6b9849ebffff592f940e6c03e0e48b2bf7a85
Parents: 8404068
Author: Jason Altekruse <al...@gmail.com>
Authored: Sun Jun 8 16:50:55 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Jun 10 18:59:39 2014 -0700
----------------------------------------------------------------------
.../templates/ParquetOutputRecordWriter.java | 23 ++++++++++++++-
.../codegen/templates/ParquetTypeHelper.java | 2 +-
.../store/parquet/FixedByteAlignedReader.java | 22 ++++++++++++++
.../store/parquet/NullableColumnReader.java | 3 +-
.../parquet/NullableFixedByteAlignedReader.java | 30 ++++++++++++++++++++
.../exec/store/parquet/ParquetRecordReader.java | 8 ++++--
.../physical/impl/writer/TestParquetWriter.java | 22 +++++++++++++-
7 files changed, 103 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
index 5f75c1c..07bd449 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
@@ -16,6 +16,7 @@
* limitations under the License.
*/
+import org.joda.time.DateTimeUtils;
import parquet.io.api.Binary;
import java.lang.Override;
@@ -38,6 +39,14 @@ import parquet.io.api.RecordConsumer;
import parquet.schema.MessageType;
import parquet.io.api.Binary;
import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+
+
+import org.apache.drill.common.types.TypeProtos;
+
+import org.joda.time.DateTimeUtils;
import java.io.IOException;
import java.lang.UnsupportedOperationException;
@@ -57,6 +66,7 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
private RecordConsumer consumer;
private MessageType schema;
+ public static final long JULIAN_DAY_EPOC = DateTimeUtils.toJulianDayNumber(0);
public void setUp(MessageType schema, RecordConsumer consumer) {
this.schema = schema;
@@ -106,7 +116,6 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
minor.class == "BigInt" ||
minor.class == "Decimal18" ||
minor.class == "TimeStamp" ||
- minor.class == "Date" ||
minor.class == "UInt8">
<#if mode.prefix == "Repeated" >
consumer.addLong(valueHolder.vector.getAccessor().get(i));
@@ -115,6 +124,15 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
consumer.addLong(valueHolder.value);
consumer.endField(schema.getFieldName(fieldId), fieldId);
</#if>
+ <#elseif minor.class == "Date">
+ <#if mode.prefix == "Repeated" >
+ consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(valueHolder.vector.getAccessor().get(i)) + JULIAN_DAY_EPOC));
+ <#else>
+ consumer.startField(schema.getFieldName(fieldId), fieldId);
+ // convert from internal Drill date format to Julian Day centered around Unix Epoch
+ consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(valueHolder.value) + JULIAN_DAY_EPOC));
+ consumer.endField(schema.getFieldName(fieldId), fieldId);
+ </#if>
<#elseif
minor.class == "Float8">
<#if mode.prefix == "Repeated" >
@@ -139,6 +157,9 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
<#if mode.prefix == "Repeated" >
<#else>
consumer.startField(schema.getFieldName(fieldId), fieldId);
+ ${minor.class}Vector tempVec = new ${minor.class}Vector(MaterializedField.create("", TypeProtos.MajorType.getDefaultInstance()), new TopLevelAllocator());
+ tempVec.allocateNew(10);
+ tempVec.getMutator().setSafe(0, valueHolder);
byte[] bytes = DecimalUtility.getBigDecimalFromSparse(
valueHolder.buffer, valueHolder.start, ${minor.class}Holder.nDecimalDigits, valueHolder.scale).unscaledValue().toByteArray();
byte[] output = new byte[ParquetTypeHelper.getLengthForMinorType(MinorType.${minor.class?upper_case})];
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java b/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java
index b268d33..15830f6 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetTypeHelper.java
@@ -56,6 +56,7 @@ public class ParquetTypeHelper {
minor.class == "Time" ||
minor.class == "IntervalYear" ||
minor.class == "Decimal9" ||
+ minor.class == "Date" ||
minor.class == "UInt4">
typeMap.put(MinorType.${minor.class?upper_case}, PrimitiveTypeName.INT32);
<#elseif
@@ -65,7 +66,6 @@ public class ParquetTypeHelper {
minor.class == "BigInt" ||
minor.class == "Decimal18" ||
minor.class == "TimeStamp" ||
- minor.class == "Date" ||
minor.class == "UInt8">
typeMap.put(MinorType.${minor.class?upper_case}, PrimitiveTypeName.INT64);
<#elseif
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
index 574b0cb..26e1f09 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
@@ -19,11 +19,15 @@ package org.apache.drill.exec.store.parquet;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.util.DecimalUtility;
+import org.apache.drill.exec.expr.holders.DateHolder;
import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
+import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.vector.DateVector;
import org.apache.drill.exec.vector.Decimal28SparseVector;
import org.apache.drill.exec.vector.Decimal38SparseVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.joda.time.DateTimeUtils;
import parquet.column.ColumnDescriptor;
import parquet.format.ConvertedType;
import parquet.format.SchemaElement;
@@ -92,6 +96,24 @@ class FixedByteAlignedReader extends ColumnReader {
abstract void addNext(int start, int index);
}
+ public static class DateReader extends ConvertedReader {
+
+ DateVector dateVector;
+
+ DateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ dateVector = (DateVector) v;
+ }
+
+ @Override
+ void addNext(int start, int index) {
+ dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(
+ NullableFixedByteAlignedReader.NullableDateReader.readIntLittleEndian(bytes, start)
+ - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+ }
+ }
+
public static class Decimal28Reader extends ConvertedReader {
Decimal28SparseVector decimal28Vector;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
index 88a382a..eeb0344 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
@@ -74,8 +74,7 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
lastValueWasNull = true;
nullsFound = 0;
if (currentValueIndexInVector == recordsToReadInThisPass
- || currentValueIndexInVector >= valueVec.getValueCapacity()
- || pageReadStatus.readPosInBytes >= pageReadStatus.byteLength){
+ || currentValueIndexInVector >= valueVec.getValueCapacity()) {
break;
}
while(currentValueIndexInVector < recordsToReadInThisPass
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
index d4416c8..17759d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
@@ -21,10 +21,14 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.util.DecimalUtility;
import org.apache.drill.exec.expr.holders.NullableDecimal28SparseHolder;
import org.apache.drill.exec.expr.holders.NullableDecimal38SparseHolder;
+import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.vector.NullableDateVector;
import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.joda.time.DateTimeUtils;
+import parquet.bytes.BytesUtils;
import parquet.column.ColumnDescriptor;
import parquet.format.ConvertedType;
import parquet.format.SchemaElement;
@@ -86,6 +90,32 @@ class NullableFixedByteAlignedReader extends NullableColumnReader {
abstract void addNext(int start, int index);
}
+ public static class NullableDateReader extends NullableConvertedReader {
+
+ NullableDateVector dateVector;
+
+ NullableDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ dateVector = (NullableDateVector) v;
+ }
+
+ @Override
+ void addNext(int start, int index) {
+ dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(readIntLittleEndian(bytes, start) - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+ }
+
+ // copied out of parquet library, didn't want to deal with the unneeded throws statement they had declared
+ public static int readIntLittleEndian(byte[] in, int offset) {
+ int ch4 = in[offset] & 0xff;
+ int ch3 = in[offset + 1] & 0xff;
+ int ch2 = in[offset + 2] & 0xff;
+ int ch1 = in[offset + 3] & 0xff;
+ return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+ }
+
+ }
+
public static class NullableDecimal28Reader extends NullableConvertedReader {
NullableDecimal28SparseVector decimal28Vector;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 4c5f4bb..70560e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -321,8 +321,9 @@ public class ParquetRecordReader implements RecordReader {
} else if (length <= 16) {
columnStatuses.add(new Decimal38Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
}
- }
- else{
+ } else if (columnChunkMetaData.getType() == PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
+ columnStatuses.add(new FixedByteAlignedReader.DateReader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
+ } else{
if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
columnStatuses.add(new ParquetFixedWidthDictionaryReader(this, allocateSize, descriptor, columnChunkMetaData,
fixedLength, v, schemaElement));
@@ -337,6 +338,8 @@ public class ParquetRecordReader implements RecordReader {
if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
columnStatuses.add(new NullableBitReader(this, allocateSize, descriptor, columnChunkMetaData,
fixedLength, v, schemaElement));
+ } else if (columnChunkMetaData.getType() == PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
+ columnStatuses.add(new NullableFixedByteAlignedReader.NullableDateReader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
} else if (columnChunkMetaData.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){
int length = schemaElement.type_length;
if (length <= 12) {
@@ -411,6 +414,7 @@ public class ParquetRecordReader implements RecordReader {
return toMajorType(primitiveTypeName, 0, mode, schemaElement);
}
+ // TODO - move this into ParquetTypeHelper and use code generation to create the list
static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
TypeProtos.DataMode mode, SchemaElement schemaElement) {
ConvertedType convertedType = schemaElement.getConverted_type();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f1c6b984/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 8afcd7e..9febc58 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -83,6 +83,27 @@ public class TestParquetWriter extends BaseTestQuery {
}
@Test
+ public void testTPCHReadWrite1_date_convertedType() throws Exception {
+ String selection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
+ "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, cast(L_COMMITDATE as DATE) as COMMITDATE, cast(L_RECEIPTDATE as DATE) AS RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
+ String validationSelection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
+ "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE,COMMITDATE ,RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
+ String inputTable = "cp.`tpch/lineitem.parquet`";
+ runTestAndValidate(selection, validationSelection, inputTable, "lineitem_parquet");
+ }
+
+ // TODO file a JIRA for running this query with the projected column names the same as the originals, it failed with a deadbuf
+ // on the client, it appeared that the projection was sending batches out with a record count but a deadbuf
+ /*
+ String selection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
+ "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, cast(L_COMMITDATE as DATE) as L_COMMITDATE, cast(L_RECEIPTDATE as DATE) AS L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
+ String validationSelection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " +
+ "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE,COMMITDATE ,RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT";
+ */
+ // this is rather odd, I can select the data out of parquet and project it to cast the date fields
+ // this stores all of the data correctly, but when I go to read it out again with the query that created it (with redundant casts I believe) it has
+ // everything but the cast date columns as nulls
+ @Test
public void testTPCHReadWrite2() throws Exception {
String inputTable = "cp.`tpch/customer.parquet`";
runTestAndValidate("*", "*", inputTable, "customer_parquet");
@@ -152,7 +173,6 @@ public class TestParquetWriter extends BaseTestQuery {
@Test
- @Ignore //enable once Date is enabled
public void testDate() throws Exception {
String selection = "cast(hire_date as DATE) as hire_date";
String validateSelection = "hire_date";