You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/05/26 01:39:51 UTC
[spark] branch branch-3.0 updated: [SPARK-31818][SQL] Fix pushing
down filters with `java.time.Instant` values in ORC
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new b8da5be [SPARK-31818][SQL] Fix pushing down filters with `java.time.Instant` values in ORC
b8da5be is described below
commit b8da5be6cc3e00118b439ffb94fecb029c09ebf5
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Mon May 25 18:36:02 2020 -0700
[SPARK-31818][SQL] Fix pushing down filters with `java.time.Instant` values in ORC
### What changes were proposed in this pull request?
Convert `java.time.Instant` to `java.sql.Timestamp` in pushed down filters to ORC datasource when Java 8 time API enabled.
### Why are the changes needed?
The changes fix the exception raised while pushing date filters when `spark.sql.datetime.java8API.enabled` is set to `true`:
```
java.lang.IllegalArgumentException: Wrong value class java.time.Instant for TIMESTAMP.EQUALS leaf
at org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl.checkLiteralType(SearchArgumentImpl.java:192)
at org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl.<init>(SearchArgumentImpl.java:75)
```
### Does this PR introduce any user-facing change?
Yes
### How was this patch tested?
Added tests to `OrcFilterSuite`.
Closes #28636 from MaxGekk/orc-timestamp-filter-pushdown.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit 6c80ebbccb7f354f645dd63a73114332d26f901f)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../apache/spark/sql/catalyst/dsl/package.scala | 3 +-
.../sql/execution/datasources/orc/OrcFilters.scala | 6 ++-
.../execution/datasources/orc/OrcFilterSuite.scala | 56 +++++++++++++---------
.../sql/execution/datasources/orc/OrcFilters.scala | 6 ++-
.../execution/datasources/orc/OrcFilterSuite.scala | 54 +++++++++++++--------
5 files changed, 77 insertions(+), 48 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index fe3fea5..26f5bee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst
import java.sql.{Date, Timestamp}
-import java.time.LocalDate
+import java.time.{Instant, LocalDate}
import scala.language.implicitConversions
@@ -152,6 +152,7 @@ package object dsl {
implicit def bigDecimalToLiteral(d: java.math.BigDecimal): Literal = Literal(d)
implicit def decimalToLiteral(d: Decimal): Literal = Literal(d)
implicit def timestampToLiteral(t: Timestamp): Literal = Literal(t)
+ implicit def instantToLiteral(i: Instant): Literal = Literal(i)
implicit def binaryToLiteral(a: Array[Byte]): Literal = Literal(a)
implicit def symbolToUnresolvedAttribute(s: Symbol): analysis.UnresolvedAttribute =
diff --git a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index a01d5a4..b685639 100644
--- a/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v1.2/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.datasources.orc
-import java.time.LocalDate
+import java.time.{Instant, LocalDate}
import org.apache.orc.storage.common.`type`.HiveDecimal
import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
@@ -26,7 +26,7 @@ import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
import org.apache.orc.storage.serde2.io.HiveDecimalWritable
import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{localDateToDays, toJavaDate}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
@@ -167,6 +167,8 @@ private[sql] object OrcFilters extends OrcFiltersBase {
new HiveDecimalWritable(HiveDecimal.create(value.asInstanceOf[java.math.BigDecimal]))
case _: DateType if value.isInstanceOf[LocalDate] =>
toJavaDate(localDateToDays(value.asInstanceOf[LocalDate]))
+ case _: TimestampType if value.isInstanceOf[Instant] =>
+ toJavaTimestamp(instantToMicros(value.asInstanceOf[Instant]))
case _ => value
}
diff --git a/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index a1c325e..88b4b24 100644
--- a/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ b/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -245,29 +245,41 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
}
test("filter pushdown - timestamp") {
- val timeString = "2015-08-20 14:57:00"
- val timestamps = (1 to 4).map { i =>
- val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
- new Timestamp(milliseconds)
- }
- withOrcDataFrame(timestamps.map(Tuple1(_))) { implicit df =>
- checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
+ val input = Seq(
+ "1000-01-01 01:02:03",
+ "1582-10-01 00:11:22",
+ "1900-01-01 23:59:59",
+ "2020-05-25 10:11:12").map(Timestamp.valueOf)
- checkFilterPredicate($"_1" === timestamps(0), PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate($"_1" <=> timestamps(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-
- checkFilterPredicate($"_1" < timestamps(1), PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate($"_1" > timestamps(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" <= timestamps(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" >= timestamps(3), PredicateLeaf.Operator.LESS_THAN)
-
- checkFilterPredicate(Literal(timestamps(0)) === $"_1", PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate(Literal(timestamps(0)) <=> $"_1",
- PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- checkFilterPredicate(Literal(timestamps(1)) > $"_1", PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate(Literal(timestamps(2)) < $"_1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(timestamps(0)) >= $"_1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(timestamps(3)) <= $"_1", PredicateLeaf.Operator.LESS_THAN)
+ withOrcFile(input.map(Tuple1(_))) { path =>
+ Seq(false, true).foreach { java8Api =>
+ withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
+ readFile(path) { implicit df =>
+ val timestamps = input.map(Literal(_))
+ checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
+
+ checkFilterPredicate($"_1" === timestamps(0), PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate($"_1" <=> timestamps(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate($"_1" < timestamps(1), PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate($"_1" > timestamps(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate($"_1" <= timestamps(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate($"_1" >= timestamps(3), PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal(timestamps(0)) === $"_1", PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(
+ Literal(timestamps(0)) <=> $"_1", PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal(timestamps(1)) > $"_1", PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(
+ Literal(timestamps(2)) < $"_1",
+ PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(
+ Literal(timestamps(0)) >= $"_1",
+ PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(timestamps(3)) <= $"_1", PredicateLeaf.Operator.LESS_THAN)
+ }
+ }
+ }
}
}
diff --git a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 445a52c..4b64208 100644
--- a/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.datasources.orc
-import java.time.LocalDate
+import java.time.{Instant, LocalDate}
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{localDateToDays, toJavaDate}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
@@ -167,6 +167,8 @@ private[sql] object OrcFilters extends OrcFiltersBase {
new HiveDecimalWritable(HiveDecimal.create(value.asInstanceOf[java.math.BigDecimal]))
case _: DateType if value.isInstanceOf[LocalDate] =>
toJavaDate(localDateToDays(value.asInstanceOf[LocalDate]))
+ case _: TimestampType if value.isInstanceOf[Instant] =>
+ toJavaTimestamp(instantToMicros(value.asInstanceOf[Instant]))
case _ => value
}
diff --git a/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index 815af05..2263179 100644
--- a/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ b/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -246,29 +246,41 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
}
test("filter pushdown - timestamp") {
- val timeString = "2015-08-20 14:57:00"
- val timestamps = (1 to 4).map { i =>
- val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
- new Timestamp(milliseconds)
- }
- withOrcDataFrame(timestamps.map(Tuple1(_))) { implicit df =>
- checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
-
- checkFilterPredicate($"_1" === timestamps(0), PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate($"_1" <=> timestamps(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ val input = Seq(
+ "1000-01-01 01:02:03",
+ "1582-10-01 00:11:22",
+ "1900-01-01 23:59:59",
+ "2020-05-25 10:11:12").map(Timestamp.valueOf)
- checkFilterPredicate($"_1" < timestamps(1), PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate($"_1" > timestamps(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" <= timestamps(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate($"_1" >= timestamps(3), PredicateLeaf.Operator.LESS_THAN)
+ withOrcFile(input.map(Tuple1(_))) { path =>
+ Seq(false, true).foreach { java8Api =>
+ withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
+ readFile(path) { implicit df =>
+ val timestamps = input.map(Literal(_))
+ checkFilterPredicate($"_1".isNull, PredicateLeaf.Operator.IS_NULL)
- checkFilterPredicate(Literal(timestamps(0)) === $"_1", PredicateLeaf.Operator.EQUALS)
- checkFilterPredicate(
- Literal(timestamps(0)) <=> $"_1", PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- checkFilterPredicate(Literal(timestamps(1)) > $"_1", PredicateLeaf.Operator.LESS_THAN)
- checkFilterPredicate(Literal(timestamps(2)) < $"_1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(timestamps(0)) >= $"_1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
- checkFilterPredicate(Literal(timestamps(3)) <= $"_1", PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate($"_1" === timestamps(0), PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate($"_1" <=> timestamps(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+ checkFilterPredicate($"_1" < timestamps(1), PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate($"_1" > timestamps(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate($"_1" <= timestamps(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate($"_1" >= timestamps(3), PredicateLeaf.Operator.LESS_THAN)
+
+ checkFilterPredicate(Literal(timestamps(0)) === $"_1", PredicateLeaf.Operator.EQUALS)
+ checkFilterPredicate(
+ Literal(timestamps(0)) <=> $"_1", PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+ checkFilterPredicate(Literal(timestamps(1)) > $"_1", PredicateLeaf.Operator.LESS_THAN)
+ checkFilterPredicate(
+ Literal(timestamps(2)) < $"_1",
+ PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(
+ Literal(timestamps(0)) >= $"_1",
+ PredicateLeaf.Operator.LESS_THAN_EQUALS)
+ checkFilterPredicate(Literal(timestamps(3)) <= $"_1", PredicateLeaf.Operator.LESS_THAN)
+ }
+ }
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org