You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/09/01 14:04:36 UTC

[spark] branch master updated: [SPARK-40297][SQL] CTE outer reference nested in CTE main body cannot be resolved

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f4ff2d16483 [SPARK-40297][SQL] CTE outer reference nested in CTE main body cannot be resolved
f4ff2d16483 is described below

commit f4ff2d16483f7da2c7ab73c7cfec75bb9e91064d
Author: Maryann Xue <ma...@gmail.com>
AuthorDate: Thu Sep 1 22:03:58 2022 +0800

    [SPARK-40297][SQL] CTE outer reference nested in CTE main body cannot be resolved
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a bug where a reference to an outer CTE cannot be resolved when it occurs inside an inner CTE definition that is nested in the FROM clause of the outer CTE's main body. For example:
    ```
    WITH cte_outer AS (
      SELECT 1
    )
    SELECT * FROM (
      WITH cte_inner AS (
        SELECT * FROM cte_outer
      )
      SELECT * FROM cte_inner
    )
    ```
    
    The fix changes the traversal order of `CTESubstitution` from `resolveOperatorsUpWithPruning` to `resolveOperatorsDownWithPruning`, and also recursively calls `traverseAndSubstituteCTE` on the CTE main body.
    
    ### Why are the changes needed?
    
    Bug fix. Without the fix, an `AnalysisException` is thrown for CTE queries like the one shown above.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added unit tests (SQL golden-file tests in `cte-nested.sql` and new cases in `CTEInlineSuite`).
    
    Closes #37751 from maryannxue/spark-40297.
    
    Authored-by: Maryann Xue <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/catalyst/analysis/CTESubstitution.scala    |  30 ++--
 .../test/resources/sql-tests/inputs/cte-nested.sql |  59 +++++++-
 .../resources/sql-tests/results/cte-legacy.sql.out |  80 +++++++++++
 .../resources/sql-tests/results/cte-nested.sql.out |  79 ++++++++++
 .../sql-tests/results/cte-nonlegacy.sql.out        |  79 ++++++++++
 .../org/apache/spark/sql/CTEInlineSuite.scala      | 160 ++++++++++++++++++++-
 6 files changed, 476 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index 62ebfa83431..6a4562450b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -56,7 +56,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
       case _ => false
     }
     val cteDefs = ArrayBuffer.empty[CTERelationDef]
