You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/03/01 17:26:07 UTC

[spark] branch branch-3.2 updated: [SPARK-37932][SQL] Wait to resolve missing attributes before applying DeduplicateRelations

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 482b71a  [SPARK-37932][SQL] Wait to resolve missing attributes before applying DeduplicateRelations
482b71a is described below

commit 482b71aafbcbbe6e3de57ba6aab2371e0266e03e
Author: chenzhx <ch...@apache.org>
AuthorDate: Wed Mar 2 01:23:05 2022 +0800

    [SPARK-37932][SQL] Wait to resolve missing attributes before applying DeduplicateRelations
    
    When the join with duplicate view like
    ```
    SELECT l1.idFROM v1 l1
     INNER JOIN (
       SELECT id FROM v1
       GROUP BY id  HAVING COUNT(DISTINCT name) > 1
     ) l2
     ON l1.id = l2.id
     GROUP BY l1.name, l1.id;
    ```
    The error stack is:
    ```
    Resolved attribute(s) name#26 missing from id#31,name#32 in operator !Aggregate [id#31], [id#31, count(distinct name#26) AS count(distinct name#26)#33L]. Attribute(s) with the same name appear in the operation: name. Please check if the right attribute(s) are used.;
    Aggregate [name#26, id#25], [id#25]
    +- Join Inner, (id#25 = id#31)
       :- SubqueryAlias l1
       :  +- SubqueryAlias spark_catalog.default.v1
       :     +- View (`default`.`v1`, [id#25,name#26])
       :        +- Project [cast(id#20 as int) AS id#25, cast(name#21 as string) AS name#26]
       :           +- Project [id#20, name#21]
       :              +- SubqueryAlias spark_catalog.default.t
       :                 +- Relation default.t[id#20,name#21] parquet
       +- SubqueryAlias l2
          +- Project [id#31]
             +- Filter (count(distinct name#26)#33L > cast(1 as bigint))
                +- !Aggregate [id#31], [id#31, count(distinct name#26) AS count(distinct name#26)#33L]
                   +- SubqueryAlias spark_catalog.default.v1
                      +- View (`default`.`v1`, [id#31,name#32])
                         +- Project [cast(id#27 as int) AS id#31, cast(name#28 as string) AS name#32]
                            +- Project [id#27, name#28]
                               +- SubqueryAlias spark_catalog.default.t
                                  +- Relation default.t[id#27,name#28] parquet
    ```
    Spark will consider the two views to be duplicates, which will cause the query to fail.
    
    Fix bug when using join in duplicate views.
    
    Yes. When we join with duplicate view, the query would be successful.
    DeduplicateRelations should only kick in if the plan's children are all resolved and valid.
    
    Add new UT
    
    Closes #35684 from chenzhx/SPARK-37932.
    
    Authored-by: chenzhx <ch...@apache.org>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit a633f77b5120d94eee6beff8615137bf537bbff9)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/catalyst/analysis/DeduplicateRelations.scala  |  7 ++++++-
 .../org/apache/spark/sql/execution/SQLViewSuite.scala | 19 +++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
index b877c24..da8ae21 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
@@ -40,7 +40,12 @@ case class ReferenceEqualPlanWrapper(plan: LogicalPlan) {
 
 object DeduplicateRelations extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    renewDuplicatedRelations(mutable.HashSet.empty, plan)._1.resolveOperatorsUpWithPruning(
+    val newPlan = renewDuplicatedRelations(mutable.HashSet.empty, plan)._1
+    if (newPlan.find(p => p.resolved && p.missingInput.nonEmpty).isDefined) {
+      // Wait for `ResolveMissingReferences` to resolve missing attributes first
+      return newPlan
+    }
+    newPlan.resolveOperatorsUpWithPruning(
       _.containsAnyPattern(JOIN, LATERAL_JOIN, INTERSECT, EXCEPT, UNION, COMMAND), ruleId) {
       case p: LogicalPlan if !p.childrenResolved => p
       // To resolve duplicate expression IDs for Join.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index b581287..cf699d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -910,4 +910,23 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-37932: view join with same view") {
+    withTable("t") {
+      withView("v1") {
+        Seq((1, "test1"), (2, "test2"), (1, "test2")).toDF("id", "name")
+          .write.format("parquet").saveAsTable("t")
+        sql("CREATE VIEW v1 (id, name) AS SELECT id, name FROM t")
+
+        checkAnswer(
+          sql("""SELECT l1.id FROM v1 l1
+                |INNER JOIN (
+                |   SELECT id FROM v1
+                |   GROUP BY id HAVING COUNT(DISTINCT name) > 1
+                | ) l2 ON l1.id = l2.id GROUP BY l1.name, l1.id;
+                |""".stripMargin),
+          Seq(Row(1), Row(1)))
+      }
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org