You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/07/14 05:22:51 UTC
[spark] branch master updated: [SPARK-39672][SQL][3.1] Fix removing project before filter with correlated subquery
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c8c280cb631 [SPARK-39672][SQL][3.1] Fix removing project before filter with correlated subquery
c8c280cb631 is described below
commit c8c280cb631f61fee9a2876428659947d5a0634a
Author: tianlzhang <ti...@ebay.com>
AuthorDate: Thu Jul 14 12:49:57 2022 +0800
[SPARK-39672][SQL][3.1] Fix removing project before filter with correlated subquery
Add more checks to `removeProjectBeforeFilter` in `ColumnPruning` and do not remove the project if both of the following hold:
1. the filter condition contains a correlated subquery
2. the same attribute exists in both the output of the Project's child and the output of the subquery
The following is a legitimate self-join query and should not throw an exception when de-duplicating attributes in the subquery and outer values.
```sql
select * from
(
select v1.a, v1.b, v2.c
from v1
inner join v2
on v1.a=v2.a) t3
where not exists (
select 1
from v2
where t3.a=v2.a and t3.b=v2.b and t3.c=v2.c
)
```
Here's what happens with the current code. The above query is analyzed into the following `LogicalPlan` before `ColumnPruning`.
```
Project [a#250, b#251, c#268]
+- Filter NOT exists#272 [(a#250 = a#266) && (b#251 = b#267) && (c#268 = c#268#277)]
   :  +- Project [1 AS 1#273, _1#259 AS a#266, _2#260 AS b#267, _3#261 AS c#268#277]
   :     +- LocalRelation [_1#259, _2#260, _3#261]
   +- Project [a#250, b#251, c#268]
      +- Join Inner, (a#250 = a#266)
         :- Project [a#250, b#251]
         :  +- Project [_1#243 AS a#250, _2#244 AS b#251]
         :     +- LocalRelation [_1#243, _2#244, _3#245]
         +- Project [a#266, c#268]
            +- Project [_1#259 AS a#266, _3#261 AS c#268]
               +- LocalRelation [_1#259, _2#260, _3#261]
```
Then in `ColumnPruning`, the Project before the Filter (between the Filter and the Join) is removed. This changes the `outputSet` of the Filter's child, which now includes an attribute (`a#266`) that also exists in the subquery's output. Later, when `RewritePredicateSubquery` de-duplicates conflicting attributes, it fails with `Found conflicting attributes a#266 in the condition joining outer plan`.
User-facing change: No.
Testing: Added a unit test (UT).
Closes #37074 from manuzhang/spark-39672.
Lead-authored-by: tianlzhang <ti...@ebay.com>
Co-authored-by: Manu Zhang <Ow...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 36fc73e7c42b84e05b15b2caecc0f804610dce20)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalyst/optimizer/Optimizer.scala | 16 +++++-
.../scala/org/apache/spark/sql/SubquerySuite.scala | 58 +++++++++++++++++++++-
2 files changed, 71 insertions(+), 3 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6afb6a5424b..78fb8b5de88 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -943,12 +943,24 @@ object ColumnPruning extends Rule[LogicalPlan] {
* order, otherwise lower Projects can be missed.
*/
private def removeProjectBeforeFilter(plan: LogicalPlan): LogicalPlan = plan transformUp {
- case p1 @ Project(_, f @ Filter(_, p2 @ Project(_, child)))
+ case p1 @ Project(_, f @ Filter(e, p2 @ Project(_, child)))
if p2.outputSet.subsetOf(child.outputSet) &&
// We only remove attribute-only project.
- p2.projectList.forall(_.isInstanceOf[AttributeReference]) =>
+ p2.projectList.forall(_.isInstanceOf[AttributeReference]) &&
+ // We can't remove project when the child has conflicting attributes
+ // with the subquery in filter predicate
+ !hasConflictingAttrsWithSubquery(e, child) =>
p1.copy(child = f.copy(child = child))
}
+
+ private def hasConflictingAttrsWithSubquery(
+ predicate: Expression,
+ child: LogicalPlan): Boolean = {
+ predicate.find {
+ case s: SubqueryExpression if s.plan.outputSet.intersect(child.outputSet).nonEmpty => true
+ case _ => false
+ }.isDefined
+ }
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index fa24e8d175b..1ae5ae68d07 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort}
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project, Sort}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
import org.apache.spark.sql.execution.datasources.FileScanRDD
@@ -2220,4 +2220,60 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}
}
}
+
+ test("SPARK-39672: Fix removing project before filter with correlated subquery") {
+ withTempView("v1", "v2") {
+ Seq((1, 2, 3), (4, 5, 6)).toDF("a", "b", "c").createTempView("v1")
+ Seq((1, 3, 5), (4, 5, 6)).toDF("a", "b", "c").createTempView("v2")
+
+ def findProject(df: DataFrame): Seq[Project] = {
+ df.queryExecution.optimizedPlan.collect {
+ case p: Project => p
+ }
+ }
+
+ // project before filter cannot be removed since subquery has conflicting attributes
+ // with outer reference
+ val df1 = sql(
+ """
+ |select * from
+ |(
+ |select
+ |v1.a,
+ |v1.b,
+ |v2.c
+ |from v1
+ |inner join v2
+ |on v1.a=v2.a) t3
+ |where not exists (
+ | select 1
+ | from v2
+ | where t3.a=v2.a and t3.b=v2.b and t3.c=v2.c
+ |)
+ |""".stripMargin)
+ checkAnswer(df1, Row(1, 2, 5))
+ assert(findProject(df1).size == 4)
+
+ // project before filter can be removed when there are no conflicting attributes
+ val df2 = sql(
+ """
+ |select * from
+ |(
+ |select
+ |v1.b,
+ |v2.c
+ |from v1
+ |inner join v2
+ |on v1.b=v2.c) t3
+ |where not exists (
+ | select 1
+ | from v2
+ | where t3.b=v2.b and t3.c=v2.c
+ |)
+ |""".stripMargin)
+
+ checkAnswer(df2, Row(5, 5))
+ assert(findProject(df2).size == 3)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org