You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/03/08 01:35:17 UTC

[spark] branch branch-3.2 updated: [SPARK-38357][SQL][3.2] Fix StackOverflowError with OR(data filter, partition filter)

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 6ea9c52  [SPARK-38357][SQL][3.2] Fix StackOverflowError with OR(data filter, partition filter)
6ea9c52 is described below

commit 6ea9c528d85fc91d2b21c0afd2162c2eef5d9308
Author: huaxingao <hu...@apple.com>
AuthorDate: Tue Mar 8 09:20:32 2022 +0800

    [SPARK-38357][SQL][3.2] Fix StackOverflowError with OR(data filter, partition filter)
    
    ### What changes were proposed in this pull request?
    If a filter contains an OR that combines both data filters and partition filters,
    e.g. where `p` is a partition column and `id` is a data column:
    ```
    SELECT * FROM tmp WHERE (p = 0 AND id > 0) OR (p = 1 AND id = 2)
    ```
    Spark throws a StackOverflowError.
    
    ### Why are the changes needed?
    Bug fix: such queries currently fail with a StackOverflowError.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a new unit test.
    
    Closes #35691 from huaxingao/3.2.
    
    Authored-by: huaxingao <hu...@apple.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../datasources/PruneFileSourcePartitions.scala    |  6 ++++--
 .../PruneFileSourcePartitionsSuite.scala           | 22 ++++++++++++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 0927027..0642782 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -124,8 +124,10 @@ private[sql] object PruneFileSourcePartitions
       val (partitionKeyFilters, dataFilters) =
         getPartitionKeyFiltersAndDataFilters(scan.sparkSession, v2Relation,
           scan.readPartitionSchema, filters, output)
-      // The dataFilters are pushed down only once
-      if (partitionKeyFilters.nonEmpty || (dataFilters.nonEmpty && scan.dataFilters.isEmpty)) {
+      // if there are only partition filters, or the data filters have not been pushed yet,
+      // trigger push down
+      if ((partitionKeyFilters.nonEmpty && dataFilters.isEmpty)
+        || (dataFilters.nonEmpty && scan.dataFilters.isEmpty)) {
         val prunedV2Relation =
           v2Relation.copy(scan = scan.withFilters(partitionKeyFilters.toSeq, dataFilters))
         // The pushed down partition filters don't need to be reevaluated.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala
index 98d3d65..3af6c25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala
@@ -118,6 +118,28 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase with Shared
     }
   }
 
+  test("SPARK-38357: data + partition filters with OR") {
+    // Force datasource v2 for parquet
+    withSQLConf((SQLConf.USE_V1_SOURCE_LIST.key, "")) {
+      withTempPath { dir =>
+        spark.range(10).coalesce(1).selectExpr("id", "id % 3 as p")
+          .write.partitionBy("p").parquet(dir.getCanonicalPath)
+        withTempView("tmp") {
+          spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp")
+          assertPrunedPartitions("SELECT * FROM tmp WHERE (p = 0 AND id > 0) OR (p = 1 AND id = 2)",
+            2,
+            "((tmp.p = 0) || (tmp.p = 1))")
+          assertPrunedPartitions("SELECT * FROM tmp WHERE p = 0 AND id > 0",
+            1,
+            "(tmp.p = 0)")
+          assertPrunedPartitions("SELECT * FROM tmp WHERE p = 0",
+            1,
+            "(tmp.p = 0)")
+        }
+      }
+   }
+  }
+
   protected def collectPartitionFiltersFn(): PartialFunction[SparkPlan, Seq[Expression]] = {
     case scan: FileSourceScanExec => scan.partitionFilters
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org