-    val (substituted, lastSubstituted) =
+    val (substituted, firstSubstituted) =
       LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match {
         case LegacyBehaviorPolicy.EXCEPTION =>
           assertNoNameConflictsInCTE(plan)
@@ -68,12 +68,17 @@ object CTESubstitution extends Rule[LogicalPlan] {
     }
     if (cteDefs.isEmpty) {
       substituted
-    } else if (substituted eq lastSubstituted.get) {
+    } else if (substituted eq firstSubstituted.get) {
       WithCTE(substituted, cteDefs.toSeq)
     } else {
       var done = false
       substituted.resolveOperatorsWithPruning(_ => !done) {
-        case p if p eq lastSubstituted.get =>
+        case p if p eq firstSubstituted.get =>
+          // `firstSubstituted` is the parent of all other CTEs (if any).
+          done = true
+          WithCTE(p, cteDefs.toSeq)
+        case p if p.children.count(_.containsPattern(CTE)) > 1 =>
+          // This is the first common parent of all CTEs.
           done = true
           WithCTE(p, cteDefs.toSeq)
       }
@@ -181,21 +186,28 @@ object CTESubstitution extends Rule[LogicalPlan] {
       isCommand: Boolean,
       outerCTEDefs: Seq[(String, CTERelationDef)],
       cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = {
-    var lastSubstituted: Option[LogicalPlan] = None
-    val newPlan = plan.resolveOperatorsUpWithPruning(
+    var firstSubstituted: Option[LogicalPlan] = None
+    val newPlan = plan.resolveOperatorsDownWithPruning(
         _.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) {
       case UnresolvedWith(child: LogicalPlan, relations) =>
         val resolvedCTERelations =
-          resolveCTERelations(relations, isLegacy = false, isCommand, outerCTEDefs, cteDefs)
-        lastSubstituted = Some(substituteCTE(child, isCommand, resolvedCTERelations))
-        lastSubstituted.get
+          resolveCTERelations(relations, isLegacy = false, isCommand, outerCTEDefs, cteDefs) ++
+            outerCTEDefs
+        val substituted = substituteCTE(
+          traverseAndSubstituteCTE(child, isCommand, resolvedCTERelations, cteDefs)._1,
+          isCommand,
+          resolvedCTERelations)
+        if (firstSubstituted.isEmpty) {
+          firstSubstituted = Some(substituted)
+        }
+        substituted
 
       case other =>
         other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
           case e: SubqueryExpression => e.withNewPlan(apply(e.plan))
         }
     }
-    (newPlan, lastSubstituted)
+    (newPlan, firstSubstituted)
   }
 
   private def resolveCTERelations(
diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql b/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql
index b5d7fa5687b..5f12388b9cb 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql
@@ -146,4 +146,61 @@ WITH
     )
     SELECT * FROM t3
   )
-SELECT * FROM t2;
\ No newline at end of file
+SELECT * FROM t2;
+
+-- CTE nested in CTE main body FROM clause references outer CTE def
+WITH cte_outer AS (
+  SELECT 1
+)
+SELECT * FROM (
+  WITH cte_inner AS (
+    SELECT * FROM cte_outer
+  )
+  SELECT * FROM cte_inner
+);
+
+-- CTE double nested in CTE main body FROM clause references outer CTE def
+WITH cte_outer AS (
+  SELECT 1
+)
+SELECT * FROM (
+  WITH cte_inner AS (
+    SELECT * FROM (
+      WITH cte_inner_inner AS (
+        SELECT * FROM cte_outer
+      )
+      SELECT * FROM cte_inner_inner
+    )
+  )
+  SELECT * FROM cte_inner
+);
+
+-- Invalid reference to a CTE def that is nested inside another CTE def and is therefore not visible here
+WITH cte_outer AS (
+  WITH cte_invisible_inner AS (
+    SELECT 1
+  )
+  SELECT * FROM cte_invisible_inner
+)
+SELECT * FROM (
+  WITH cte_inner AS (
+    SELECT * FROM cte_invisible_inner
+  )
+  SELECT * FROM cte_inner
+);
+
+-- Invalid reference to a CTE def nested in a FROM subquery of another CTE def (not visible here)
+WITH cte_outer AS (
+  SELECT * FROM (
+    WITH cte_invisible_inner AS (
+      SELECT 1
+    )
+    SELECT * FROM cte_invisible_inner
+  )
+)
+SELECT * FROM (
+  WITH cte_inner AS (
+    SELECT * FROM cte_invisible_inner
+  )
+  SELECT * FROM cte_inner
+);
\ No newline at end of file
diff --git a/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out
index 34f11d9da53..b72da5e739e 100644
--- a/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out
@@ -233,3 +233,83 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 Table or view not found: t1; line 5 pos 20
+
+
+-- !query
+WITH cte_outer AS (
+  SELECT 1
+)
+SELECT * FROM (
+  WITH cte_inner AS (
+    SELECT * FROM cte_outer
+  )
+  SELECT * FROM cte_inner
+)
+-- !query schema
+struct<1:int>
+-- !query output
+1
+
+
+-- !query
+WITH cte_outer AS (
+  SELECT 1
+)
+SELECT * FROM (
+  WITH cte_inner AS (
+    SELECT * FROM (
+      WITH cte_inner_inner AS (
+        SELECT * FROM cte_outer
+      )
+      SELECT * FROM cte_inner_inner
+    )
+  )
+  SELECT * FROM cte_inner
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Table or view not found: cte_outer; line 8 pos 22
+
+
+-- !query
+WITH cte_outer AS (
+  WITH cte_invisible_inner AS (
+    SELECT 1
+  )
+  SELECT * FROM cte_invisible_inner
+)
+SELECT * FROM (
+  WITH cte_inner AS (
+    SELECT * FROM cte_invisible_inner
+  )
+  SELECT * FROM cte_inner
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Table or view not found: cte_invisible_inner; line 9 pos 18
+
+
+-- !query
+WITH cte_outer AS (
+  SELECT * FROM (
+    WITH cte_invisible_inner AS (
+      SELECT 1
+    )
+    SELECT * FROM cte_invisible_inner
+  )
+)
+SELECT * FROM (
+  WITH cte_inner AS (
+    SELECT * FROM cte_invisible_inner
+  )
+  SELECT * FROM cte_inner
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Table or view not found: cte_invisible_inner; line 11 pos 18
diff --git a/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out
index 13b4d10304e..e6382d74309 100644
--- a/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out
@@ -240,3 +240,82 @@ SELECT * FROM t2
 struct<1:int>
 -- !query output
 1
+
+
+-- !query
+WITH cte_outer AS (
+  SELECT 1
+)
+SELECT * FROM (
+  WITH cte_inner AS (
+    SELECT * FROM cte_outer
+  )
+  SELECT * FROM cte_inner
+)
+-- !query schema
+struct<1:int>
+-- !query output
+1
+
+
+-- !query
+WITH cte_outer AS (
+  SELECT 1
+)
+SELECT * FROM (
+  WITH cte_inner AS (
+    SELECT * FROM (
+      WITH cte_inner_inner AS (
+        SELECT * FROM cte_outer
+      )
+      SELECT * FROM cte_inner_inner
+    )
+  )
+  SELECT * FROM cte_inner
+)
+-- !query schema
+struct<1:int>
+-- !query output
+1
+
+
+-- !query
+WITH cte_outer AS (
+  WITH cte_invisible_inner AS (
+    SELECT 1
+  )
+  SELECT * FROM cte_invisible_inner
+)
+SELECT * FROM (
+  WITH cte_inner AS (
+    SELECT * FROM cte_invisible_inner
+  )
+  SELECT * FROM cte_inner
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Table or view not found: cte_invisible_inner; line 9 pos 18
+
+
+-- !query
+WITH cte_outer AS (
+  SELECT * FROM (
+    WITH cte_invisible_inner AS (
+      SELECT 1
+    )
+    SELECT * FROM cte_invisible_inner
+  )
+)
+SELECT * FROM (
+  WITH cte_inner AS (
+    SELECT * FROM cte_invisible_inner
+  )
+  SELECT * FROM cte_inner
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Table or view not found: cte_invisible_inner; line 11 pos 18
diff --git a/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out
index 7da691363fd..26665fd33e8 100644
--- a/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out
@@ -232,3 +232,82 @@ SELECT * FROM t2
 struct<1:int>
 -- !query output
 1
+
+
+-- !query
+WITH cte_outer AS (
+  SELECT 1
+)
+SELECT * FROM (
+  WITH cte_inner AS (
+    SELECT * FROM cte_outer
+  )
+  SELECT * FROM cte_inner
+)
+-- !query schema
+struct<1:int>
+-- !query output
+1
+
+
+-- !query
+WITH cte_outer AS (
+  SELECT 1
+)
+SELECT * FROM (
+  WITH cte_inner AS (
+    SELECT * FROM (
+      WITH cte_inner_inner AS (
+        SELECT * FROM cte_outer
+      )
+      SELECT * FROM cte_inner_inner
+    )
+  )
+  SELECT * FROM cte_inner
+)
+-- !query schema
+struct<1:int>
+-- !query output
+1
+
+
+-- !query
+WITH cte_outer AS (
+  WITH cte_invisible_inner AS (
+    SELECT 1
+  )
+  SELECT * FROM cte_invisible_inner
+)
+SELECT * FROM (
+  WITH cte_inner AS (
+    SELECT * FROM cte_invisible_inner
+  )
+  SELECT * FROM cte_inner
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Table or view not found: cte_invisible_inner; line 9 pos 18
+
+
+-- !query
+WITH cte_outer AS (
+  SELECT * FROM (
+    WITH cte_invisible_inner AS (
+      SELECT 1
+    )
+    SELECT * FROM cte_invisible_inner
+  )
+)
+SELECT * FROM (
+  WITH cte_inner AS (
+    SELECT * FROM cte_invisible_inner
+  )
+  SELECT * FROM cte_inner
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Table or view not found: cte_invisible_inner; line 11 pos 18
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index 26d165b460a..7801c8e644d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.expressions.{And, GreaterThan, LessThan, Literal, Or}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project, RebalancePartitions, RepartitionByExpression, RepartitionOperation, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.adaptive._
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.internal.SQLConf
@@ -486,6 +486,164 @@ abstract class CTEInlineSuiteBase
       }
     }
   }
+
+  test("Make sure CTESubstitution places WithCTE back in the plan correctly.") {
+    withView("t") {
+      Seq((0, 1), (1, 2)).toDF("c1", "c2").createOrReplaceTempView("t")
+
+      // CTE on both sides of join - WithCTE placed over first common parent, i.e., the join.
+      val df1 = sql(
+        s"""
+           |select count(v1.c3), count(v2.c3) from (
+           |  with
+           |  v1 as (
+           |    select c1, c2, rand() c3 from t
+           |  )
+           |  select * from v1
+           |) v1 join (
+           |  with
+           |  v2 as (
+           |    select c1, c2, rand() c3 from t
+           |  )
+           |  select * from v2
+           |) v2 on v1.c1 = v2.c1
+         """.stripMargin)
+      checkAnswer(df1, Row(2, 2) :: Nil)
+      df1.queryExecution.analyzed match {
+        case Aggregate(_, _, WithCTE(_, cteDefs)) => assert(cteDefs.length == 2)
+        case other => fail(s"Expect pattern Aggregate(WithCTE(_)) but got $other")
+      }
+
+      // CTE on one side of join - WithCTE placed back where it was.
+      val df2 = sql(
+        s"""
+           |select count(v1.c3), count(v2.c3) from (
+           |  select c1, c2, rand() c3 from t
+           |) v1 join (
+           |  with
+           |  v2 as (
+           |    select c1, c2, rand() c3 from t
+           |  )
+           |  select * from v2
+           |) v2 on v1.c1 = v2.c1
+         """.stripMargin)
+      checkAnswer(df2, Row(2, 2) :: Nil)
+      df2.queryExecution.analyzed match {
+        case Aggregate(_, _, Join(_, SubqueryAlias(_, WithCTE(_, cteDefs)), _, _, _)) =>
+          assert(cteDefs.length == 1)
+        case other => fail(s"Expect pattern Aggregate(Join(_, WithCTE(_))) but got $other")
+      }
+
+      // CTE on one side of join and both sides of union - WithCTE placed on first common parent.
+      val df3 = sql(
+        s"""
+           |select count(v1.c3), count(v2.c3) from (
+           |  select c1, c2, rand() c3 from t
+           |) v1 join (
+           |  select * from (
+           |    with
+           |    v1 as (
+           |      select c1, c2, rand() c3 from t
+           |    )
+           |    select * from v1
+           |  )
+           |  union all
+           |  select * from (
+           |    with
+           |    v2 as (
+           |      select c1, c2, rand() c3 from t
+           |    )
+           |    select * from v2
+           |  )
+           |) v2 on v1.c1 = v2.c1
+         """.stripMargin)
+      checkAnswer(df3, Row(4, 4) :: Nil)
+      df3.queryExecution.analyzed match {
+        case Aggregate(_, _, Join(_, SubqueryAlias(_, WithCTE(_: Union, cteDefs)), _, _, _)) =>
+          assert(cteDefs.length == 2)
+        case other => fail(
+          s"Expect pattern Aggregate(Join(_, (WithCTE(Union(_, _))))) but got $other")
+      }
+
+      // CTE on one side of join and one side of union - WithCTE placed back where it was.
+      val df4 = sql(
+        s"""
+           |select count(v1.c3), count(v2.c3) from (
+           |  select c1, c2, rand() c3 from t
+           |) v1 join (
+           |  select * from (
+           |    with
+           |    v1 as (
+           |      select c1, c2, rand() c3 from t
+           |    )
+           |    select * from v1
+           |  )
+           |  union all
+           |  select c1, c2, rand() c3 from t
+           |) v2 on v1.c1 = v2.c1
+         """.stripMargin)
+      checkAnswer(df4, Row(4, 4) :: Nil)
+      df4.queryExecution.analyzed match {
+        case Aggregate(_, _, Join(_, SubqueryAlias(_, Union(children, _, _)), _, _, _))
+          if children.head.find(_.isInstanceOf[WithCTE]).isDefined =>
+          assert(
+            children.head.collect {
+              case w: WithCTE => w
+            }.head.cteDefs.length == 1)
+        case other => fail(
+          s"Expect pattern Aggregate(Join(_, (WithCTE(Union(_, _))))) but got $other")
+      }
+
+      // CTE on both sides of join and one side of union - WithCTE placed on first common parent.
+      val df5 = sql(
+        s"""
+           |select count(v1.c3), count(v2.c3) from (
+           |  with
+           |  v1 as (
+           |    select c1, c2, rand() c3 from t
+           |  )
+           |  select * from v1
+           |) v1 join (
+           |  select c1, c2, rand() c3 from t
+           |  union all
+           |  select * from (
+           |    with
+           |    v2 as (
+           |      select c1, c2, rand() c3 from t
+           |    )
+           |    select * from v2
+           |  )
+           |) v2 on v1.c1 = v2.c1
+         """.stripMargin)
+      checkAnswer(df5, Row(4, 4) :: Nil)
+      df5.queryExecution.analyzed match {
+        case Aggregate(_, _, WithCTE(_, cteDefs)) => assert(cteDefs.length == 2)
+        case other => fail(s"Expect pattern Aggregate(WithCTE(_)) but got $other")
+      }
+
+      // CTE as root node - WithCTE placed back where it was.
+      val df6 = sql(
+        s"""
+           |with
+           |v1 as (
+           |  select c1, c2, rand() c3 from t
+           |)
+           |select count(v1.c3), count(v2.c3) from
+           |v1 join (
+           |  with
+           |  v2 as (
+           |    select c1, c2, rand() c3 from t
+           |  )
+           |  select * from v2
+           |) v2 on v1.c1 = v2.c1
+         """.stripMargin)
+      checkAnswer(df6, Row(2, 2) :: Nil)
+      df6.queryExecution.analyzed match {
+        case WithCTE(_, cteDefs) => assert(cteDefs.length == 2)
+        case other => fail(s"Expect pattern WithCTE(_) but got $other")
+      }
+    }
+  }
 }
 
 class CTEInlineSuiteAEOff extends CTEInlineSuiteBase with DisableAdaptiveExecutionSuite


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org