You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/09/01 14:04:36 UTC
[spark] branch master updated: [SPARK-40297][SQL] CTE outer reference nested in CTE main body cannot be resolved
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f4ff2d16483 [SPARK-40297][SQL] CTE outer reference nested in CTE main body cannot be resolved
f4ff2d16483 is described below
commit f4ff2d16483f7da2c7ab73c7cfec75bb9e91064d
Author: Maryann Xue <ma...@gmail.com>
AuthorDate: Thu Sep 1 22:03:58 2022 +0800
[SPARK-40297][SQL] CTE outer reference nested in CTE main body cannot be resolved
### What changes were proposed in this pull request?
This PR fixes a bug where a CTE reference cannot be resolved when it occurs inside an inner CTE definition that is nested in the FROM clause of the outer CTE's main body. For example:
```
WITH cte_outer AS (
SELECT 1
)
SELECT * FROM (
WITH cte_inner AS (
SELECT * FROM cte_outer
)
SELECT * FROM cte_inner
)
```
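The fix changes `CTESubstitution`'s traversal order from `resolveOperatorsUpWithPruning` to `resolveOperatorsDownWithPruning`. It also recursively calls `traverseAndSubstituteCTE` on the CTE main body, passing along the outer CTE definitions so that inner CTEs can resolve references to them. Because the traversal is now top-down, the first substituted node (rather than the last) is the one that contains all other CTEs. `WithCTE` is therefore placed on that node, or on the first common parent of all CTEs when they occur under more than one child.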
### Why are the changes needed?
Bug fix. Without it, an `AnalysisException` is thrown for CTE queries like the one above.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests: new SQL golden-file cases in `cte-nested.sql` and a new test in `CTEInlineSuite`.
Closes #37751 from maryannxue/spark-40297.
Authored-by: Maryann Xue <ma...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/catalyst/analysis/CTESubstitution.scala | 30 ++--
.../test/resources/sql-tests/inputs/cte-nested.sql | 59 +++++++-
.../resources/sql-tests/results/cte-legacy.sql.out | 80 +++++++++++
.../resources/sql-tests/results/cte-nested.sql.out | 79 ++++++++++
.../sql-tests/results/cte-nonlegacy.sql.out | 79 ++++++++++
.../org/apache/spark/sql/CTEInlineSuite.scala | 160 ++++++++++++++++++++-
6 files changed, 476 insertions(+), 11 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index 62ebfa83431..6a4562450b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -56,7 +56,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
case _ => false
}
val cteDefs = ArrayBuffer.empty[CTERelationDef]
- val (substituted, lastSubstituted) =
+ val (substituted, firstSubstituted) =
LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match {
case LegacyBehaviorPolicy.EXCEPTION =>
assertNoNameConflictsInCTE(plan)
@@ -68,12 +68,17 @@ object CTESubstitution extends Rule[LogicalPlan] {
}
if (cteDefs.isEmpty) {
substituted
- } else if (substituted eq lastSubstituted.get) {
+ } else if (substituted eq firstSubstituted.get) {
WithCTE(substituted, cteDefs.toSeq)
} else {
var done = false
substituted.resolveOperatorsWithPruning(_ => !done) {
- case p if p eq lastSubstituted.get =>
+ case p if p eq firstSubstituted.get =>
+ // `firstSubstituted` is the parent of all other CTEs (if any).
+ done = true
+ WithCTE(p, cteDefs.toSeq)
+ case p if p.children.count(_.containsPattern(CTE)) > 1 =>
+ // This is the first common parent of all CTEs.
done = true
WithCTE(p, cteDefs.toSeq)
}
@@ -181,21 +186,28 @@ object CTESubstitution extends Rule[LogicalPlan] {
isCommand: Boolean,
outerCTEDefs: Seq[(String, CTERelationDef)],
cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = {
- var lastSubstituted: Option[LogicalPlan] = None
- val newPlan = plan.resolveOperatorsUpWithPruning(
+ var firstSubstituted: Option[LogicalPlan] = None
+ val newPlan = plan.resolveOperatorsDownWithPruning(
_.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) {
case UnresolvedWith(child: LogicalPlan, relations) =>
val resolvedCTERelations =
- resolveCTERelations(relations, isLegacy = false, isCommand, outerCTEDefs, cteDefs)
- lastSubstituted = Some(substituteCTE(child, isCommand, resolvedCTERelations))
- lastSubstituted.get
+ resolveCTERelations(relations, isLegacy = false, isCommand, outerCTEDefs, cteDefs) ++
+ outerCTEDefs
+ val substituted = substituteCTE(
+ traverseAndSubstituteCTE(child, isCommand, resolvedCTERelations, cteDefs)._1,
+ isCommand,
+ resolvedCTERelations)
+ if (firstSubstituted.isEmpty) {
+ firstSubstituted = Some(substituted)
+ }
+ substituted
case other =>
other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
case e: SubqueryExpression => e.withNewPlan(apply(e.plan))
}
}
- (newPlan, lastSubstituted)
+ (newPlan, firstSubstituted)
}
private def resolveCTERelations(
diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql b/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql
index b5d7fa5687b..5f12388b9cb 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql
@@ -146,4 +146,61 @@ WITH
)
SELECT * FROM t3
)
-SELECT * FROM t2;
\ No newline at end of file
+SELECT * FROM t2;
+
+-- CTE nested in CTE main body FROM clause references outer CTE def
+WITH cte_outer AS (
+ SELECT 1
+)
+SELECT * FROM (
+ WITH cte_inner AS (
+ SELECT * FROM cte_outer
+ )
+ SELECT * FROM cte_inner
+);
+
+-- CTE double nested in CTE main body FROM clause references outer CTE def
+WITH cte_outer AS (
+ SELECT 1
+)
+SELECT * FROM (
+ WITH cte_inner AS (
+ SELECT * FROM (
+ WITH cte_inner_inner AS (
+ SELECT * FROM cte_outer
+ )
+ SELECT * FROM cte_inner_inner
+ )
+ )
+ SELECT * FROM cte_inner
+);
+
+-- Invalid reference to invisible CTE def nested CTE def
+WITH cte_outer AS (
+ WITH cte_invisible_inner AS (
+ SELECT 1
+ )
+ SELECT * FROM cte_invisible_inner
+)
+SELECT * FROM (
+ WITH cte_inner AS (
+ SELECT * FROM cte_invisible_inner
+ )
+ SELECT * FROM cte_inner
+);
+
+-- Invalid reference to invisible CTE def nested CTE def (in FROM)
+WITH cte_outer AS (
+ SELECT * FROM (
+ WITH cte_invisible_inner AS (
+ SELECT 1
+ )
+ SELECT * FROM cte_invisible_inner
+ )
+)
+SELECT * FROM (
+ WITH cte_inner AS (
+ SELECT * FROM cte_invisible_inner
+ )
+ SELECT * FROM cte_inner
+);
\ No newline at end of file
diff --git a/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out
index 34f11d9da53..b72da5e739e 100644
--- a/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out
@@ -233,3 +233,83 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Table or view not found: t1; line 5 pos 20
+
+
+-- !query
+WITH cte_outer AS (
+ SELECT 1
+)
+SELECT * FROM (
+ WITH cte_inner AS (
+ SELECT * FROM cte_outer
+ )
+ SELECT * FROM cte_inner
+)
+-- !query schema
+struct<1:int>
+-- !query output
+1
+
+
+-- !query
+WITH cte_outer AS (
+ SELECT 1
+)
+SELECT * FROM (
+ WITH cte_inner AS (
+ SELECT * FROM (
+ WITH cte_inner_inner AS (
+ SELECT * FROM cte_outer
+ )
+ SELECT * FROM cte_inner_inner
+ )
+ )
+ SELECT * FROM cte_inner
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Table or view not found: cte_outer; line 8 pos 22
+
+
+-- !query
+WITH cte_outer AS (
+ WITH cte_invisible_inner AS (
+ SELECT 1
+ )
+ SELECT * FROM cte_invisible_inner
+)
+SELECT * FROM (
+ WITH cte_inner AS (
+ SELECT * FROM cte_invisible_inner
+ )
+ SELECT * FROM cte_inner
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Table or view not found: cte_invisible_inner; line 9 pos 18
+
+
+-- !query
+WITH cte_outer AS (
+ SELECT * FROM (
+ WITH cte_invisible_inner AS (
+ SELECT 1
+ )
+ SELECT * FROM cte_invisible_inner
+ )
+)
+SELECT * FROM (
+ WITH cte_inner AS (
+ SELECT * FROM cte_invisible_inner
+ )
+ SELECT * FROM cte_inner
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Table or view not found: cte_invisible_inner; line 11 pos 18
diff --git a/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out
index 13b4d10304e..e6382d74309 100644
--- a/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out
@@ -240,3 +240,82 @@ SELECT * FROM t2
struct<1:int>
-- !query output
1
+
+
+-- !query
+WITH cte_outer AS (
+ SELECT 1
+)
+SELECT * FROM (
+ WITH cte_inner AS (
+ SELECT * FROM cte_outer
+ )
+ SELECT * FROM cte_inner
+)
+-- !query schema
+struct<1:int>
+-- !query output
+1
+
+
+-- !query
+WITH cte_outer AS (
+ SELECT 1
+)
+SELECT * FROM (
+ WITH cte_inner AS (
+ SELECT * FROM (
+ WITH cte_inner_inner AS (
+ SELECT * FROM cte_outer
+ )
+ SELECT * FROM cte_inner_inner
+ )
+ )
+ SELECT * FROM cte_inner
+)
+-- !query schema
+struct<1:int>
+-- !query output
+1
+
+
+-- !query
+WITH cte_outer AS (
+ WITH cte_invisible_inner AS (
+ SELECT 1
+ )
+ SELECT * FROM cte_invisible_inner
+)
+SELECT * FROM (
+ WITH cte_inner AS (
+ SELECT * FROM cte_invisible_inner
+ )
+ SELECT * FROM cte_inner
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Table or view not found: cte_invisible_inner; line 9 pos 18
+
+
+-- !query
+WITH cte_outer AS (
+ SELECT * FROM (
+ WITH cte_invisible_inner AS (
+ SELECT 1
+ )
+ SELECT * FROM cte_invisible_inner
+ )
+)
+SELECT * FROM (
+ WITH cte_inner AS (
+ SELECT * FROM cte_invisible_inner
+ )
+ SELECT * FROM cte_inner
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Table or view not found: cte_invisible_inner; line 11 pos 18
diff --git a/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out
index 7da691363fd..26665fd33e8 100644
--- a/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out
@@ -232,3 +232,82 @@ SELECT * FROM t2
struct<1:int>
-- !query output
1
+
+
+-- !query
+WITH cte_outer AS (
+ SELECT 1
+)
+SELECT * FROM (
+ WITH cte_inner AS (
+ SELECT * FROM cte_outer
+ )
+ SELECT * FROM cte_inner
+)
+-- !query schema
+struct<1:int>
+-- !query output
+1
+
+
+-- !query
+WITH cte_outer AS (
+ SELECT 1
+)
+SELECT * FROM (
+ WITH cte_inner AS (
+ SELECT * FROM (
+ WITH cte_inner_inner AS (
+ SELECT * FROM cte_outer
+ )
+ SELECT * FROM cte_inner_inner
+ )
+ )
+ SELECT * FROM cte_inner
+)
+-- !query schema
+struct<1:int>
+-- !query output
+1
+
+
+-- !query
+WITH cte_outer AS (
+ WITH cte_invisible_inner AS (
+ SELECT 1
+ )
+ SELECT * FROM cte_invisible_inner
+)
+SELECT * FROM (
+ WITH cte_inner AS (
+ SELECT * FROM cte_invisible_inner
+ )
+ SELECT * FROM cte_inner
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Table or view not found: cte_invisible_inner; line 9 pos 18
+
+
+-- !query
+WITH cte_outer AS (
+ SELECT * FROM (
+ WITH cte_invisible_inner AS (
+ SELECT 1
+ )
+ SELECT * FROM cte_invisible_inner
+ )
+)
+SELECT * FROM (
+ WITH cte_inner AS (
+ SELECT * FROM cte_invisible_inner
+ )
+ SELECT * FROM cte_inner
+)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+Table or view not found: cte_invisible_inner; line 11 pos 18
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index 26d165b460a..7801c8e644d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.expressions.{And, GreaterThan, LessThan, Literal, Or}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project, RebalancePartitions, RepartitionByExpression, RepartitionOperation, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.adaptive._
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.internal.SQLConf
@@ -486,6 +486,164 @@ abstract class CTEInlineSuiteBase
}
}
}
+
+ test("Make sure CTESubstitution places WithCTE back in the plan correctly.") {
+ withView("t") {
+ Seq((0, 1), (1, 2)).toDF("c1", "c2").createOrReplaceTempView("t")
+
+ // CTE on both sides of join - WithCTE placed over first common parent, i.e., the join.
+ val df1 = sql(
+ s"""
+ |select count(v1.c3), count(v2.c3) from (
+ | with
+ | v1 as (
+ | select c1, c2, rand() c3 from t
+ | )
+ | select * from v1
+ |) v1 join (
+ | with
+ | v2 as (
+ | select c1, c2, rand() c3 from t
+ | )
+ | select * from v2
+ |) v2 on v1.c1 = v2.c1
+ """.stripMargin)
+ checkAnswer(df1, Row(2, 2) :: Nil)
+ df1.queryExecution.analyzed match {
+ case Aggregate(_, _, WithCTE(_, cteDefs)) => assert(cteDefs.length == 2)
+ case other => fail(s"Expect pattern Aggregate(WithCTE(_)) but got $other")
+ }
+
+ // CTE on one side of join - WithCTE placed back where it was.
+ val df2 = sql(
+ s"""
+ |select count(v1.c3), count(v2.c3) from (
+ | select c1, c2, rand() c3 from t
+ |) v1 join (
+ | with
+ | v2 as (
+ | select c1, c2, rand() c3 from t
+ | )
+ | select * from v2
+ |) v2 on v1.c1 = v2.c1
+ """.stripMargin)
+ checkAnswer(df2, Row(2, 2) :: Nil)
+ df2.queryExecution.analyzed match {
+ case Aggregate(_, _, Join(_, SubqueryAlias(_, WithCTE(_, cteDefs)), _, _, _)) =>
+ assert(cteDefs.length == 1)
+ case other => fail(s"Expect pattern Aggregate(Join(_, WithCTE(_))) but got $other")
+ }
+
+ // CTE on one side of join and both sides of union - WithCTE placed on first common parent.
+ val df3 = sql(
+ s"""
+ |select count(v1.c3), count(v2.c3) from (
+ | select c1, c2, rand() c3 from t
+ |) v1 join (
+ | select * from (
+ | with
+ | v1 as (
+ | select c1, c2, rand() c3 from t
+ | )
+ | select * from v1
+ | )
+ | union all
+ | select * from (
+ | with
+ | v2 as (
+ | select c1, c2, rand() c3 from t
+ | )
+ | select * from v2
+ | )
+ |) v2 on v1.c1 = v2.c1
+ """.stripMargin)
+ checkAnswer(df3, Row(4, 4) :: Nil)
+ df3.queryExecution.analyzed match {
+ case Aggregate(_, _, Join(_, SubqueryAlias(_, WithCTE(_: Union, cteDefs)), _, _, _)) =>
+ assert(cteDefs.length == 2)
+ case other => fail(
+ s"Expect pattern Aggregate(Join(_, (WithCTE(Union(_, _))))) but got $other")
+ }
+
+ // CTE on one side of join and one side of union - WithCTE placed back where it was.
+ val df4 = sql(
+ s"""
+ |select count(v1.c3), count(v2.c3) from (
+ | select c1, c2, rand() c3 from t
+ |) v1 join (
+ | select * from (
+ | with
+ | v1 as (
+ | select c1, c2, rand() c3 from t
+ | )
+ | select * from v1
+ | )
+ | union all
+ | select c1, c2, rand() c3 from t
+ |) v2 on v1.c1 = v2.c1
+ """.stripMargin)
+ checkAnswer(df4, Row(4, 4) :: Nil)
+ df4.queryExecution.analyzed match {
+ case Aggregate(_, _, Join(_, SubqueryAlias(_, Union(children, _, _)), _, _, _))
+ if children.head.find(_.isInstanceOf[WithCTE]).isDefined =>
+ assert(
+ children.head.collect {
+ case w: WithCTE => w
+ }.head.cteDefs.length == 1)
+ case other => fail(
+ s"Expect pattern Aggregate(Join(_, (WithCTE(Union(_, _))))) but got $other")
+ }
+
+ // CTE on both sides of join and one side of union - WithCTE placed on first common parent.
+ val df5 = sql(
+ s"""
+ |select count(v1.c3), count(v2.c3) from (
+ | with
+ | v1 as (
+ | select c1, c2, rand() c3 from t
+ | )
+ | select * from v1
+ |) v1 join (
+ | select c1, c2, rand() c3 from t
+ | union all
+ | select * from (
+ | with
+ | v2 as (
+ | select c1, c2, rand() c3 from t
+ | )
+ | select * from v2
+ | )
+ |) v2 on v1.c1 = v2.c1
+ """.stripMargin)
+ checkAnswer(df5, Row(4, 4) :: Nil)
+ df5.queryExecution.analyzed match {
+ case Aggregate(_, _, WithCTE(_, cteDefs)) => assert(cteDefs.length == 2)
+ case other => fail(s"Expect pattern Aggregate(WithCTE(_)) but got $other")
+ }
+
+ // CTE as root node - WithCTE placed back where it was.
+ val df6 = sql(
+ s"""
+ |with
+ |v1 as (
+ | select c1, c2, rand() c3 from t
+ |)
+ |select count(v1.c3), count(v2.c3) from
+ |v1 join (
+ | with
+ | v2 as (
+ | select c1, c2, rand() c3 from t
+ | )
+ | select * from v2
+ |) v2 on v1.c1 = v2.c1
+ """.stripMargin)
+ checkAnswer(df6, Row(2, 2) :: Nil)
+ df6.queryExecution.analyzed match {
+ case WithCTE(_, cteDefs) => assert(cteDefs.length == 2)
+ case other => fail(s"Expect pattern WithCTE(_) but got $other")
+ }
+ }
+ }
}
class CTEInlineSuiteAEOff extends CTEInlineSuiteBase with DisableAdaptiveExecutionSuite
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org