Posted to commits@spark.apache.org by we...@apache.org on 2022/11/15 08:47:50 UTC

[spark] branch master updated: [SPARK-41112][SQL] RuntimeFilter should apply ColumnPruning eagerly with in-subquery filter

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bd29ca78905 [SPARK-41112][SQL] RuntimeFilter should apply ColumnPruning eagerly with in-subquery filter
bd29ca78905 is described below

commit bd29ca7890554ac8932be59097e6345505a36c4c
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Tue Nov 15 16:47:29 2022 +0800

    [SPARK-41112][SQL] RuntimeFilter should apply ColumnPruning eagerly with in-subquery filter
    
    ### What changes were proposed in this pull request?
    
    Apply ColumnPruning to the aggregate built for the in-subquery filter.
    
    Note that the bloom filter side has already been fixed by https://github.com/apache/spark/pull/36047
    
    ### Why are the changes needed?
    
    The inferred in-subquery filter should apply ColumnPruning before getting the plan statistics and checking whether the plan can be broadcast. Otherwise, the final physical plan will differ from the expected one.
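    
    The gist of the change, condensed from the diff below (the identifiers `actualFilterKeyExpr`, `filterCreationSidePlan`, `canBroadcastBySize`, and `conf` all come from the patch context; the surrounding control flow is abbreviated here):
    
    ```scala
    // Build the aggregate that computes the distinct filter keys on the
    // creation side of the join.
    val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
    // Prune unused columns *before* asking for statistics, so that
    // canBroadcastBySize estimates only the columns the filter needs.
    val aggregate = ColumnPruning(Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan))
    if (!canBroadcastBySize(aggregate, conf)) {
      // The semi-join would be a shuffled join, which is not worthwhile,
      // so the InSubquery filter is skipped.
    }
    ```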
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    Added a test to `InjectRuntimeFilterSuite`.
    
    Closes #38619 from ulysses-you/SPARK-41112.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala  | 2 +-
 .../test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala | 7 ++++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 62782f6051b..efcf607b589 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -99,7 +99,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
     require(filterApplicationSideExp.dataType == filterCreationSideExp.dataType)
     val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideExp)
     val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
-    val aggregate = Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan)
+    val aggregate = ColumnPruning(Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan))
     if (!canBroadcastBySize(aggregate, conf)) {
       // Skip the InSubquery filter if the size of `aggregate` is beyond broadcast join threshold,
       // i.e., the semi-join will be a shuffled join, which is not worthwhile.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
index 0e016e19a62..fda442eeef0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.expressions.{Alias, BloomFilterMightContain, Literal}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, BloomFilterAggregate}
-import org.apache.spark.sql.catalyst.optimizer.MergeScalarSubqueries
+import org.apache.spark.sql.catalyst.optimizer.{ColumnPruning, MergeScalarSubqueries}
 import org.apache.spark.sql.catalyst.plans.LeftSemi
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan}
 import org.apache.spark.sql.execution.{ReusedSubqueryExec, SubqueryExec}
@@ -257,6 +257,11 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
         val normalizedDisabled = normalizePlan(normalizeExprIds(planDisabled))
         ensureLeftSemiJoinExists(planEnabled)
         assert(normalizedEnabled != normalizedDisabled)
+        val agg = planEnabled.collect {
+          case Join(_, agg: Aggregate, LeftSemi, _, _) => agg
+        }
+        assert(agg.size == 1)
+        assert(agg.head.fastEquals(ColumnPruning(agg.head)))
       } else {
         comparePlans(planDisabled, planEnabled)
       }

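For context on the new test assertion: `ColumnPruning` leaves an already-pruned plan unchanged, so `agg.head.fastEquals(ColumnPruning(agg.head))` holds exactly when the injected aggregate was pruned eagerly. A minimal sketch of that fixed-point property using the catalyst test DSL (the relation and column names here are illustrative, not taken from the patch):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.optimizer.ColumnPruning
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// A two-column relation, but the aggregate only touches column `a`.
val relation = LocalRelation($"a".int, $"b".int)
val aggregate = relation.groupBy($"a")($"a").analyze

// ColumnPruning inserts a Project over only `a` below the Aggregate, so
// size estimation no longer counts the unused column `b`.
val pruned = ColumnPruning(aggregate)

// Re-applying the rule is a no-op: the pruned plan is a fixed point,
// which is what the new assertion in InjectRuntimeFilterSuite checks.
assert(pruned.fastEquals(ColumnPruning(pruned)))
```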
