Posted to commits@spark.apache.org by yh...@apache.org on 2016/03/23 20:13:41 UTC

spark git commit: [SPARK-14015][SQL] Support TimestampType in vectorized parquet reader

Repository: spark
Updated Branches:
  refs/heads/master 02d9c352c -> 0a64294fc


[SPARK-14015][SQL] Support TimestampType in vectorized parquet reader

## What changes were proposed in this pull request?

This PR adds support for TimestampType in the vectorized parquet reader.
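For background, Parquet files written by parquet-mr/Impala store timestamps as 12-byte INT96 values: 8 bytes of nanoseconds within the day followed by a 4-byte Julian day number, in little-endian order. Below is a simplified standalone sketch (not the Spark code itself, which delegates to `CatalystRowConverter.binaryToSQLTimestamp` and `DateTimeUtils.fromJulianDay`; see the diff below) of how such a value maps to Spark SQL's microseconds-since-epoch timestamp representation. The object name and the exact constant handling are illustrative approximations of what Spark does.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical helper, for illustration only.
object Int96TimestampSketch {
  // Julian day number of 1970-01-01 (2440587.5 rounded up, matching Hive's convention).
  private val JulianDayOfEpoch = 2440588L
  private val MicrosPerDay = 24L * 60 * 60 * 1000 * 1000

  /** Decode a 12-byte INT96 value into microseconds since the Unix epoch. */
  def binaryToMicros(bytes: Array[Byte]): Long = {
    require(bytes.length == 12, s"expected a 12-byte INT96 value, got ${bytes.length} bytes")
    val buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
    val timeOfDayNanos = buffer.getLong // first 8 bytes: nanoseconds within the day
    val julianDay = buffer.getInt       // last 4 bytes: Julian day number
    (julianDay - JulianDayOfEpoch) * MicrosPerDay + timeOfDayNanos / 1000
  }
}
```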

## How was this patch tested?

1. `VectorizedColumnReader` initially had a gating condition on `primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96` that made us fall back to parquet-mr for handling timestamps. This condition has now been removed.
2. The `ParquetHadoopFsRelationSuite` (which tests all supported Hive types, including `TimestampType`) fails when the gating condition is removed on its own (https://github.com/apache/spark/pull/11808) and should now pass with this change. Similarly, the `ParquetHiveCompatibilitySuite.SPARK-10177 timestamp` test, which fails when the gating condition is removed, should now pass as well.
3. Added tests in `HadoopFsRelationTest` that exercise both the dictionary-encoded and non-dictionary-encoded paths across all supported data types (a minimal round-trip sketch follows this list).
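For illustration, a minimal round-trip check in the spirit of the added tests might look like the sketch below. It assumes a `sqlContext` as in the Spark shell of this era; the output path and the exact timestamps are hypothetical and not taken from the actual suite.

```scala
import java.sql.Timestamp

for (dictionaryEnabled <- Seq(true, false)) {
  // Toggle Parquet dictionary encoding, as the new tests do.
  sqlContext.sparkContext.hadoopConfiguration
    .setBoolean("parquet.enable.dictionary", dictionaryEnabled)

  // Ten rows with an index column (for deterministic ordering) and a timestamp column.
  val df = sqlContext.createDataFrame(
    (1 to 10).map(i => (i, Timestamp.valueOf(f"2016-03-23 12:13:$i%02d")))
  ).toDF("index", "ts")

  val path = s"/tmp/ts_roundtrip_dict_$dictionaryEnabled" // hypothetical path
  df.write.mode("overwrite").parquet(path)

  // Read back through the (now vectorized) Parquet path and compare row by row.
  val loaded = sqlContext.read.parquet(path).orderBy("index")
  assert(loaded.collect().toSeq == df.orderBy("index").collect().toSeq)
}
```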

Author: Sameer Agarwal <sa...@databricks.com>

Closes #11882 from sameeragarwal/timestamp-parquet.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a64294f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a64294f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a64294f

Branch: refs/heads/master
Commit: 0a64294fcb4b64bfe095c63c3a494e0f40e22743
Parents: 02d9c35
Author: Sameer Agarwal <sa...@databricks.com>
Authored: Wed Mar 23 12:13:32 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Mar 23 12:13:32 2016 -0700

----------------------------------------------------------------------
 .../parquet/VectorizedColumnReader.java         | 29 ++++++-
 .../parquet/VectorizedParquetRecordReader.java  | 13 ----
 .../vectorized/OffHeapColumnVector.java         |  2 +-
 .../vectorized/OnHeapColumnVector.java          |  3 +-
 .../parquet/CatalystRowConverter.scala          | 10 +++
 .../sql/sources/hadoopFsRelationSuites.scala    | 82 +++++++++++---------
 6 files changed, 86 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0a64294f/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 2c23ccc..6cc2fda 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -213,6 +213,9 @@ public class VectorizedColumnReader {
           case INT64:
             readLongBatch(rowId, num, column);
             break;
+          case INT96:
+            readBinaryBatch(rowId, num, column);
+            break;
           case FLOAT:
             readFloatBatch(rowId, num, column);
             break;
@@ -249,7 +252,17 @@ public class VectorizedColumnReader {
       case BINARY:
         column.setDictionary(dictionary);
         break;
-
+      case INT96:
+        if (column.dataType() == DataTypes.TimestampType) {
+          for (int i = rowId; i < rowId + num; ++i) {
+            // TODO: Convert dictionary of Binaries to dictionary of Longs
+            Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+            column.putLong(i, CatalystRowConverter.binaryToSQLTimestamp(v));
+          }
+        } else {
+          throw new NotImplementedException();
+        }
+        break;
       case FIXED_LEN_BYTE_ARRAY:
         // DecimalType written in the legacy mode
         if (DecimalType.is32BitDecimalType(column.dataType())) {
@@ -342,9 +355,19 @@ public class VectorizedColumnReader {
   private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOException {
     // This is where we implement support for the valid type conversions.
     // TODO: implement remaining type conversions
+    VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
     if (column.isArray()) {
-      defColumn.readBinarys(
-          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+      defColumn.readBinarys(num, column, rowId, maxDefLevel, data);
+    } else if (column.dataType() == DataTypes.TimestampType) {
+      for (int i = 0; i < num; i++) {
+        if (defColumn.readInteger() == maxDefLevel) {
+          column.putLong(rowId + i,
+              // Read 12 bytes for INT96
+              CatalystRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
+        } else {
+          column.putNull(rowId + i);
+        }
+      }
     } else {
       throw new NotImplementedException("Unimplemented type: " + column.dataType());
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0a64294f/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 9ac2513..ab09208 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -252,26 +252,13 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     /**
      * Check that the requested schema is supported.
      */
-    OriginalType[] originalTypes = new OriginalType[requestedSchema.getFieldCount()];
     missingColumns = new boolean[requestedSchema.getFieldCount()];
     for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
       Type t = requestedSchema.getFields().get(i);
       if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
         throw new IOException("Complex types not supported.");
       }
-      PrimitiveType primitiveType = t.asPrimitiveType();
 
-      originalTypes[i] = t.getOriginalType();
-
-      // TODO: Be extremely cautious in what is supported. Expand this.
-      if (originalTypes[i] != null && originalTypes[i] != OriginalType.DECIMAL &&
-          originalTypes[i] != OriginalType.UTF8 && originalTypes[i] != OriginalType.DATE &&
-          originalTypes[i] != OriginalType.INT_8 && originalTypes[i] != OriginalType.INT_16) {
-        throw new IOException("Unsupported type: " + t);
-      }
-      if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
-        throw new IOException("Int96 not supported.");
-      }
       String[] colPath = requestedSchema.getPaths().get(i);
       if (fileSchema.containsPath(colPath)) {
         ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);

http://git-wip-us.apache.org/repos/asf/spark/blob/0a64294f/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 689e6a2..b190141 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -407,7 +407,7 @@ public final class OffHeapColumnVector extends ColumnVector {
         type instanceof DateType || DecimalType.is32BitDecimalType(type)) {
       this.data = Platform.reallocateMemory(data, elementsAppended * 4, newCapacity * 4);
     } else if (type instanceof LongType || type instanceof DoubleType ||
-        DecimalType.is64BitDecimalType(type)) {
+        DecimalType.is64BitDecimalType(type) || type instanceof TimestampType) {
       this.data = Platform.reallocateMemory(data, elementsAppended * 8, newCapacity * 8);
     } else if (resultStruct != null) {
       // Nothing to store.

http://git-wip-us.apache.org/repos/asf/spark/blob/0a64294f/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index f332e87..b1429fe 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -403,7 +403,8 @@ public final class OnHeapColumnVector extends ColumnVector {
       int[] newData = new int[newCapacity];
       if (intData != null) System.arraycopy(intData, 0, newData, 0, elementsAppended);
       intData = newData;
-    } else if (type instanceof LongType || DecimalType.is64BitDecimalType(type)) {
+    } else if (type instanceof LongType || type instanceof TimestampType ||
+        DecimalType.is64BitDecimalType(type)) {
       long[] newData = new long[newCapacity];
       if (longData != null) System.arraycopy(longData, 0, newData, 0, elementsAppended);
       longData = newData;

http://git-wip-us.apache.org/repos/asf/spark/blob/0a64294f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index de6dd0f..6bf82be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -33,6 +33,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -659,4 +660,13 @@ private[parquet] object CatalystRowConverter {
     unscaled = (unscaled << (64 - bits)) >> (64 - bits)
     unscaled
   }
+
+  def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = {
+    assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected to be stored in" +
+      s" 12-byte long binaries. Found a ${binary.length()}-byte binary instead.")
+    val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
+    val timeOfDayNanos = buffer.getLong
+    val julianDay = buffer.getInt
+    DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0a64294f/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 7e5506e..e842caf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -116,44 +116,56 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     new MyDenseVectorUDT()
   ).filter(supportsDataType)
 
-  for (dataType <- supportedDataTypes) {
-    test(s"test all data types - $dataType") {
-      withTempPath { file =>
-        val path = file.getCanonicalPath
-
-        val dataGenerator = RandomDataGenerator.forType(
-          dataType = dataType,
-          nullable = true,
-          new Random(System.nanoTime())
-        ).getOrElse {
-          fail(s"Failed to create data generator for schema $dataType")
+  try {
+    for (dataType <- supportedDataTypes) {
+      for (parquetDictionaryEncodingEnabled <- Seq(true, false)) {
+        test(s"test all data types - $dataType with parquet.enable.dictionary = " +
+          s"$parquetDictionaryEncodingEnabled") {
+
+          hadoopConfiguration.setBoolean("parquet.enable.dictionary",
+            parquetDictionaryEncodingEnabled)
+
+          withTempPath { file =>
+            val path = file.getCanonicalPath
+
+            val dataGenerator = RandomDataGenerator.forType(
+              dataType = dataType,
+              nullable = true,
+              new Random(System.nanoTime())
+            ).getOrElse {
+              fail(s"Failed to create data generator for schema $dataType")
+            }
+
+            // Create a DF for the schema with random data. The index field is used to sort the
+            // DataFrame.  This is a workaround for SPARK-10591.
+            val schema = new StructType()
+              .add("index", IntegerType, nullable = false)
+              .add("col", dataType, nullable = true)
+            val rdd =
+              sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
+            val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
+
+            df.write
+              .mode("overwrite")
+              .format(dataSourceName)
+              .option("dataSchema", df.schema.json)
+              .save(path)
+
+            val loadedDF = sqlContext
+              .read
+              .format(dataSourceName)
+              .option("dataSchema", df.schema.json)
+              .schema(df.schema)
+              .load(path)
+              .orderBy("index")
+
+            checkAnswer(loadedDF, df)
+          }
         }
-
-        // Create a DF for the schema with random data. The index field is used to sort the
-        // DataFrame.  This is a workaround for SPARK-10591.
-        val schema = new StructType()
-          .add("index", IntegerType, nullable = false)
-          .add("col", dataType, nullable = true)
-        val rdd = sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
-        val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
-
-        df.write
-          .mode("overwrite")
-          .format(dataSourceName)
-          .option("dataSchema", df.schema.json)
-          .save(path)
-
-        val loadedDF = sqlContext
-          .read
-          .format(dataSourceName)
-          .option("dataSchema", df.schema.json)
-          .schema(df.schema)
-          .load(path)
-          .orderBy("index")
-
-        checkAnswer(loadedDF, df)
       }
     }
+  } finally {
+    hadoopConfiguration.unset("parquet.enable.dictionary")
   }
 
   test("save()/load() - non-partitioned table - Overwrite") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org