Posted to commits@spark.apache.org by we...@apache.org on 2020/06/02 12:24:42 UTC

[spark] branch branch-3.0 updated: [SPARK-31888][SQL] Support `java.time.Instant` in Parquet filter pushdown

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b85b954  [SPARK-31888][SQL] Support `java.time.Instant` in Parquet filter pushdown
b85b954 is described below

commit b85b954c91aa65dd6046e8c1282712fbb13d61e9
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Tue Jun 2 11:53:58 2020 +0000

    [SPARK-31888][SQL] Support `java.time.Instant` in Parquet filter pushdown
    
    1. Modified `ParquetFilters.valueCanMakeFilterOn()` to accept filter values of type `java.time.Instant`.
    2. Added `ParquetFilters.timestampToMicros()` to convert both `java.sql.Timestamp` and `java.time.Instant` values to microseconds (see the sketch below).
    3. Reused `timestampToMicros` when constructing Parquet filters.
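    For illustration, a minimal, self-contained sketch of the conversion (not Spark's exact code; it ignores the Julian/Proleptic-Gregorian rebasing that `DateTimeUtils` applies to legacy values):
    
    ```scala
    import java.sql.Timestamp
    import java.time.Instant
    
    // Normalize either a legacy java.sql.Timestamp or a java.time.Instant to
    // microseconds since the epoch, the representation used by Parquet's
    // INT64 TIMESTAMP_MICROS columns and by the pushed-down predicates.
    def toMicros(v: Any): Long = v match {
      case i: Instant =>
        Math.addExact(Math.multiplyExact(i.getEpochSecond, 1000000L), (i.getNano / 1000).toLong)
      case t: Timestamp =>
        toMicros(t.toInstant) // Timestamp -> Instant, then the same path
    }
    ```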
    
    The goal is to support pushdown of filters with `java.time.Instant` values. Before these changes, timestamp filters were not pushed down to the Parquet datasource when `spark.sql.datetime.java8API.enabled` is `true`.
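    
    A minimal usage sketch of the scenario this enables (the SparkSession `spark`, the path, and the column name are hypothetical):
    
    ```scala
    import java.time.Instant
    import org.apache.spark.sql.functions.{col, lit}
    
    spark.conf.set("spark.sql.datetime.java8API.enabled", "true")
    
    // Hypothetical Parquet dataset with a TIMESTAMP column named "eventTime".
    val events = spark.read.parquet("/tmp/events")
    val cutoff = Instant.parse("2020-06-01T00:00:00Z")
    
    // With this change the Instant literal is converted to microseconds and the
    // comparison is pushed down to Parquet (a FilterApi.gt predicate on the long
    // column) instead of only being evaluated after the rows are read.
    events.filter(col("eventTime") > lit(cutoff)).show()
    ```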
    
    No user-facing changes.
    
    Modified tests in `ParquetFilterSuite` to check the case when the Java 8 datetime API is enabled.
    
    Closes #28696 from MaxGekk/support-instant-parquet-filters.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../datasources/parquet/ParquetFilters.scala       |  34 +++---
 .../datasources/parquet/ParquetFilterSuite.scala   | 119 ++++++++++++---------
 2 files changed, 83 insertions(+), 70 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index d0977ba..13dee48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Timestamp}
-import java.time.LocalDate
+import java.time.{Instant, LocalDate}
 import java.util.Locale
 
 import scala.collection.JavaConverters.asScalaBufferConverter
@@ -129,6 +129,11 @@ class ParquetFilters(
     case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
   }
 
+  private def timestampToMicros(v: Any): JLong = v match {
+    case i: Instant => DateTimeUtils.instantToMicros(i)
+    case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
+  }
+
   private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue()
 
   private def decimalToInt64(decimal: JBigDecimal): JLong = decimal.unscaledValue().longValue()
@@ -149,8 +154,7 @@ class ParquetFilters(
   }
 
   private def timestampToMillis(v: Any): JLong = {
-    val timestamp = v.asInstanceOf[Timestamp]
-    val micros = DateTimeUtils.fromJavaTimestamp(timestamp)
+    val micros = timestampToMicros(v)
     val millis = DateTimeUtils.toMillis(micros)
     millis.asInstanceOf[JLong]
   }
@@ -186,8 +190,7 @@ class ParquetFilters(
     case ParquetTimestampMicrosType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.eq(
         longColumn(n),
-        Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
-          .asInstanceOf[JLong]).orNull)
+        Option(v).map(timestampToMicros).orNull)
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.eq(
         longColumn(n),
@@ -237,8 +240,7 @@ class ParquetFilters(
     case ParquetTimestampMicrosType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.notEq(
         longColumn(n),
-        Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
-          .asInstanceOf[JLong]).orNull)
+        Option(v).map(timestampToMicros).orNull)
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.notEq(
         longColumn(n),
@@ -280,9 +282,7 @@ class ParquetFilters(
       (n: Array[String], v: Any) =>
         FilterApi.lt(intColumn(n), dateToDays(v).asInstanceOf[Integer])
     case ParquetTimestampMicrosType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.lt(
-        longColumn(n),
-        DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMicros(v))
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMillis(v))
 
@@ -319,9 +319,7 @@ class ParquetFilters(
       (n: Array[String], v: Any) =>
         FilterApi.ltEq(intColumn(n), dateToDays(v).asInstanceOf[Integer])
     case ParquetTimestampMicrosType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.ltEq(
-        longColumn(n),
-        DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMicros(v))
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMillis(v))
 
@@ -358,9 +356,7 @@ class ParquetFilters(
       (n: Array[String], v: Any) =>
         FilterApi.gt(intColumn(n), dateToDays(v).asInstanceOf[Integer])
     case ParquetTimestampMicrosType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.gt(
-        longColumn(n),
-        DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMicros(v))
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMillis(v))
 
@@ -397,9 +393,7 @@ class ParquetFilters(
       (n: Array[String], v: Any) =>
         FilterApi.gtEq(intColumn(n), dateToDays(v).asInstanceOf[Integer])
     case ParquetTimestampMicrosType if pushDownTimestamp =>
-      (n: Array[String], v: Any) => FilterApi.gtEq(
-        longColumn(n),
-        DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
+      (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMicros(v))
     case ParquetTimestampMillisType if pushDownTimestamp =>
       (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMillis(v))
 
@@ -475,7 +469,7 @@ class ParquetFilters(
       case ParquetDateType =>
         value.isInstanceOf[Date] || value.isInstanceOf[LocalDate]
       case ParquetTimestampMicrosType | ParquetTimestampMillisType =>
-        value.isInstanceOf[Timestamp]
+        value.isInstanceOf[Timestamp] || value.isInstanceOf[Instant]
       case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) =>
         isDecimalMatched(value, decimalMeta)
       case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index c4cf511..d20a07f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import java.math.{BigDecimal => JBigDecimal}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
-import java.time.LocalDate
+import java.time.{LocalDate, LocalDateTime, ZoneId}
 
 import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Operators}
 import org.apache.parquet.filter2.predicate.FilterApi._
@@ -143,7 +143,10 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     }
   }
 
-  private def testTimestampPushdown(data: Seq[Timestamp]): Unit = {
+  private def testTimestampPushdown(data: Seq[String], java8Api: Boolean): Unit = {
+    implicit class StringToTs(s: String) {
+      def ts: Timestamp = Timestamp.valueOf(s)
+    }
     assert(data.size === 4)
     val ts1 = data.head
     val ts2 = data(1)
@@ -151,7 +154,18 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     val ts4 = data(3)
 
     import testImplicits._
-    withNestedDataFrame(data.map(i => Tuple1(i)).toDF()) { case (inputDF, colName, resultFun) =>
+    val df = data.map(i => Tuple1(Timestamp.valueOf(i))).toDF()
+    withNestedDataFrame(df) { case (inputDF, colName, fun) =>
+      def resultFun(tsStr: String): Any = {
+        val parsed = if (java8Api) {
+          LocalDateTime.parse(tsStr.replace(" ", "T"))
+            .atZone(ZoneId.systemDefault())
+            .toInstant
+        } else {
+          Timestamp.valueOf(tsStr)
+        }
+        fun(parsed)
+      }
       withParquetDataFrame(inputDF) { implicit df =>
         val tsAttr = df(colName).expr
         assert(df(colName).expr.dataType === TimestampType)
@@ -160,26 +174,26 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
         checkFilterPredicate(tsAttr.isNotNull, classOf[NotEq[_]],
           data.map(i => Row.apply(resultFun(i))))
 
-        checkFilterPredicate(tsAttr === ts1, classOf[Eq[_]], resultFun(ts1))
-        checkFilterPredicate(tsAttr <=> ts1, classOf[Eq[_]], resultFun(ts1))
-        checkFilterPredicate(tsAttr =!= ts1, classOf[NotEq[_]],
+        checkFilterPredicate(tsAttr === ts1.ts, classOf[Eq[_]], resultFun(ts1))
+        checkFilterPredicate(tsAttr <=> ts1.ts, classOf[Eq[_]], resultFun(ts1))
+        checkFilterPredicate(tsAttr =!= ts1.ts, classOf[NotEq[_]],
           Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i))))
 
-        checkFilterPredicate(tsAttr < ts2, classOf[Lt[_]], resultFun(ts1))
-        checkFilterPredicate(tsAttr > ts1, classOf[Gt[_]],
+        checkFilterPredicate(tsAttr < ts2.ts, classOf[Lt[_]], resultFun(ts1))
+        checkFilterPredicate(tsAttr > ts1.ts, classOf[Gt[_]],
           Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i))))
-        checkFilterPredicate(tsAttr <= ts1, classOf[LtEq[_]], resultFun(ts1))
-        checkFilterPredicate(tsAttr >= ts4, classOf[GtEq[_]], resultFun(ts4))
-
-        checkFilterPredicate(Literal(ts1) === tsAttr, classOf[Eq[_]], resultFun(ts1))
-        checkFilterPredicate(Literal(ts1) <=> tsAttr, classOf[Eq[_]], resultFun(ts1))
-        checkFilterPredicate(Literal(ts2) > tsAttr, classOf[Lt[_]], resultFun(ts1))
-        checkFilterPredicate(Literal(ts3) < tsAttr, classOf[Gt[_]], resultFun(ts4))
-        checkFilterPredicate(Literal(ts1) >= tsAttr, classOf[LtEq[_]], resultFun(ts1))
-        checkFilterPredicate(Literal(ts4) <= tsAttr, classOf[GtEq[_]], resultFun(ts4))
-
-        checkFilterPredicate(!(tsAttr < ts4), classOf[GtEq[_]], resultFun(ts4))
-        checkFilterPredicate(tsAttr < ts2 || tsAttr > ts3, classOf[Operators.Or],
+        checkFilterPredicate(tsAttr <= ts1.ts, classOf[LtEq[_]], resultFun(ts1))
+        checkFilterPredicate(tsAttr >= ts4.ts, classOf[GtEq[_]], resultFun(ts4))
+
+        checkFilterPredicate(Literal(ts1.ts) === tsAttr, classOf[Eq[_]], resultFun(ts1))
+        checkFilterPredicate(Literal(ts1.ts) <=> tsAttr, classOf[Eq[_]], resultFun(ts1))
+        checkFilterPredicate(Literal(ts2.ts) > tsAttr, classOf[Lt[_]], resultFun(ts1))
+        checkFilterPredicate(Literal(ts3.ts) < tsAttr, classOf[Gt[_]], resultFun(ts4))
+        checkFilterPredicate(Literal(ts1.ts) >= tsAttr, classOf[LtEq[_]], resultFun(ts1))
+        checkFilterPredicate(Literal(ts4.ts) <= tsAttr, classOf[GtEq[_]], resultFun(ts4))
+
+        checkFilterPredicate(!(tsAttr < ts4.ts), classOf[GtEq[_]], resultFun(ts4))
+        checkFilterPredicate(tsAttr < ts2.ts || tsAttr > ts3.ts, classOf[Operators.Or],
           Seq(Row(resultFun(ts1)), Row(resultFun(ts4))))
       }
     }
@@ -588,36 +602,41 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   }
 
   test("filter pushdown - timestamp") {
-    // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
-    val millisData = Seq(
-      Timestamp.valueOf("1000-06-14 08:28:53.123"),
-      Timestamp.valueOf("1582-06-15 08:28:53.001"),
-      Timestamp.valueOf("1900-06-16 08:28:53.0"),
-      Timestamp.valueOf("2018-06-17 08:28:53.999"))
-    withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
-      ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
-      testTimestampPushdown(millisData)
-    }
-
-    // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
-    val microsData = Seq(
-      Timestamp.valueOf("1000-06-14 08:28:53.123456"),
-      Timestamp.valueOf("1582-06-15 08:28:53.123456"),
-      Timestamp.valueOf("1900-06-16 08:28:53.123456"),
-      Timestamp.valueOf("2018-06-17 08:28:53.123456"))
-    withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
-      ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
-      testTimestampPushdown(microsData)
-    }
-
-    // spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown
-    withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
-      ParquetOutputTimestampType.INT96.toString) {
-      import testImplicits._
-      withParquetDataFrame(millisData.map(i => Tuple1(i)).toDF()) { implicit df =>
-        val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
-        assertResult(None) {
-          createParquetFilters(schema).createFilter(sources.IsNull("_1"))
+    Seq(true, false).foreach { java8Api =>
+      withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
+        // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
+        val millisData = Seq(
+          "1000-06-14 08:28:53.123",
+          "1582-06-15 08:28:53.001",
+          "1900-06-16 08:28:53.0",
+          "2018-06-17 08:28:53.999")
+        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
+          ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
+          testTimestampPushdown(millisData, java8Api)
+        }
+
+        // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
+        val microsData = Seq(
+          "1000-06-14 08:28:53.123456",
+          "1582-06-15 08:28:53.123456",
+          "1900-06-16 08:28:53.123456",
+          "2018-06-17 08:28:53.123456")
+        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
+          ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
+          testTimestampPushdown(microsData, java8Api)
+        }
+
+        // spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown
+        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
+          ParquetOutputTimestampType.INT96.toString) {
+          import testImplicits._
+          withParquetDataFrame(
+            millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF()) { implicit df =>
+            val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
+            assertResult(None) {
+              createParquetFilters(schema).createFilter(sources.IsNull("_1"))
+            }
+          }
         }
       }
     }

