You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/11/15 08:47:50 UTC
[spark] branch master updated: [SPARK-41112][SQL] RuntimeFilter should apply ColumnPruning eagerly with in-subquery filter
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bd29ca78905 [SPARK-41112][SQL] RuntimeFilter should apply ColumnPruning eagerly with in-subquery filter
bd29ca78905 is described below
commit bd29ca7890554ac8932be59097e6345505a36c4c
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Tue Nov 15 16:47:29 2022 +0800
[SPARK-41112][SQL] RuntimeFilter should apply ColumnPruning eagerly with in-subquery filter
### What changes were proposed in this pull request?
Apply ColumnPruning for in subquery filter.
Note that the bloom filter side has already been fixed by https://github.com/apache/spark/pull/36047
### Why are the changes needed?
The inferred in-subquery filter should apply ColumnPruning before getting the plan statistics and checking whether it can be broadcast. Otherwise, the final physical plan will differ from the expected one.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Added a unit test.
Closes #38619 from ulysses-you/SPARK-41112.
Authored-by: ulysses-you <ul...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala | 2 +-
.../test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala | 7 ++++++-
2 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 62782f6051b..efcf607b589 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -99,7 +99,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
require(filterApplicationSideExp.dataType == filterCreationSideExp.dataType)
val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideExp)
val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
- val aggregate = Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan)
+ val aggregate = ColumnPruning(Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan))
if (!canBroadcastBySize(aggregate, conf)) {
// Skip the InSubquery filter if the size of `aggregate` is beyond broadcast join threshold,
// i.e., the semi-join will be a shuffled join, which is not worthwhile.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
index 0e016e19a62..fda442eeef0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
import org.apache.spark.sql.catalyst.expressions.{Alias, BloomFilterMightContain, Literal}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, BloomFilterAggregate}
-import org.apache.spark.sql.catalyst.optimizer.MergeScalarSubqueries
+import org.apache.spark.sql.catalyst.optimizer.{ColumnPruning, MergeScalarSubqueries}
import org.apache.spark.sql.catalyst.plans.LeftSemi
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan}
import org.apache.spark.sql.execution.{ReusedSubqueryExec, SubqueryExec}
@@ -257,6 +257,11 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
val normalizedDisabled = normalizePlan(normalizeExprIds(planDisabled))
ensureLeftSemiJoinExists(planEnabled)
assert(normalizedEnabled != normalizedDisabled)
+ val agg = planEnabled.collect {
+ case Join(_, agg: Aggregate, LeftSemi, _, _) => agg
+ }
+ assert(agg.size == 1)
+ assert(agg.head.fastEquals(ColumnPruning(agg.head)))
} else {
comparePlans(planDisabled, planEnabled)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org