Posted to commits@spark.apache.org by we...@apache.org on 2020/06/02 12:24:42 UTC
[spark] branch branch-3.0 updated: [SPARK-31888][SQL] Support `java.time.Instant` in Parquet filter pushdown
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new b85b954 [SPARK-31888][SQL] Support `java.time.Instant` in Parquet filter pushdown
b85b954 is described below
commit b85b954c91aa65dd6046e8c1282712fbb13d61e9
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Tue Jun 2 11:53:58 2020 +0000
[SPARK-31888][SQL] Support `java.time.Instant` in Parquet filter pushdown
### What changes were proposed in this pull request?
1. Modified `ParquetFilters.valueCanMakeFilterOn()` to accept filters with `java.time.Instant` attributes.
2. Added `ParquetFilters.timestampToMicros()` to support both `java.sql.Timestamp` and `java.time.Instant` in conversions to microseconds.
3. Re-used `timestampToMicros` in constructing Parquet filters.

### Why are the changes needed?
To support pushed-down filters with `java.time.Instant` attributes. Before the changes, timestamp filters were not pushed down to the Parquet datasource when `spark.sql.datetime.java8API.enabled` is `true`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Modified tests in `ParquetFilterSuite` to check the case when Java 8 API is enabled.
Closes #28696 from MaxGekk/support-instant-parquet-filters.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../datasources/parquet/ParquetFilters.scala | 34 +++---
.../datasources/parquet/ParquetFilterSuite.scala | 119 ++++++++++++---------
2 files changed, 83 insertions(+), 70 deletions(-)
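For context, a minimal sketch of the user-visible behavior this patch enables (the session setup, the path `/tmp/ts_data`, and the column name `ts` are illustrative, not taken from the patch). With `spark.sql.datetime.java8API.enabled` set to `true`, Spark exposes `TIMESTAMP` values as `java.time.Instant`, and after this change a comparison against an `Instant` literal can be pushed down to Parquet:

```scala
import java.sql.Timestamp
import java.time.Instant

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// Expose TIMESTAMP column values as java.time.Instant (Java 8 API).
spark.conf.set("spark.sql.datetime.java8API.enabled", "true")
import spark.implicits._

// Illustrative data set; any TIMESTAMP column behaves the same way.
Seq("2018-06-17 08:28:53.999").map(Timestamp.valueOf).toDF("ts")
  .write.mode("overwrite").parquet("/tmp/ts_data")

// Before this patch, the Instant comparison value was rejected by
// ParquetFilters.valueCanMakeFilterOn(), so the predicate was evaluated
// only in Spark; with the patch it shows up under PushedFilters.
val readBack = spark.read.parquet("/tmp/ts_data")
  .filter($"ts" >= Instant.parse("2018-06-17T00:00:00Z"))
readBack.explain() // expect PushedFilters: [..., GreaterThanOrEqual(ts,...)]
```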
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index d0977ba..13dee48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long => JLong}
import java.math.{BigDecimal => JBigDecimal}
import java.sql.{Date, Timestamp}
-import java.time.LocalDate
+import java.time.{Instant, LocalDate}
import java.util.Locale
import scala.collection.JavaConverters.asScalaBufferConverter
@@ -129,6 +129,11 @@ class ParquetFilters(
case ld: LocalDate => DateTimeUtils.localDateToDays(ld)
}
+ private def timestampToMicros(v: Any): JLong = v match {
+ case i: Instant => DateTimeUtils.instantToMicros(i)
+ case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
+ }
+
private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue()
private def decimalToInt64(decimal: JBigDecimal): JLong = decimal.unscaledValue().longValue()
@@ -149,8 +154,7 @@ class ParquetFilters(
}
private def timestampToMillis(v: Any): JLong = {
- val timestamp = v.asInstanceOf[Timestamp]
- val micros = DateTimeUtils.fromJavaTimestamp(timestamp)
+ val micros = timestampToMicros(v)
val millis = DateTimeUtils.toMillis(micros)
millis.asInstanceOf[JLong]
}
@@ -186,8 +190,7 @@ class ParquetFilters(
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.eq(
longColumn(n),
- Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
- .asInstanceOf[JLong]).orNull)
+ Option(v).map(timestampToMicros).orNull)
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.eq(
longColumn(n),
@@ -237,8 +240,7 @@ class ParquetFilters(
case ParquetTimestampMicrosType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.notEq(
longColumn(n),
- Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
- .asInstanceOf[JLong]).orNull)
+ Option(v).map(timestampToMicros).orNull)
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.notEq(
longColumn(n),
@@ -280,9 +282,7 @@ class ParquetFilters(
(n: Array[String], v: Any) =>
FilterApi.lt(intColumn(n), dateToDays(v).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
- (n: Array[String], v: Any) => FilterApi.lt(
- longColumn(n),
- DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
+ (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMicros(v))
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMillis(v))
@@ -319,9 +319,7 @@ class ParquetFilters(
(n: Array[String], v: Any) =>
FilterApi.ltEq(intColumn(n), dateToDays(v).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
- (n: Array[String], v: Any) => FilterApi.ltEq(
- longColumn(n),
- DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
+ (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMicros(v))
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMillis(v))
@@ -358,9 +356,7 @@ class ParquetFilters(
(n: Array[String], v: Any) =>
FilterApi.gt(intColumn(n), dateToDays(v).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
- (n: Array[String], v: Any) => FilterApi.gt(
- longColumn(n),
- DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
+ (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMicros(v))
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMillis(v))
@@ -397,9 +393,7 @@ class ParquetFilters(
(n: Array[String], v: Any) =>
FilterApi.gtEq(intColumn(n), dateToDays(v).asInstanceOf[Integer])
case ParquetTimestampMicrosType if pushDownTimestamp =>
- (n: Array[String], v: Any) => FilterApi.gtEq(
- longColumn(n),
- DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
+ (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMicros(v))
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMillis(v))
@@ -475,7 +469,7 @@ class ParquetFilters(
case ParquetDateType =>
value.isInstanceOf[Date] || value.isInstanceOf[LocalDate]
case ParquetTimestampMicrosType | ParquetTimestampMillisType =>
- value.isInstanceOf[Timestamp]
+ value.isInstanceOf[Timestamp] || value.isInstanceOf[Instant]
case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) =>
isDecimalMatched(value, decimalMeta)
case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index c4cf511..d20a07f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.math.{BigDecimal => JBigDecimal}
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
-import java.time.LocalDate
+import java.time.{LocalDate, LocalDateTime, ZoneId}
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Operators}
import org.apache.parquet.filter2.predicate.FilterApi._
@@ -143,7 +143,10 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
}
- private def testTimestampPushdown(data: Seq[Timestamp]): Unit = {
+ private def testTimestampPushdown(data: Seq[String], java8Api: Boolean): Unit = {
+ implicit class StringToTs(s: String) {
+ def ts: Timestamp = Timestamp.valueOf(s)
+ }
assert(data.size === 4)
val ts1 = data.head
val ts2 = data(1)
@@ -151,7 +154,18 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
val ts4 = data(3)
import testImplicits._
- withNestedDataFrame(data.map(i => Tuple1(i)).toDF()) { case (inputDF, colName, resultFun) =>
+ val df = data.map(i => Tuple1(Timestamp.valueOf(i))).toDF()
+ withNestedDataFrame(df) { case (inputDF, colName, fun) =>
+ def resultFun(tsStr: String): Any = {
+ val parsed = if (java8Api) {
+ LocalDateTime.parse(tsStr.replace(" ", "T"))
+ .atZone(ZoneId.systemDefault())
+ .toInstant
+ } else {
+ Timestamp.valueOf(tsStr)
+ }
+ fun(parsed)
+ }
withParquetDataFrame(inputDF) { implicit df =>
val tsAttr = df(colName).expr
assert(df(colName).expr.dataType === TimestampType)
@@ -160,26 +174,26 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
checkFilterPredicate(tsAttr.isNotNull, classOf[NotEq[_]],
data.map(i => Row.apply(resultFun(i))))
- checkFilterPredicate(tsAttr === ts1, classOf[Eq[_]], resultFun(ts1))
- checkFilterPredicate(tsAttr <=> ts1, classOf[Eq[_]], resultFun(ts1))
- checkFilterPredicate(tsAttr =!= ts1, classOf[NotEq[_]],
+ checkFilterPredicate(tsAttr === ts1.ts, classOf[Eq[_]], resultFun(ts1))
+ checkFilterPredicate(tsAttr <=> ts1.ts, classOf[Eq[_]], resultFun(ts1))
+ checkFilterPredicate(tsAttr =!= ts1.ts, classOf[NotEq[_]],
Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i))))
- checkFilterPredicate(tsAttr < ts2, classOf[Lt[_]], resultFun(ts1))
- checkFilterPredicate(tsAttr > ts1, classOf[Gt[_]],
+ checkFilterPredicate(tsAttr < ts2.ts, classOf[Lt[_]], resultFun(ts1))
+ checkFilterPredicate(tsAttr > ts1.ts, classOf[Gt[_]],
Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i))))
- checkFilterPredicate(tsAttr <= ts1, classOf[LtEq[_]], resultFun(ts1))
- checkFilterPredicate(tsAttr >= ts4, classOf[GtEq[_]], resultFun(ts4))
-
- checkFilterPredicate(Literal(ts1) === tsAttr, classOf[Eq[_]], resultFun(ts1))
- checkFilterPredicate(Literal(ts1) <=> tsAttr, classOf[Eq[_]], resultFun(ts1))
- checkFilterPredicate(Literal(ts2) > tsAttr, classOf[Lt[_]], resultFun(ts1))
- checkFilterPredicate(Literal(ts3) < tsAttr, classOf[Gt[_]], resultFun(ts4))
- checkFilterPredicate(Literal(ts1) >= tsAttr, classOf[LtEq[_]], resultFun(ts1))
- checkFilterPredicate(Literal(ts4) <= tsAttr, classOf[GtEq[_]], resultFun(ts4))
-
- checkFilterPredicate(!(tsAttr < ts4), classOf[GtEq[_]], resultFun(ts4))
- checkFilterPredicate(tsAttr < ts2 || tsAttr > ts3, classOf[Operators.Or],
+ checkFilterPredicate(tsAttr <= ts1.ts, classOf[LtEq[_]], resultFun(ts1))
+ checkFilterPredicate(tsAttr >= ts4.ts, classOf[GtEq[_]], resultFun(ts4))
+
+ checkFilterPredicate(Literal(ts1.ts) === tsAttr, classOf[Eq[_]], resultFun(ts1))
+ checkFilterPredicate(Literal(ts1.ts) <=> tsAttr, classOf[Eq[_]], resultFun(ts1))
+ checkFilterPredicate(Literal(ts2.ts) > tsAttr, classOf[Lt[_]], resultFun(ts1))
+ checkFilterPredicate(Literal(ts3.ts) < tsAttr, classOf[Gt[_]], resultFun(ts4))
+ checkFilterPredicate(Literal(ts1.ts) >= tsAttr, classOf[LtEq[_]], resultFun(ts1))
+ checkFilterPredicate(Literal(ts4.ts) <= tsAttr, classOf[GtEq[_]], resultFun(ts4))
+
+ checkFilterPredicate(!(tsAttr < ts4.ts), classOf[GtEq[_]], resultFun(ts4))
+ checkFilterPredicate(tsAttr < ts2.ts || tsAttr > ts3.ts, classOf[Operators.Or],
Seq(Row(resultFun(ts1)), Row(resultFun(ts4))))
}
}
@@ -588,36 +602,41 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}
test("filter pushdown - timestamp") {
- // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
- val millisData = Seq(
- Timestamp.valueOf("1000-06-14 08:28:53.123"),
- Timestamp.valueOf("1582-06-15 08:28:53.001"),
- Timestamp.valueOf("1900-06-16 08:28:53.0"),
- Timestamp.valueOf("2018-06-17 08:28:53.999"))
- withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
- ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
- testTimestampPushdown(millisData)
- }
-
- // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
- val microsData = Seq(
- Timestamp.valueOf("1000-06-14 08:28:53.123456"),
- Timestamp.valueOf("1582-06-15 08:28:53.123456"),
- Timestamp.valueOf("1900-06-16 08:28:53.123456"),
- Timestamp.valueOf("2018-06-17 08:28:53.123456"))
- withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
- ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
- testTimestampPushdown(microsData)
- }
-
- // spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown
- withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
- ParquetOutputTimestampType.INT96.toString) {
- import testImplicits._
- withParquetDataFrame(millisData.map(i => Tuple1(i)).toDF()) { implicit df =>
- val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
- assertResult(None) {
- createParquetFilters(schema).createFilter(sources.IsNull("_1"))
+ Seq(true, false).foreach { java8Api =>
+ withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
+ // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
+ val millisData = Seq(
+ "1000-06-14 08:28:53.123",
+ "1582-06-15 08:28:53.001",
+ "1900-06-16 08:28:53.0",
+ "2018-06-17 08:28:53.999")
+ withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
+ ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
+ testTimestampPushdown(millisData, java8Api)
+ }
+
+ // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
+ val microsData = Seq(
+ "1000-06-14 08:28:53.123456",
+ "1582-06-15 08:28:53.123456",
+ "1900-06-16 08:28:53.123456",
+ "2018-06-17 08:28:53.123456")
+ withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
+ ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
+ testTimestampPushdown(microsData, java8Api)
+ }
+
+ // spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown
+ withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
+ ParquetOutputTimestampType.INT96.toString) {
+ import testImplicits._
+ withParquetDataFrame(
+ millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF()) { implicit df =>
+ val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
+ assertResult(None) {
+ createParquetFilters(schema).createFilter(sources.IsNull("_1"))
+ }
+ }
}
}
}
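The heart of the change is the `timestampToMicros` dispatch added above. Below is a standalone sketch of the same conversion using plain JDK arithmetic; note that Spark's actual `DateTimeUtils.fromJavaTimestamp` additionally rebases pre-Gregorian timestamps from the hybrid Julian calendar, which this sketch omits:

```scala
import java.sql.Timestamp
import java.time.Instant

// Standalone sketch of the dispatch in ParquetFilters.timestampToMicros:
// both the legacy java.sql.Timestamp and the Java 8 java.time.Instant are
// normalized to microseconds since the epoch, so every predicate on a
// Parquet TIMESTAMP_MICROS column is built through a single code path.
def timestampToMicros(v: Any): java.lang.Long = v match {
  case i: Instant =>
    // Whole seconds to micros, plus the sub-second part truncated to micros.
    Math.addExact(Math.multiplyExact(i.getEpochSecond, 1000000L), i.getNano / 1000L)
  case t: Timestamp =>
    // getTime already carries whole milliseconds; add the sub-milli micros.
    Math.addExact(Math.multiplyExact(t.getTime, 1000L), (t.getNanos / 1000L) % 1000L)
}

// Both representations of one point in time map to the same micros value.
val t = new Timestamp(1529224133999L) // 2018-06-17T08:28:53.999Z
assert(timestampToMicros(t) == timestampToMicros(t.toInstant))
```

Centralizing the conversion is what lets the six near-identical `eq`/`notEq`/`lt`/`ltEq`/`gt`/`gtEq` cases in the diff collapse into one-liners, and `timestampToMillis` derives its millisecond value from the same micros result.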