You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/03/01 17:24:20 UTC
[spark] branch master updated: [SPARK-37932][SQL] Wait to resolve missing attributes before applying DeduplicateRelations
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a633f77 [SPARK-37932][SQL] Wait to resolve missing attributes before applying DeduplicateRelations
a633f77 is described below
commit a633f77b5120d94eee6beff8615137bf537bbff9
Author: chenzhx <ch...@apache.org>
AuthorDate: Wed Mar 2 01:23:05 2022 +0800
[SPARK-37932][SQL] Wait to resolve missing attributes before applying DeduplicateRelations
### What changes were proposed in this pull request?
When a query joins a view with a duplicate of the same view, for example:
```
SELECT l1.id FROM v1 l1
INNER JOIN (
SELECT id FROM v1
GROUP BY id HAVING COUNT(DISTINCT name) > 1
) l2
ON l1.id = l2.id
GROUP BY l1.name, l1.id;
```
The error stack is:
```
Resolved attribute(s) name#26 missing from id#31,name#32 in operator !Aggregate [id#31], [id#31, count(distinct name#26) AS count(distinct name#26)#33L]. Attribute(s) with the same name appear in the operation: name. Please check if the right attribute(s) are used.;
Aggregate [name#26, id#25], [id#25]
+- Join Inner, (id#25 = id#31)
:- SubqueryAlias l1
: +- SubqueryAlias spark_catalog.default.v1
: +- View (`default`.`v1`, [id#25,name#26])
: +- Project [cast(id#20 as int) AS id#25, cast(name#21 as string) AS name#26]
: +- Project [id#20, name#21]
: +- SubqueryAlias spark_catalog.default.t
: +- Relation default.t[id#20,name#21] parquet
+- SubqueryAlias l2
+- Project [id#31]
+- Filter (count(distinct name#26)#33L > cast(1 as bigint))
+- !Aggregate [id#31], [id#31, count(distinct name#26) AS count(distinct name#26)#33L]
+- SubqueryAlias spark_catalog.default.v1
+- View (`default`.`v1`, [id#31,name#32])
+- Project [cast(id#27 as int) AS id#31, cast(name#28 as string) AS name#32]
+- Project [id#27, name#28]
+- SubqueryAlias spark_catalog.default.t
+- Relation default.t[id#27,name#28] parquet
```
Spark will consider the two views to be duplicates, which will cause the query to fail.
### Why are the changes needed?
Fix a bug that occurs when a query joins duplicate views.
### Does this PR introduce _any_ user-facing change?
Yes. When a query joins duplicate views, it now succeeds.
DeduplicateRelations should only kick in if the plan's children are all resolved and valid.
### How was this patch tested?
Added a new unit test.
Closes #35684 from chenzhx/SPARK-37932.
Authored-by: chenzhx <ch...@apache.org>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/catalyst/analysis/DeduplicateRelations.scala | 7 ++++++-
.../org/apache/spark/sql/execution/SQLViewSuite.scala | 19 +++++++++++++++++++
2 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
index 55b1c22..4c351e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
@@ -40,7 +40,12 @@ case class ReferenceEqualPlanWrapper(plan: LogicalPlan) {
object DeduplicateRelations extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
- renewDuplicatedRelations(mutable.HashSet.empty, plan)._1.resolveOperatorsUpWithPruning(
+ val newPlan = renewDuplicatedRelations(mutable.HashSet.empty, plan)._1
+ if (newPlan.find(p => p.resolved && p.missingInput.nonEmpty).isDefined) {
+ // Wait for `ResolveMissingReferences` to resolve missing attributes first
+ return newPlan
+ }
+ newPlan.resolveOperatorsUpWithPruning(
_.containsAnyPattern(JOIN, LATERAL_JOIN, AS_OF_JOIN, INTERSECT, EXCEPT, UNION, COMMAND),
ruleId) {
case p: LogicalPlan if !p.childrenResolved => p
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index ee6d352..9e6974a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -907,4 +907,23 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
}
}
}
+
+ test("SPARK-37932: view join with same view") {
+ withTable("t") {
+ withView("v1") {
+ Seq((1, "test1"), (2, "test2"), (1, "test2")).toDF("id", "name")
+ .write.format("parquet").saveAsTable("t")
+ sql("CREATE VIEW v1 (id, name) AS SELECT id, name FROM t")
+
+ checkAnswer(
+ sql("""SELECT l1.id FROM v1 l1
+ |INNER JOIN (
+ | SELECT id FROM v1
+ | GROUP BY id HAVING COUNT(DISTINCT name) > 1
+ | ) l2 ON l1.id = l2.id GROUP BY l1.name, l1.id;
+ |""".stripMargin),
+ Seq(Row(1), Row(1)))
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org