You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/03/08 01:35:17 UTC
[spark] branch branch-3.2 updated: [SPARK-38357][SQL][3.2] Fix StackOverflowError with OR(data filter, partition filter)
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 6ea9c52 [SPARK-38357][SQL][3.2] Fix StackOverflowError with OR(data filter, partition filter)
6ea9c52 is described below
commit 6ea9c528d85fc91d2b21c0afd2162c2eef5d9308
Author: huaxingao <hu...@apple.com>
AuthorDate: Tue Mar 8 09:20:32 2022 +0800
[SPARK-38357][SQL][3.2] Fix StackOverflowError with OR(data filter, partition filter)
### What changes were proposed in this pull request?
If a filter contains an OR whose branches mix data filters and partition filters,
e.g. where `p` is a partition column and `id` is a data column:
```
SELECT * FROM tmp WHERE (p = 0 AND id > 0) OR (p = 1 AND id = 2)
```
Spark throws a StackOverflowError.
### Why are the changes needed?
Bug fix.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
A new unit test.
Closes #35691 from huaxingao/3.2.
Authored-by: huaxingao <hu...@apple.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../datasources/PruneFileSourcePartitions.scala | 6 ++++--
.../PruneFileSourcePartitionsSuite.scala | 22 ++++++++++++++++++++++
2 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 0927027..0642782 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -124,8 +124,10 @@ private[sql] object PruneFileSourcePartitions
val (partitionKeyFilters, dataFilters) =
getPartitionKeyFiltersAndDataFilters(scan.sparkSession, v2Relation,
scan.readPartitionSchema, filters, output)
- // The dataFilters are pushed down only once
- if (partitionKeyFilters.nonEmpty || (dataFilters.nonEmpty && scan.dataFilters.isEmpty)) {
+ // if there are only partition filters, or the data filters have not been pushed yet,
+ // trigger push down
+ if ((partitionKeyFilters.nonEmpty && dataFilters.isEmpty)
+ || (dataFilters.nonEmpty && scan.dataFilters.isEmpty)) {
val prunedV2Relation =
v2Relation.copy(scan = scan.withFilters(partitionKeyFilters.toSeq, dataFilters))
// The pushed down partition filters don't need to be reevaluated.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala
index 98d3d65..3af6c25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala
@@ -118,6 +118,28 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase with Shared
}
}
+ test("SPARK-38357: data + partition filters with OR") {
+ // Force datasource v2 for parquet
+ withSQLConf((SQLConf.USE_V1_SOURCE_LIST.key, "")) {
+ withTempPath { dir =>
+ spark.range(10).coalesce(1).selectExpr("id", "id % 3 as p")
+ .write.partitionBy("p").parquet(dir.getCanonicalPath)
+ withTempView("tmp") {
+ spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp")
+ assertPrunedPartitions("SELECT * FROM tmp WHERE (p = 0 AND id > 0) OR (p = 1 AND id = 2)",
+ 2,
+ "((tmp.p = 0) || (tmp.p = 1))")
+ assertPrunedPartitions("SELECT * FROM tmp WHERE p = 0 AND id > 0",
+ 1,
+ "(tmp.p = 0)")
+ assertPrunedPartitions("SELECT * FROM tmp WHERE p = 0",
+ 1,
+ "(tmp.p = 0)")
+ }
+ }
+ }
+ }
+
protected def collectPartitionFiltersFn(): PartialFunction[SparkPlan, Seq[Expression]] = {
case scan: FileSourceScanExec => scan.partitionFilters
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org