You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/04/12 03:30:21 UTC

spark git commit: [SPARK-6199] [SQL] Support CTE in HiveContext and SQLContext

Repository: spark
Updated Branches:
  refs/heads/master 7dbd37160 -> 2f5358873


[SPARK-6199] [SQL] Support CTE in HiveContext and SQLContext

Author: haiyang <hu...@huawei.com>

Closes #4929 from haiyangsea/cte and squashes the following commits:

220b67d [haiyang] add golden files for cte test
d3c7681 [haiyang] Merge branch 'master' into cte-repair
0ba2070 [haiyang] modify code style
9ce6b58 [haiyang] fix conflict
ff74741 [haiyang] add comment for With plan
0d56af4 [haiyang] code indention
776a440 [haiyang] add comments for resolve relation strategy
2fccd7e [haiyang] add comments for resolve relation strategy
241bbe2 [haiyang] fix cte problem of view
e9e1237 [haiyang] fix test case problem
614182f [haiyang] add test cases for CTE feature
32e415b [haiyang] add comment
1cc8c15 [haiyang] support with
03f1097 [haiyang] support with
e960099 [haiyang] support with
9aaa874 [haiyang] support with
0566978 [haiyang] support with
a99ecd2 [haiyang] support with
c3fa4c2 [haiyang] support with
3b6077f [haiyang] support with
5f8abe3 [haiyang] support with
4572b05 [haiyang] support with
f801f54 [haiyang] support with


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f535887
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f535887
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f535887

Branch: refs/heads/master
Commit: 2f53588738e95a2191f9844818e47f0d2ebbfd54
Parents: 7dbd371
Author: haiyang <hu...@huawei.com>
Authored: Sat Apr 11 18:30:17 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Sat Apr 11 18:30:17 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/SqlParser.scala   |  7 +++++
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 31 +++++++++++++++-----
 .../catalyst/plans/logical/basicOperators.scala | 12 ++++++++
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 14 +++++++++
 .../org/apache/spark/sql/hive/HiveQl.scala      | 27 +++++++++++++----
 ...eature #1-0-eedabbfe6ba8799f7b7782fb47a82768 |  3 ++
 ...eature #2-0-aa03d104251f97e36bc52279cb9931c9 |  4 +++
 ...eature #3-0-b5d4bf3c0ee92b2fda0ca24f422383f2 |  1 +
 .../sql/hive/execution/HiveQuerySuite.scala     | 15 ++++++++++
 9 files changed, 100 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2f535887/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 89f4a19..ee04cb5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -111,6 +111,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
   protected val UPPER = Keyword("UPPER")
   protected val WHEN = Keyword("WHEN")
   protected val WHERE = Keyword("WHERE")
+  protected val WITH = Keyword("WITH")
 
   protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] = {
     exprs.zipWithIndex.map {
@@ -127,6 +128,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
       | UNION ~ DISTINCT.? ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
       )
     | insert
+    | cte
     )
 
   protected lazy val select: Parser[LogicalPlan] =
@@ -156,6 +158,11 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
       case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o)
     }
 
