Posted to commits@spark.apache.org by we...@apache.org on 2020/06/01 15:15:52 UTC

[spark] branch branch-3.0 updated: [SPARK-31885][SQL] Fix filter push down for old millis timestamps to Parquet

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 805884a  [SPARK-31885][SQL] Fix filter push down for old millis timestamps to Parquet
805884a is described below

commit 805884ad10aca3a532314284b0f2c007d4ca1045
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Mon Jun 1 15:13:44 2020 +0000

    [SPARK-31885][SQL] Fix filter push down for old millis timestamps to Parquet
    
    ### What changes were proposed in this pull request?
    Fixed conversions of `java.sql.Timestamp` to milliseconds in `ParquetFilters` by using the existing `DateTimeUtils` functions `fromJavaTimestamp()` and `microsToMillis()`.
    
    ### Why are the changes needed?
    The changes fix the following bug:
    ```scala
    scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
    scala> spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
    scala> Seq(java.sql.Timestamp.valueOf("1000-06-14 08:28:53.123")).toDF("ts").write.mode("overwrite").parquet("/Users/maximgekk/tmp/ts_millis_old_filter")
    scala> spark.read.parquet("/Users/maximgekk/tmp/ts_millis_old_filter").filter($"ts" === "1000-06-14 08:28:53.123").show(false)
    +---+
    |ts |
    +---+
    +---+
    ```
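
    The root cause is a calendar mismatch: `Timestamp.getTime` returns milliseconds in the hybrid Julian/Gregorian calendar, while with `datetimeRebaseModeInWrite` set to `CORRECTED` Spark 3.0 stores Proleptic Gregorian milliseconds in Parquet, so for old timestamps the pushed-down value never equals the stored one. A minimal sketch of the divergence (exact values depend on the session time zone; `DateTimeUtils` is the internal utility the fix relies on):
    ```scala
    scala> import org.apache.spark.sql.catalyst.util.DateTimeUtils
    scala> val ts = java.sql.Timestamp.valueOf("1000-06-14 08:28:53.123")
    scala> ts.getTime  // hybrid Julian/Gregorian millis: what the old code pushed down
    scala> DateTimeUtils.microsToMillis(DateTimeUtils.fromJavaTimestamp(ts))  // Proleptic Gregorian millis: what the fix pushes down
    ```
    For a year-1000 timestamp the two results differ by days' worth of milliseconds, which is why the filter above returned an empty result.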
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. After the changes, the query from the example above returns the expected row:
    ```scala
    scala> spark.read.parquet("/Users/maximgekk/tmp/ts_millis_old_filter").filter($"ts" === "1000-06-14 08:28:53.123").show(false)
    +-----------------------+
    |ts                     |
    +-----------------------+
    |1000-06-14 08:28:53.123|
    +-----------------------+
    ```
    
    ### How was this patch tested?
    Modified tests in `ParquetFilterSuite` to also check old timestamps (years 1000, 1582, and 1900).
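
    Beyond checking the query result, the pushdown itself can be verified manually by inspecting the physical plan: the Parquet scan node lists the translated predicate under `PushedFilters` (a rough manual check, not the suite's actual assertion):
    ```scala
    scala> val df = spark.read.parquet("/Users/maximgekk/tmp/ts_millis_old_filter")
    scala> df.filter($"ts" === "1000-06-14 08:28:53.123").explain()
    ```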
    
    Closes #28693 from MaxGekk/parquet-ts-millis-filter.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 9c0dc28a6c33361a412672d63f3b974b75944965)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../datasources/parquet/ParquetFilters.scala       | 27 +++++++++++-----------
 .../datasources/parquet/ParquetFilterSuite.scala   | 16 +++++++------
 .../datasources/parquet/ParquetTest.scala          |  4 +++-
 3 files changed, 25 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index d89186a..7900693 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -148,6 +148,13 @@ class ParquetFilters(
     Binary.fromConstantByteArray(fixedLengthBytes, 0, numBytes)
   }
 
+  private def timestampToMillis(v: Any): JLong = {
+    val timestamp = v.asInstanceOf[Timestamp]
+    val micros = DateTimeUtils.fromJavaTimestamp(timestamp)
+    val millis = DateTimeUtils.microsToMillis(micros)
+    millis.asInstanceOf[JLong]
+  }
+
   private val makeEq:
     PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = {
     case ParquetBooleanType =>
@@ -184,7 +191,7 @@ class ParquetFilters(
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.eq(
         longColumn(n),
-        Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
+        Option(v).map(timestampToMillis).orNull)
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) => FilterApi.eq(
@@ -235,7 +242,7 @@ class ParquetFilters(
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.notEq(
         longColumn(n),
-        Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
+        Option(v).map(timestampToMillis).orNull)
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) => FilterApi.notEq(
@@ -277,9 +284,7 @@ class ParquetFilters(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.lt(
-        longColumn(n),
-        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMillis(v))
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
@@ -318,9 +323,7 @@ class ParquetFilters(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.ltEq(
-        longColumn(n),
-        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMillis(v))
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
@@ -359,9 +362,7 @@ class ParquetFilters(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.gt(
-        longColumn(n),
-        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMillis(v))
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
@@ -400,9 +401,7 @@ class ParquetFilters(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.gtEq(
-        longColumn(n),
-        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMillis(v))
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 7b33cef..c4cf511 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -589,19 +589,21 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
 
   test("filter pushdown - timestamp") {
     // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
-    val millisData = Seq(Timestamp.valueOf("2018-06-14 08:28:53.123"),
-      Timestamp.valueOf("2018-06-15 08:28:53.123"),
-      Timestamp.valueOf("2018-06-16 08:28:53.123"),
-      Timestamp.valueOf("2018-06-17 08:28:53.123"))
+    val millisData = Seq(
+      Timestamp.valueOf("1000-06-14 08:28:53.123"),
+      Timestamp.valueOf("1582-06-15 08:28:53.001"),
+      Timestamp.valueOf("1900-06-16 08:28:53.0"),
+      Timestamp.valueOf("2018-06-17 08:28:53.999"))
     withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
       ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
       testTimestampPushdown(millisData)
     }
 
     // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
-    val microsData = Seq(Timestamp.valueOf("2018-06-14 08:28:53.123456"),
-      Timestamp.valueOf("2018-06-15 08:28:53.123456"),
-      Timestamp.valueOf("2018-06-16 08:28:53.123456"),
+    val microsData = Seq(
+      Timestamp.valueOf("1000-06-14 08:28:53.123456"),
+      Timestamp.valueOf("1582-06-15 08:28:53.123456"),
+      Timestamp.valueOf("1900-06-16 08:28:53.123456"),
       Timestamp.valueOf("2018-06-17 08:28:53.123456"))
     withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
       ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index f572697..105f025 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -69,7 +69,9 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
   protected def withParquetDataFrame(df: DataFrame, testVectorized: Boolean = true)
       (f: DataFrame => Unit): Unit = {
     withTempPath { file =>
-      df.write.format(dataSourceName).save(file.getCanonicalPath)
+      withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
+        df.write.format(dataSourceName).save(file.getCanonicalPath)
+      }
       readFile(file.getCanonicalPath, testVectorized)(f)
     }
   }

