Posted to commits@spark.apache.org by ji...@apache.org on 2020/06/01 17:22:31 UTC

[spark] branch branch-3.0 updated: Revert "[SPARK-31885][SQL] Fix filter push down for old millis timestamps to Parquet"

This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 76a0418  Revert "[SPARK-31885][SQL] Fix filter push down for old millis timestamps to Parquet"
76a0418 is described below

commit 76a041804d9ba856c221e944b9898c8c62854474
Author: Xingbo Jiang <xi...@databricks.com>
AuthorDate: Mon Jun 1 10:20:17 2020 -0700

    Revert "[SPARK-31885][SQL] Fix filter push down for old millis timestamps to Parquet"
    
    This reverts commit 805884ad10aca3a532314284b0f2c007d4ca1045.
---
 .../datasources/parquet/ParquetFilters.scala       | 27 +++++++++++-----------
 .../datasources/parquet/ParquetFilterSuite.scala   | 16 ++++++-------
 .../datasources/parquet/ParquetTest.scala          |  4 +---
 3 files changed, 22 insertions(+), 25 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 7900693..d89186a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -148,13 +148,6 @@ class ParquetFilters(
     Binary.fromConstantByteArray(fixedLengthBytes, 0, numBytes)
   }
 
-  private def timestampToMillis(v: Any): JLong = {
-    val timestamp = v.asInstanceOf[Timestamp]
-    val micros = DateTimeUtils.fromJavaTimestamp(timestamp)
-    val millis = DateTimeUtils.microsToMillis(micros)
-    millis.asInstanceOf[JLong]
-  }
-
   private val makeEq:
     PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = {
     case ParquetBooleanType =>
@@ -191,7 +184,7 @@ class ParquetFilters(
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.eq(
         longColumn(n),
-        Option(v).map(timestampToMillis).orNull)
+        Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) => FilterApi.eq(
@@ -242,7 +235,7 @@ class ParquetFilters(
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.notEq(
         longColumn(n),
-        Option(v).map(timestampToMillis).orNull)
+        Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) => FilterApi.notEq(
@@ -284,7 +277,9 @@ class ParquetFilters(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMillis(v))
+      (n: Array[String], v: Any) => FilterApi.lt(
+        longColumn(n),
+        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
@@ -323,7 +318,9 @@ class ParquetFilters(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMillis(v))
+      (n: Array[String], v: Any) => FilterApi.ltEq(
+        longColumn(n),
+        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
@@ -362,7 +359,9 @@ class ParquetFilters(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMillis(v))
+      (n: Array[String], v: Any) => FilterApi.gt(
+        longColumn(n),
+        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
@@ -401,7 +400,9 @@ class ParquetFilters(
         longColumn(n),
         DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
     case ParquetTimestampMillisType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMillis(v))
+      (n: Array[String], v: Any) => FilterApi.gtEq(
+        longColumn(n),
+        v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
 
     case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
       (n: Array[String], v: Any) =>
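
The hunks above undo the core of SPARK-31885: the removed timestampToMillis helper converted a java.sql.Timestamp through DateTimeUtils.fromJavaTimestamp and DateTimeUtils.microsToMillis, which effectively reinterprets the wall-clock fields from the hybrid Julian/Gregorian calendar into Spark 3.0's Proleptic Gregorian calendar, while the restored code calls Timestamp.getTime directly. The two agree for modern timestamps but drift apart by several days before the 1582-10-15 Gregorian cutover. Below is a minimal, Spark-free sketch of that mismatch; the object name and the java.time stand-in for DateTimeUtils are ours, and it assumes the session time zone equals the JVM default.

    import java.sql.Timestamp
    import java.time.ZoneId

    // Spark-free sketch of the calendar mismatch (names are ours, not
    // from the patch).
    object OldMillisSketch {
      def main(args: Array[String]): Unit = {
        val ts = Timestamp.valueOf("1000-06-14 08:28:53.123")

        // Restored behavior: Timestamp.getTime interprets the wall-clock
        // fields in the hybrid Julian/Gregorian calendar.
        val hybridMillis: Long = ts.getTime

        // Roughly what the removed timestampToMillis helper produced:
        // the same wall-clock fields in the Proleptic Gregorian calendar
        // (java.time's ISO chronology).
        val prolepticMillis: Long = ts.toLocalDateTime
          .atZone(ZoneId.systemDefault())
          .toInstant
          .toEpochMilli

        println(s"hybrid    millis = $hybridMillis")
        println(s"proleptic millis = $prolepticMillis")
        // Zero for modern timestamps; several days before 1582-10-15.
        println(s"delta in days    = ${(prolepticMillis - hybridMillis) / 86400000.0}")
      }
    }
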
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index c4cf511..7b33cef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -589,21 +589,19 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
 
   test("filter pushdown - timestamp") {
     // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
-    val millisData = Seq(
-      Timestamp.valueOf("1000-06-14 08:28:53.123"),
-      Timestamp.valueOf("1582-06-15 08:28:53.001"),
-      Timestamp.valueOf("1900-06-16 08:28:53.0"),
-      Timestamp.valueOf("2018-06-17 08:28:53.999"))
+    val millisData = Seq(Timestamp.valueOf("2018-06-14 08:28:53.123"),
+      Timestamp.valueOf("2018-06-15 08:28:53.123"),
+      Timestamp.valueOf("2018-06-16 08:28:53.123"),
+      Timestamp.valueOf("2018-06-17 08:28:53.123"))
     withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
       ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
       testTimestampPushdown(millisData)
     }
 
     // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
-    val microsData = Seq(
-      Timestamp.valueOf("1000-06-14 08:28:53.123456"),
-      Timestamp.valueOf("1582-06-15 08:28:53.123456"),
-      Timestamp.valueOf("1900-06-16 08:28:53.123456"),
+    val microsData = Seq(Timestamp.valueOf("2018-06-14 08:28:53.123456"),
+      Timestamp.valueOf("2018-06-15 08:28:53.123456"),
+      Timestamp.valueOf("2018-06-16 08:28:53.123456"),
       Timestamp.valueOf("2018-06-17 08:28:53.123456"))
     withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
       ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
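
The restored test data keeps every timestamp in 2018, where both conversions agree; the reverted data had deliberately spread the values across 1000, 1582, 1900, and 2018 to exercise the rebasing path for pre-cutover millis. For reference, a sketch of the round trip the suite checks, run outside the harness in a spark-shell session; the session setup, path, and column name are assumptions, not taken from the suite.

    import java.sql.Timestamp
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("ts-pushdown-sketch")
      .getOrCreate()
    import spark.implicits._

    // Write millis-precision timestamps, as the suite's first block does.
    spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
    Seq(Timestamp.valueOf("2018-06-14 08:28:53.123")).toDF("ts")
      .write.mode("overwrite").parquet("/tmp/ts-millis-sketch")

    // A timestamp predicate that ParquetFilters can push down as an eq
    // filter on the underlying INT64 millis column.
    spark.read.parquet("/tmp/ts-millis-sketch")
      .where($"ts" === Timestamp.valueOf("2018-06-14 08:28:53.123"))
      .show()
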
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 105f025..f572697 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -69,9 +69,7 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
   protected def withParquetDataFrame(df: DataFrame, testVectorized: Boolean = true)
       (f: DataFrame => Unit): Unit = {
     withTempPath { file =>
-      withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
-        df.write.format(dataSourceName).save(file.getCanonicalPath)
-      }
+      df.write.format(dataSourceName).save(file.getCanonicalPath)
       readFile(file.getCanonicalPath, testVectorized)(f)
     }
   }
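
The reverted fix had pinned the write-side rebase mode to CORRECTED so the pre-1582 test data could be written without a Julian rebase; with only modern timestamps left in the suite, the wrapper goes away. A sketch of setting the same mode in user code follows; the path is hypothetical, and the key string is, to our understanding, the branch-3.0 name behind SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.

    // Continuing in the same spark-shell session as the previous sketch.
    import java.sql.Timestamp
    import spark.implicits._

    // CORRECTED writes pre-1582 timestamps as-is in the Proleptic
    // Gregorian calendar, skipping the Julian rebase; the removed
    // wrapper forced this for the old test data.
    spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
    Seq(Timestamp.valueOf("1000-06-14 08:28:53.123")).toDF("ts")
      .write.mode("overwrite").parquet("/tmp/old-ts-corrected-sketch")
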

