You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/08/12 17:07:40 UTC

spark git commit: [SPARK-16771][SQL] WITH clause should not fall into infinite loop.

Repository: spark
Updated Branches:
  refs/heads/master bbae20ade -> 2a105134e


[SPARK-16771][SQL] WITH clause should not fall into infinite loop.

## What changes were proposed in this pull request?

This PR changes the CTE resolution rule to use only **forward-declared** tables (i.e., CTEs defined earlier in the same `WITH` clause) in order to prevent infinite loops. More specifically, the new logic is as follows.

* Resolve the CTEs in the `WITH` clause before substituting them into the main SQL body.
* While resolving a CTE, it may reference only forward-declared CTEs or base tables.
  - Self-referencing is no longer allowed.
  - Cross-referencing is no longer allowed.

**Reported Error Scenarios**
```scala
scala> sql("WITH t AS (SELECT 1 FROM t) SELECT * FROM t")
java.lang.StackOverflowError
...
scala> sql("WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1, t2")
java.lang.StackOverflowError
...
```
Note that `t`, `t1`, and `t2` are not declared in the database. Spark falls into an infinite loop before resolving the table names.

## How was this patch tested?

Passes the Jenkins tests, including two new test cases.

Author: Dongjoon Hyun <do...@apache.org>

Closes #14397 from dongjoon-hyun/SPARK-16771-TREENODE.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a105134
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a105134
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a105134

Branch: refs/heads/master
Commit: 2a105134e9a3efd46b761fab5e563ddebb26575d
Parents: bbae20a
Author: Dongjoon Hyun <do...@apache.org>
Authored: Fri Aug 12 19:07:34 2016 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Fri Aug 12 19:07:34 2016 +0200

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 24 ++++-----
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  2 +-
 .../plans/logical/basicLogicalOperators.scala   |  7 ++-
 .../sql/catalyst/parser/PlanParserSuite.scala   |  2 +-
 .../src/test/resources/sql-tests/inputs/cte.sql | 14 +++++
 .../resources/sql-tests/results/cte.sql.out     | 57 ++++++++++++++++++++
 6 files changed, 88 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2a105134/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 14a2a32..a2e276e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -125,22 +125,22 @@ class Analyzer(
   object CTESubstitution extends Rule[LogicalPlan] {
     // TODO allow subquery to define CTE
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators  {
-      case With(child, relations) => substituteCTE(child, relations)
+      case With(child, relations) =>
+        substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)]) {
+          case (resolved, (name, relation)) =>
+            resolved :+ name -> ResolveRelations(substituteCTE(relation, resolved))
+        })
       case other => other
     }
 
-    def substituteCTE(plan: LogicalPlan, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
-      plan transform {
-        // In hive, if there is same table name in database and CTE definition,
-        // hive will use the table in database, not the CTE one.
-        // Taking into account the reasonableness and the implementation complexity,
-        // here use the CTE definition first, check table name only and ignore database name
-        // see https://github.com/apache/spark/pull/4929#discussion_r27186638 for more info
+    def substituteCTE(plan: LogicalPlan, cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan = {
+      plan transformDown {
         case u : UnresolvedRelation =>
-          val substituted = cteRelations.get(u.tableIdentifier.table).map { relation =>
-            val withAlias = u.alias.map(SubqueryAlias(_, relation))
-            withAlias.getOrElse(relation)
-          }
+          val substituted = cteRelations.find(x => resolver(x._1, u.tableIdentifier.table))
+            .map(_._2).map { relation =>
+              val withAlias = u.alias.map(SubqueryAlias(_, relation))
+              withAlias.getOrElse(relation)
+            }
           substituted.getOrElse(u)
         case other =>
           // This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.

http://git-wip-us.apache.org/repos/asf/spark/blob/2a105134/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index c7fdc28..25c8445 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -97,7 +97,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
       }
       // Check for duplicate names.
       checkDuplicateKeys(ctes, ctx)
-      With(query, ctes.toMap)
+      With(query, ctes)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2a105134/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index eb612c4..2917d8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -392,11 +392,10 @@ case class InsertIntoTable(
  * This operator will be removed during analysis and the relations will be substituted into child.
  *
  * @param child The final query of this CTE.
- * @param cteRelations Queries that this CTE defined,
- *                     key is the alias of the CTE definition,
- *                     value is the CTE definition.
+ * @param cteRelations A sequence of pair (alias, the CTE definition) that this CTE defined
+ *                     Each CTE can see the base tables and the previously defined CTEs only.
  */
-case class With(child: LogicalPlan, cteRelations: Map[String, SubqueryAlias]) extends UnaryNode {
+case class With(child: LogicalPlan, cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2a105134/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 00a37cf..34d52c7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -81,7 +81,7 @@ class PlanParserSuite extends PlanTest {
       val ctes = namedPlans.map {
         case (name, cte) =>
           name -> SubqueryAlias(name, cte)
-      }.toMap
+      }
       With(plan, ctes)
     }
     assertEqual(

http://git-wip-us.apache.org/repos/asf/spark/blob/2a105134/sql/core/src/test/resources/sql-tests/inputs/cte.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte.sql b/sql/core/src/test/resources/sql-tests/inputs/cte.sql
new file mode 100644
index 0000000..10d34de
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/cte.sql
@@ -0,0 +1,14 @@
+create temporary view t as select * from values 0, 1, 2 as t(id);
+create temporary view t2 as select * from values 0, 1 as t(id);
+
+-- WITH clause should not fall into infinite loop by referencing self
+WITH s AS (SELECT 1 FROM s) SELECT * FROM s;
+
+-- WITH clause should reference the base table
+WITH t AS (SELECT 1 FROM t) SELECT * FROM t;
+
+-- WITH clause should not allow cross reference
+WITH s1 AS (SELECT 1 FROM s2), s2 AS (SELECT 1 FROM s1) SELECT * FROM s1, s2;
+
+-- WITH clause should reference the previous CTE
+WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1, t2;

http://git-wip-us.apache.org/repos/asf/spark/blob/2a105134/sql/core/src/test/resources/sql-tests/results/cte.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/cte.sql.out b/sql/core/src/test/resources/sql-tests/results/cte.sql.out
new file mode 100644
index 0000000..ddee5bf
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/cte.sql.out
@@ -0,0 +1,57 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 6
+
+
+-- !query 0
+create temporary view t as select * from values 0, 1, 2 as t(id)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+create temporary view t2 as select * from values 0, 1 as t(id)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+WITH s AS (SELECT 1 FROM s) SELECT * FROM s
+-- !query 2 schema
+struct<>
+-- !query 2 output
+org.apache.spark.sql.AnalysisException
+Table or view not found: s; line 1 pos 25
+
+
+-- !query 3
+WITH t AS (SELECT 1 FROM t) SELECT * FROM t
+-- !query 3 schema
+struct<1:int>
+-- !query 3 output
+1
+1
+1
+
+
+-- !query 4
+WITH s1 AS (SELECT 1 FROM s2), s2 AS (SELECT 1 FROM s1) SELECT * FROM s1, s2
+-- !query 4 schema
+struct<>
+-- !query 4 output
+org.apache.spark.sql.AnalysisException
+Table or view not found: s2; line 1 pos 26
+
+
+-- !query 5
+WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1, t2
+-- !query 5 schema
+struct<id:int,2:int>
+-- !query 5 output
+0	2
+0	2
+1	2
+1	2


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org