You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/06/01 15:15:52 UTC
[spark] branch branch-3.0 updated: [SPARK-31885][SQL] Fix filter
push down for old millis timestamps to Parquet
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 805884a [SPARK-31885][SQL] Fix filter push down for old millis timestamps to Parquet
805884a is described below
commit 805884ad10aca3a532314284b0f2c007d4ca1045
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Mon Jun 1 15:13:44 2020 +0000
[SPARK-31885][SQL] Fix filter push down for old millis timestamps to Parquet
### What changes were proposed in this pull request?
Fixed conversions of `java.sql.Timestamp` to milliseconds in `ParquetFilters` by using the existing `DateTimeUtils` functions `fromJavaTimestamp()` and `microsToMillis()`.
### Why are the changes needed?
The changes fix the bug:
```scala
scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
scala> spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
scala> Seq(java.sql.Timestamp.valueOf("1000-06-14 08:28:53.123")).toDF("ts").write.mode("overwrite").parquet("/Users/maximgekk/tmp/ts_millis_old_filter")
scala> spark.read.parquet("/Users/maximgekk/tmp/ts_millis_old_filter").filter($"ts" === "1000-06-14 08:28:53.123").show(false)
+---+
|ts |
+---+
+---+
```
### Does this PR introduce _any_ user-facing change?
Yes, after the changes (for the example above):
```scala
scala> spark.read.parquet("/Users/maximgekk/tmp/ts_millis_old_filter").filter($"ts" === "1000-06-14 08:28:53.123").show(false)
+-----------------------+
|ts |
+-----------------------+
|1000-06-14 08:28:53.123|
+-----------------------+
```
### How was this patch tested?
Modified tests in `ParquetFilterSuite` to check old timestamps.
Closes #28693 from MaxGekk/parquet-ts-millis-filter.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 9c0dc28a6c33361a412672d63f3b974b75944965)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../datasources/parquet/ParquetFilters.scala | 27 +++++++++++-----------
.../datasources/parquet/ParquetFilterSuite.scala | 16 +++++++------
.../datasources/parquet/ParquetTest.scala | 4 +++-
3 files changed, 25 insertions(+), 22 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index d89186a..7900693 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -148,6 +148,13 @@ class ParquetFilters(
Binary.fromConstantByteArray(fixedLengthBytes, 0, numBytes)
}
+ private def timestampToMillis(v: Any): JLong = {
+ val timestamp = v.asInstanceOf[Timestamp]
+ val micros = DateTimeUtils.fromJavaTimestamp(timestamp)
+ val millis = DateTimeUtils.microsToMillis(micros)
+ millis.asInstanceOf[JLong]
+ }
+
private val makeEq:
PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = {
case ParquetBooleanType =>
@@ -184,7 +191,7 @@ class ParquetFilters(
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.eq(
longColumn(n),
- Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
+ Option(v).map(timestampToMillis).orNull)
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
(n: Array[String], v: Any) => FilterApi.eq(
@@ -235,7 +242,7 @@ class ParquetFilters(
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.notEq(
longColumn(n),
- Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
+ Option(v).map(timestampToMillis).orNull)
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
(n: Array[String], v: Any) => FilterApi.notEq(
@@ -277,9 +284,7 @@ class ParquetFilters(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
- (n: Array[String], v: Any) => FilterApi.lt(
- longColumn(n),
- v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+ (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMillis(v))
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
(n: Array[String], v: Any) =>
@@ -318,9 +323,7 @@ class ParquetFilters(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
- (n: Array[String], v: Any) => FilterApi.ltEq(
- longColumn(n),
- v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+ (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMillis(v))
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
(n: Array[String], v: Any) =>
@@ -359,9 +362,7 @@ class ParquetFilters(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
- (n: Array[String], v: Any) => FilterApi.gt(
- longColumn(n),
- v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+ (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMillis(v))
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
(n: Array[String], v: Any) =>
@@ -400,9 +401,7 @@ class ParquetFilters(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
- (n: Array[String], v: Any) => FilterApi.gtEq(
- longColumn(n),
- v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
+ (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMillis(v))
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
(n: Array[String], v: Any) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 7b33cef..c4cf511 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -589,19 +589,21 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
test("filter pushdown - timestamp") {
// spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
- val millisData = Seq(Timestamp.valueOf("2018-06-14 08:28:53.123"),
- Timestamp.valueOf("2018-06-15 08:28:53.123"),
- Timestamp.valueOf("2018-06-16 08:28:53.123"),
- Timestamp.valueOf("2018-06-17 08:28:53.123"))
+ val millisData = Seq(
+ Timestamp.valueOf("1000-06-14 08:28:53.123"),
+ Timestamp.valueOf("1582-06-15 08:28:53.001"),
+ Timestamp.valueOf("1900-06-16 08:28:53.0"),
+ Timestamp.valueOf("2018-06-17 08:28:53.999"))
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
testTimestampPushdown(millisData)
}
// spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
- val microsData = Seq(Timestamp.valueOf("2018-06-14 08:28:53.123456"),
- Timestamp.valueOf("2018-06-15 08:28:53.123456"),
- Timestamp.valueOf("2018-06-16 08:28:53.123456"),
+ val microsData = Seq(
+ Timestamp.valueOf("1000-06-14 08:28:53.123456"),
+ Timestamp.valueOf("1582-06-15 08:28:53.123456"),
+ Timestamp.valueOf("1900-06-16 08:28:53.123456"),
Timestamp.valueOf("2018-06-17 08:28:53.123456"))
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index f572697..105f025 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -69,7 +69,9 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
protected def withParquetDataFrame(df: DataFrame, testVectorized: Boolean = true)
(f: DataFrame => Unit): Unit = {
withTempPath { file =>
- df.write.format(dataSourceName).save(file.getCanonicalPath)
+ withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
+ df.write.format(dataSourceName).save(file.getCanonicalPath)
+ }
readFile(file.getCanonicalPath, testVectorized)(f)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org