You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/03/01 17:24:20 UTC

[spark] branch master updated: [SPARK-37932][SQL] Wait to resolve missing attributes before applying DeduplicateRelations

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a633f77  [SPARK-37932][SQL] Wait to resolve missing attributes before applying DeduplicateRelations
a633f77 is described below

commit a633f77b5120d94eee6beff8615137bf537bbff9
Author: chenzhx <ch...@apache.org>
AuthorDate: Wed Mar 2 01:23:05 2022 +0800

    [SPARK-37932][SQL] Wait to resolve missing attributes before applying DeduplicateRelations
    
    ### What changes were proposed in this pull request?
    
    When a query joins a view with itself, for example:
    ```
    SELECT l1.id FROM v1 l1
     INNER JOIN (
       SELECT id FROM v1
       GROUP BY id  HAVING COUNT(DISTINCT name) > 1
     ) l2
     ON l1.id = l2.id
     GROUP BY l1.name, l1.id;
    ```
    The error stack is:
    ```
    Resolved attribute(s) name#26 missing from id#31,name#32 in operator !Aggregate [id#31], [id#31, count(distinct name#26) AS count(distinct name#26)#33L]. Attribute(s) with the same name appear in the operation: name. Please check if the right attribute(s) are used.;
    Aggregate [name#26, id#25], [id#25]
    +- Join Inner, (id#25 = id#31)
       :- SubqueryAlias l1
       :  +- SubqueryAlias spark_catalog.default.v1
       :     +- View (`default`.`v1`, [id#25,name#26])
       :        +- Project [cast(id#20 as int) AS id#25, cast(name#21 as string) AS name#26]
       :           +- Project [id#20, name#21]
       :              +- SubqueryAlias spark_catalog.default.t
       :                 +- Relation default.t[id#20,name#21] parquet
       +- SubqueryAlias l2
          +- Project [id#31]
             +- Filter (count(distinct name#26)#33L > cast(1 as bigint))
                +- !Aggregate [id#31], [id#31, count(distinct name#26) AS count(distinct name#26)#33L]
                   +- SubqueryAlias spark_catalog.default.v1
                      +- View (`default`.`v1`, [id#31,name#32])
                         +- Project [cast(id#27 as int) AS id#31, cast(name#28 as string) AS name#32]
                            +- Project [id#27, name#28]
                               +- SubqueryAlias spark_catalog.default.t
                                  +- Relation default.t[id#27,name#28] parquet
    ```
    Spark will consider the two views to be duplicates, which will cause the query to fail.
    
    ### Why are the changes needed?
    
    Fix a bug where a query that joins a view with itself fails with the error shown above.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. A query that joins a view with itself now succeeds.
    DeduplicateRelations should only kick in if the plan's children are all resolved and valid.
    
    ### How was this patch tested?
    
    Added a new unit test to SQLViewSuite.
    
    Closes #35684 from chenzhx/SPARK-37932.
    
    Authored-by: chenzhx <ch...@apache.org>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/catalyst/analysis/DeduplicateRelations.scala  |  7 ++++++-
 .../org/apache/spark/sql/execution/SQLViewSuite.scala | 19 +++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
index 55b1c22..4c351e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
@@ -40,7 +40,12 @@ case class ReferenceEqualPlanWrapper(plan: LogicalPlan) {
 
 object DeduplicateRelations extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    renewDuplicatedRelations(mutable.HashSet.empty, plan)._1.resolveOperatorsUpWithPruning(
+    val newPlan = renewDuplicatedRelations(mutable.HashSet.empty, plan)._1
+    if (newPlan.find(p => p.resolved && p.missingInput.nonEmpty).isDefined) {
+      // Wait for `ResolveMissingReferences` to resolve missing attributes first
+      return newPlan
+    }
+    newPlan.resolveOperatorsUpWithPruning(
       _.containsAnyPattern(JOIN, LATERAL_JOIN, AS_OF_JOIN, INTERSECT, EXCEPT, UNION, COMMAND),
       ruleId) {
       case p: LogicalPlan if !p.childrenResolved => p
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index ee6d352..9e6974a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -907,4 +907,23 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-37932: view join with same view") {
+    withTable("t") {
+      withView("v1") {
+        Seq((1, "test1"), (2, "test2"), (1, "test2")).toDF("id", "name")
+          .write.format("parquet").saveAsTable("t")
+        sql("CREATE VIEW v1 (id, name) AS SELECT id, name FROM t")
+
+        checkAnswer(
+          sql("""SELECT l1.id FROM v1 l1
+                |INNER JOIN (
+                |   SELECT id FROM v1
+                |   GROUP BY id HAVING COUNT(DISTINCT name) > 1
+                | ) l2 ON l1.id = l2.id GROUP BY l1.name, l1.id;
+                |""".stripMargin),
+          Seq(Row(1), Row(1)))
+      }
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org