You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ji...@apache.org on 2020/06/01 17:22:31 UTC
[spark] branch branch-3.0 updated: Revert "[SPARK-31885][SQL] Fix filter push down for old millis timestamps to Parquet"
This is an automated email from the ASF dual-hosted git repository.
jiangxb1987 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 76a0418 Revert "[SPARK-31885][SQL] Fix filter push down for old millis timestamps to Parquet"
76a0418 is described below
commit 76a041804d9ba856c221e944b9898c8c62854474
Author: Xingbo Jiang <xi...@databricks.com>
AuthorDate: Mon Jun 1 10:20:17 2020 -0700
Revert "[SPARK-31885][SQL] Fix filter push down for old millis timestamps to Parquet"
This reverts commit 805884ad10aca3a532314284b0f2c007d4ca1045.
---
.../datasources/parquet/ParquetFilters.scala | 27 +++++++++++-----------
.../datasources/parquet/ParquetFilterSuite.scala | 16 ++++++-------
.../datasources/parquet/ParquetTest.scala | 4 +---
3 files changed, 22 insertions(+), 25 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 7900693..d89186a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -148,13 +148,6 @@ class ParquetFilters(
Binary.fromConstantByteArray(fixedLengthBytes, 0, numBytes)
}
- private def timestampToMillis(v: Any): JLong = {
- val timestamp = v.asInstanceOf[Timestamp]
- val micros = DateTimeUtils.fromJavaTimestamp(timestamp)
- val millis = DateTimeUtils.microsToMillis(micros)
- millis.asInstanceOf[JLong]
- }
-
private val makeEq:
PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = {
case ParquetBooleanType =>
@@ -191,7 +184,7 @@ class ParquetFilters(
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.eq(
longColumn(n),
- Option(v).map(timestampToMillis).orNull)
+ Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
(n: Array[String], v: Any) => FilterApi.eq(
@@ -242,7 +235,7 @@ class ParquetFilters(
case ParquetTimestampMillisType if pushDownTimestamp =>
(n: Array[String], v: Any) => FilterApi.notEq(
longColumn(n),
- Option(v).map(timestampToMillis).orNull)
+ Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong]).orNull)
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
(n: Array[String], v: Any) => FilterApi.notEq(
@@ -284,7 +277,9 @@ class ParquetFilters(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
- (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMillis(v))
+ (n: Array[String], v: Any) => FilterApi.lt(
+ longColumn(n),
+ v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
(n: Array[String], v: Any) =>
@@ -323,7 +318,9 @@ class ParquetFilters(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
- (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMillis(v))
+ (n: Array[String], v: Any) => FilterApi.ltEq(
+ longColumn(n),
+ v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
(n: Array[String], v: Any) =>
@@ -362,7 +359,9 @@ class ParquetFilters(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
- (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMillis(v))
+ (n: Array[String], v: Any) => FilterApi.gt(
+ longColumn(n),
+ v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
(n: Array[String], v: Any) =>
@@ -401,7 +400,9 @@ class ParquetFilters(
longColumn(n),
DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong])
case ParquetTimestampMillisType if pushDownTimestamp =>
- (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMillis(v))
+ (n: Array[String], v: Any) => FilterApi.gtEq(
+ longColumn(n),
+ v.asInstanceOf[Timestamp].getTime.asInstanceOf[JLong])
case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
(n: Array[String], v: Any) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index c4cf511..7b33cef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -589,21 +589,19 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
test("filter pushdown - timestamp") {
// spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
- val millisData = Seq(
- Timestamp.valueOf("1000-06-14 08:28:53.123"),
- Timestamp.valueOf("1582-06-15 08:28:53.001"),
- Timestamp.valueOf("1900-06-16 08:28:53.0"),
- Timestamp.valueOf("2018-06-17 08:28:53.999"))
+ val millisData = Seq(Timestamp.valueOf("2018-06-14 08:28:53.123"),
+ Timestamp.valueOf("2018-06-15 08:28:53.123"),
+ Timestamp.valueOf("2018-06-16 08:28:53.123"),
+ Timestamp.valueOf("2018-06-17 08:28:53.123"))
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) {
testTimestampPushdown(millisData)
}
// spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS
- val microsData = Seq(
- Timestamp.valueOf("1000-06-14 08:28:53.123456"),
- Timestamp.valueOf("1582-06-15 08:28:53.123456"),
- Timestamp.valueOf("1900-06-16 08:28:53.123456"),
+ val microsData = Seq(Timestamp.valueOf("2018-06-14 08:28:53.123456"),
+ Timestamp.valueOf("2018-06-15 08:28:53.123456"),
+ Timestamp.valueOf("2018-06-16 08:28:53.123456"),
Timestamp.valueOf("2018-06-17 08:28:53.123456"))
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 105f025..f572697 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -69,9 +69,7 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
protected def withParquetDataFrame(df: DataFrame, testVectorized: Boolean = true)
(f: DataFrame => Unit): Unit = {
withTempPath { file =>
- withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
- df.write.format(dataSourceName).save(file.getCanonicalPath)
- }
+ df.write.format(dataSourceName).save(file.getCanonicalPath)
readFile(file.getCanonicalPath, testVectorized)(f)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org