Posted to commits@spark.apache.org by da...@apache.org on 2016/09/09 21:23:14 UTC

spark git commit: [SPARK-17354] [SQL] Partitioning by dates/timestamps should work with Parquet vectorized reader

Repository: spark
Updated Branches:
  refs/heads/master a3981c28c -> f7d214370


[SPARK-17354] [SQL] Partitioning by dates/timestamps should work with Parquet vectorized reader

## What changes were proposed in this pull request?

This PR fixes `ColumnVectorUtils.populate` so that the Parquet vectorized reader can read partitioned tables with date/timestamp partition columns. The normal (non-vectorized) Parquet reader already handles this case fine.

`ColumnVectorUtils.populate` is called only from [VectorizedParquetRecordReader.java#L185](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L185).

When the partition column types are explicitly given as `DateType` or `TimestampType` (rather than being inferred from the partition values), reading fails with the exception below:

```
16/09/01 10:30:07 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 6)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.sql.Date
	at org.apache.spark.sql.execution.vectorized.ColumnVectorUtils.populate(ColumnVectorUtils.java:89)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:185)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:204)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:362)
...
```
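
The root cause: partition values reach `ColumnVectorUtils.populate` as an `InternalRow`, where a date is already stored as an `Int` (days since the Unix epoch) and a timestamp as a `Long` (microseconds since the Unix epoch), so the old cast to `java.sql.Date` receives an `Integer` and fails. A minimal sketch of that internal encoding (not from the original PR; it reuses the same `DateTimeUtils` helpers the new tests below use):

```
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// DATE is encoded internally as an Int (days since the Unix epoch) ...
val days: Int = DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01"))
// ... and TIMESTAMP as a Long (microseconds since the Unix epoch).
val micros: Long = DateTimeUtils.fromJavaTimestamp(
  java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"))
```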

## How was this patch tested?

Unit tests in `ParquetIOSuite` and `SQLQuerySuite`.

Author: hyukjinkwon <gu...@gmail.com>

Closes #14919 from HyukjinKwon/SPARK-17354.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7d21437
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7d21437
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7d21437

Branch: refs/heads/master
Commit: f7d2143705c8c1baeed0bc62940f9dba636e705b
Parents: a3981c2
Author: hyukjinkwon <gu...@gmail.com>
Authored: Fri Sep 9 14:23:05 2016 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Fri Sep 9 14:23:05 2016 -0700

----------------------------------------------------------------------
 .../execution/vectorized/ColumnVectorUtils.java |  5 +-
 .../sql/execution/vectorized/ColumnarBatch.java |  6 +++
 .../datasources/parquet/ParquetIOSuite.scala    | 49 +++++++++++++++++++-
 .../sql/hive/execution/SQLQuerySuite.scala      | 21 +++++++++
 4 files changed, 78 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f7d21437/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index 2fa476b..900d7c4 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -86,8 +86,9 @@ public class ColumnVectorUtils {
         col.getChildColumn(0).putInts(0, capacity, c.months);
         col.getChildColumn(1).putLongs(0, capacity, c.microseconds);
       } else if (t instanceof DateType) {
-        Date date = (Date)row.get(fieldIdx, t);
-        col.putInts(0, capacity, DateTimeUtils.fromJavaDate(date));
+        col.putInts(0, capacity, row.getInt(fieldIdx));
+      } else if (t instanceof TimestampType) {
+        col.putLongs(0, capacity, row.getLong(fieldIdx));
       }
     }
   }
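
To see the fixed path end to end, here is a hedged sketch, assuming Spark's internal (non-public) `ColumnVector.allocate` API as it existed around this commit; it mirrors what `VectorizedParquetRecordReader.initBatch` does for a `DATE`-typed partition column:

```
import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.vectorized.{ColumnVector, ColumnVectorUtils}
import org.apache.spark.sql.types.DateType

// The partition value arrives pre-encoded as an Int; populate() now writes
// it straight through instead of casting it to java.sql.Date.
val days = DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01"))
val partitionRow = new GenericMutableRow(Array[Any](days))
val col = ColumnVector.allocate(4, DateType, MemoryMode.ON_HEAP)
ColumnVectorUtils.populate(col, partitionRow, 0)
assert(col.getInt(0) == days) // every slot holds the constant partition value
```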

http://git-wip-us.apache.org/repos/asf/spark/blob/f7d21437/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index f3afa8f..62abc2a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -137,6 +137,10 @@ public final class ColumnarBatch {
           DataType dt = columns[i].dataType();
           if (dt instanceof BooleanType) {
             row.setBoolean(i, getBoolean(i));
+          } else if (dt instanceof ByteType) {
+            row.setByte(i, getByte(i));
+          } else if (dt instanceof ShortType) {
+            row.setShort(i, getShort(i));
           } else if (dt instanceof IntegerType) {
             row.setInt(i, getInt(i));
           } else if (dt instanceof LongType) {
@@ -154,6 +158,8 @@ public final class ColumnarBatch {
             row.setDecimal(i, getDecimal(i, t.precision(), t.scale()), t.precision());
           } else if (dt instanceof DateType) {
             row.setInt(i, getInt(i));
+          } else if (dt instanceof TimestampType) {
+            row.setLong(i, getLong(i));
           } else {
             throw new RuntimeException("Not implemented. " + dt);
           }
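
These row-adapter branches matter because the new `ParquetIOSuite` test below calls `row.copy()`: without them, copying a row containing byte, short, or timestamp columns falls through to the `Not implemented` RuntimeException. A minimal sketch, again assuming the internal `ColumnarBatch.allocate`/`getRow` APIs of this era:

```
import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.execution.vectorized.ColumnarBatch
import org.apache.spark.sql.types.{StructType, TimestampType}

val schema = new StructType().add("pt", TimestampType)
val batch = ColumnarBatch.allocate(schema, MemoryMode.ON_HEAP)
batch.column(0).putLong(0, 123456789L) // microseconds since the Unix epoch
batch.setNumRows(1)
val copied = batch.getRow(0).copy()    // previously threw "Not implemented. TimestampType"
assert(copied.getLong(0) == 123456789L)
```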

http://git-wip-us.apache.org/repos/asf/spark/blob/f7d21437/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 4aa046b..3161a63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -38,11 +38,12 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 // Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
 // with an empty configuration (it is after all not intended to be used in this way?)
@@ -689,6 +690,52 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       }
     }
   }
+
+  test("VectorizedParquetRecordReader - partition column types") {
+    withTempPath { dir =>
+      Seq(1).toDF().repartition(1).write.parquet(dir.getCanonicalPath)
+
+      val dataTypes =
+        Seq(StringType, BooleanType, ByteType, ShortType, IntegerType, LongType,
+          FloatType, DoubleType, DecimalType(25, 5), DateType, TimestampType)
+
+      val constantValues =
+        Seq(
+          UTF8String.fromString("a string"),
+          true,
+          1.toByte,
+          2.toShort,
+          3,
+          Long.MaxValue,
+          0.25.toFloat,
+          0.75D,
+          Decimal("1234.23456"),
+          DateTimeUtils.fromJavaDate(java.sql.Date.valueOf("2015-01-01")),
+          DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123")))
+
+      dataTypes.zip(constantValues).foreach { case (dt, v) =>
+        val schema = StructType(StructField("pcol", dt) :: Nil)
+        val vectorizedReader = new VectorizedParquetRecordReader
+        val partitionValues = new GenericMutableRow(Array(v))
+        val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+
+        try {
+          vectorizedReader.initialize(file, null)
+          vectorizedReader.initBatch(schema, partitionValues)
+          vectorizedReader.nextKeyValue()
+          val row = vectorizedReader.getCurrentValue.asInstanceOf[InternalRow]
+
+          // Copy into a `GenericMutableRow` rather than using `ColumnarBatch` directly,
+          // so we can call the get(...) method, which `ColumnarBatch` does not implement.
+          val actual = row.copy().get(1, dt)
+          val expected = v
+          assert(actual == expected)
+        } finally {
+          vectorizedReader.close()
+        }
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)

http://git-wip-us.apache.org/repos/asf/spark/blob/f7d21437/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 05d0687..dc4d099 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1787,6 +1787,27 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("SPARK-17354: Partitioning by dates/timestamps works with Parquet vectorized reader") {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+      sql(
+        """CREATE TABLE order(id INT)
+          |PARTITIONED BY (pd DATE, pt TIMESTAMP)
+          |STORED AS PARQUET
+        """.stripMargin)
+
+      sql("set hive.exec.dynamic.partition.mode=nonstrict")
+      sql(
+        """INSERT INTO TABLE order PARTITION(pd, pt)
+          |SELECT 1 AS id, CAST('1990-02-24' AS DATE) AS pd, CAST('1990-02-24' AS TIMESTAMP) AS pt
+        """.stripMargin)
+      val actual = sql("SELECT * FROM order")
+      val expected = sql(
+        "SELECT 1 AS id, CAST('1990-02-24' AS DATE) AS pd, CAST('1990-02-24' AS TIMESTAMP) AS pt")
+      checkAnswer(actual, expected)
+      sql("DROP TABLE order")
+    }
+  }
+
   def testCommandAvailable(command: String): Boolean = {
     val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
     attempt.isSuccess && attempt.get == 0