+  protected lazy val cte: Parser[LogicalPlan] =
+    WITH ~> rep1sep(ident ~ ( AS ~ "(" ~> start <~ ")"), ",") ~ start ^^ {
+      case r ~ s => With(s, r.map({case n ~ s => (n, Subquery(n, s))}).toMap)
+    }
+
   protected lazy val projection: Parser[Expression] =
     expression ~ (AS.? ~> ident.?) ^^ {
       case e ~ a => a.fold(e)(Alias(e, _)())

http://git-wip-us.apache.org/repos/asf/spark/blob/2f535887/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 524c73c..b83f18a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -169,21 +169,36 @@ class Analyzer(
    * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
    */
   object ResolveRelations extends Rule[LogicalPlan] {
-    def getTable(u: UnresolvedRelation): LogicalPlan = {
+    def getTable(u: UnresolvedRelation, cteRelations: Map[String, LogicalPlan]) = {
       try {
-        catalog.lookupRelation(u.tableIdentifier, u.alias)
+        // In Hive, if a table in the database has the same name as a CTE definition,
+        // Hive uses the table in the database, not the CTE.
+        // For reasonableness and implementation simplicity, we instead use the CTE
+        // definition first, checking the table name only and ignoring the database name.
+        cteRelations.get(u.tableIdentifier.last)
+          .map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation))
+          .getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias))
       } catch {
         case _: NoSuchTableException =>
           u.failAnalysis(s"no such table ${u.tableName}")
       }
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-      case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
-        i.copy(
-          table = EliminateSubQueries(getTable(u)))
-      case u: UnresolvedRelation =>
-        getTable(u)
+    def apply(plan: LogicalPlan): LogicalPlan = {
+      val (realPlan, cteRelations) = plan match {
+        // TODO allow subquery to define CTE
+        // Add the CTE relations to a temporary map, drop the `With` plan and keep its child
+        case With(child, relations) => (child, relations)
+        case other => (other, Map.empty[String, LogicalPlan])
+      }
+
+      realPlan transform {
+        case i@InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
+          i.copy(
+            table = EliminateSubQueries(getTable(u, cteRelations)))
+        case u: UnresolvedRelation =>
+          getTable(u, cteRelations)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2f535887/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 8633e06..3bd5aa5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -147,6 +147,18 @@ case class CreateTableAsSelect[T](
   override lazy val resolved: Boolean = databaseName != None && childrenResolved
 }
 
+/**
+ * A container for holding named common table expressions (CTEs) and a query plan.
+ * This operator will be removed during analysis and the relations will be substituted into child.
+ * @param child The final query of this CTE.
+ * @param cteRelations The queries defined by this CTE, keyed by alias:
+ *                     each key is the alias of a CTE definition and
+ *                     each value is the corresponding CTE definition.
+ */
+case class With(child: LogicalPlan, cteRelations: Map[String, Subquery]) extends UnaryNode {
+  override def output = child.output
+}
+
 case class WriteToFile(
     path: String,
     child: LogicalPlan) extends UnaryNode {

http://git-wip-us.apache.org/repos/asf/spark/blob/2f535887/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 1392b48..fb8fc6d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -407,6 +407,20 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       mapData.collect().take(1).map(Row.fromTuple).toSeq)
   }
 
+  test("CTE feature") {
+    checkAnswer(
+      sql("with q1 as (select * from testData limit 10) select * from q1"),
+      testData.take(10).toSeq)
+
+    checkAnswer(
+      sql("""
+        |with q1 as (select * from testData where key= '5'),
+        |q2 as (select * from testData where key = '4')
+        |select * from q1 union all select * from q2""".stripMargin),
+      Row(5, "5") :: Row(4, "4") :: Nil)
+
+  }
+
   test("date row") {
     checkAnswer(sql(
       """select cast("2015-01-28" as date) from testData limit 1"""),

http://git-wip-us.apache.org/repos/asf/spark/blob/2f535887/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 0bdaf5f..2fb2e7c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -576,11 +576,23 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
     case Token("TOK_QUERY", queryArgs)
         if Seq("TOK_FROM", "TOK_INSERT").contains(queryArgs.head.getText) =>
 
-      val (fromClause: Option[ASTNode], insertClauses) = queryArgs match {
-        case Token("TOK_FROM", args: Seq[ASTNode]) :: insertClauses =>
-          (Some(args.head), insertClauses)
-        case Token("TOK_INSERT", _) :: Nil => (None, queryArgs)
-      }
+      val (fromClause: Option[ASTNode], insertClauses, cteRelations) =
+        queryArgs match {
+          case Token("TOK_FROM", args: Seq[ASTNode]) :: insertClauses =>
+            // Check whether the query has a CTE clause
+            insertClauses.last match {
+              case Token("TOK_CTE", cteClauses) =>
+                val cteRelations = cteClauses.map(node => {
+                  val relation = nodeToRelation(node).asInstanceOf[Subquery]
+                  (relation.alias, relation)
+                }).toMap
+                (Some(args.head), insertClauses.init, Some(cteRelations))
+
+              case _ => (Some(args.head), insertClauses, None)
+            }
+
+          case Token("TOK_INSERT", _) :: Nil => (None, queryArgs, None)
+        }
 
       // Return one query for each insert clause.
       val queries = insertClauses.map { case Token("TOK_INSERT", singleInsert) =>
@@ -794,7 +806,10 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       }
 
       // If there are multiple INSERTS just UNION them together into on query.
-      queries.reduceLeft(Union)
+      val query = queries.reduceLeft(Union)
+
+      // Return a With plan if there is a CTE
+      cteRelations.map(With(query, _)).getOrElse(query)
 
     case Token("TOK_UNION", left :: right :: Nil) => Union(nodeToPlan(left), nodeToPlan(right))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2f535887/sql/hive/src/test/resources/golden/CTE feature #1-0-eedabbfe6ba8799f7b7782fb47a82768
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/CTE feature #1-0-eedabbfe6ba8799f7b7782fb47a82768 b/sql/hive/src/test/resources/golden/CTE feature #1-0-eedabbfe6ba8799f7b7782fb47a82768
new file mode 100644
index 0000000..f6ba75d
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/CTE feature #1-0-eedabbfe6ba8799f7b7782fb47a82768	
@@ -0,0 +1,3 @@
+5
+5
+5

http://git-wip-us.apache.org/repos/asf/spark/blob/2f535887/sql/hive/src/test/resources/golden/CTE feature #2-0-aa03d104251f97e36bc52279cb9931c9
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/CTE feature #2-0-aa03d104251f97e36bc52279cb9931c9 b/sql/hive/src/test/resources/golden/CTE feature #2-0-aa03d104251f97e36bc52279cb9931c9
new file mode 100644
index 0000000..ca7b591
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/CTE feature #2-0-aa03d104251f97e36bc52279cb9931c9	
@@ -0,0 +1,4 @@
+val_4
+val_5
+val_5
+val_5

http://git-wip-us.apache.org/repos/asf/spark/blob/2f535887/sql/hive/src/test/resources/golden/CTE feature #3-0-b5d4bf3c0ee92b2fda0ca24f422383f2
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/golden/CTE feature #3-0-b5d4bf3c0ee92b2fda0ca24f422383f2 b/sql/hive/src/test/resources/golden/CTE feature #3-0-b5d4bf3c0ee92b2fda0ca24f422383f2
new file mode 100644
index 0000000..b8626c4
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/CTE feature #3-0-b5d4bf3c0ee92b2fda0ca24f422383f2	
@@ -0,0 +1 @@
+4

http://git-wip-us.apache.org/repos/asf/spark/blob/2f535887/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index af781a5..1222fba 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -542,6 +542,21 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
   createQueryTest("select null from table",
     "SELECT null FROM src LIMIT 1")
 
+  createQueryTest("CTE feature #1",
+    "with q1 as (select key from src) select * from q1 where key = 5")
+
+  createQueryTest("CTE feature #2",
+    """with q1 as (select * from src where key= 5),
+      |q2 as (select * from src s2 where key = 4)
+      |select value from q1 union all select value from q2
+    """.stripMargin)
+
+  createQueryTest("CTE feature #3",
+    """with q1 as (select key from src)
+      |from q1
+      |select * where key = 4
+    """.stripMargin)
+
   test("predicates contains an empty AttributeSet() references") {
     sql(
       """


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org