You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/08/03 03:31:56 UTC

[spark] branch branch-3.0 updated: [SPARK-32509][SQL] Ignore unused DPP True Filter in Canonicalization

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d8d3e87  [SPARK-32509][SQL] Ignore unused DPP True Filter in Canonicalization
d8d3e87 is described below

commit d8d3e870a03f7695eb8fdbaaafa13d7df756801a
Author: Prakhar Jain <pr...@gmail.com>
AuthorDate: Mon Aug 3 03:26:03 2020 +0000

    [SPARK-32509][SQL] Ignore unused DPP True Filter in Canonicalization
    
    ### What changes were proposed in this pull request?
    This PR fixes issues related to the canonicalization of FileSourceScanExec when it contains unused DPP filters.
    
    ### Why are the changes needed?
    
    As part of the PlanDynamicPruningFilter rule, unused DPP filters are simply replaced by `DynamicPruningExpression(TrueLiteral)` so that they can be avoided. But these unnecessary `DynamicPruningExpression(TrueLiteral)` partition filters inside the FileSourceScanExec affect the canonicalization of the node, and so in many cases they can prevent ReuseExchange from happening.
    
    This PR fixes this issue by ignoring unused DPP filters in the `doCanonicalize` method.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added UT.
    
    Closes #29318 from prakharjain09/SPARK-32509_df_reuse.
    
    Authored-by: Prakhar Jain <pr...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 7a09e71198a094250f04e0f82f0c7c9860169540)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/execution/DataSourceScanExec.scala   | 10 ++++++++-
 .../spark/sql/DynamicPartitionPruningSuite.scala   | 24 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 90a3f97..7f534d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -589,12 +589,20 @@ case class FileSourceScanExec(
     new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
   }
 
+  // Filters unused DynamicPruningExpression expressions - one which has been replaced
+  // with DynamicPruningExpression(Literal.TrueLiteral) during Physical Planning
+  private def filterUnusedDynamicPruningExpressions(
+      predicates: Seq[Expression]): Seq[Expression] = {
+    predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral))
+  }
+
   override def doCanonicalize(): FileSourceScanExec = {
     FileSourceScanExec(
       relation,
       output.map(QueryPlan.normalizeExpressions(_, output)),
       requiredSchema,
-      QueryPlan.normalizePredicates(partitionFilters, output),
+      QueryPlan.normalizePredicates(
+        filterUnusedDynamicPruningExpressions(partitionFilters), output),
       optionalBucketSet,
       QueryPlan.normalizePredicates(dataFilters, output),
       None)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index cdf9ea4..1fbc4a6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1240,6 +1240,30 @@ abstract class DynamicPartitionPruningSuiteBase
     }
   }
 
+  test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
+    "canonicalization and exchange reuse") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+        val df = sql(
+          """ WITH view1 as (
+            |   SELECT f.store_id FROM fact_stats f WHERE f.units_sold = 70
+            | )
+            |
+            | SELECT * FROM view1 v1 join view1 v2 WHERE v1.store_id = v2.store_id
+          """.stripMargin)
+
+        checkPartitionPruningPredicate(df, false, false)
+        val reuseExchangeNodes = df.queryExecution.executedPlan.collect {
+          case se: ReusedExchangeExec => se
+        }
+        assert(reuseExchangeNodes.size == 1, "Expected plan to contain 1 ReusedExchangeExec " +
+          s"nodes. Found ${reuseExchangeNodes.size}")
+
+        checkAnswer(df, Row(15, 15) :: Nil)
+      }
+    }
+  }
+
   test("Plan broadcast pruning only when the broadcast can be reused") {
     Given("dynamic pruning filter on the build side")
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org