You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2015/05/08 07:55:46 UTC

spark git commit: [SPARK-7232] [SQL] Add a Substitution batch for spark sql analyzer

Repository: spark
Updated Branches:
  refs/heads/master 714db2ef5 -> f496bf3c5


[SPARK-7232] [SQL] Add a Substitution batch for spark sql analyzer

  Added a new batch named `Substitution` before the `Resolution` batch. The motivation is that there are cases where we want to perform some substitution on the parsed logical plan before resolving it.
Consider these two cases:
1. CTE: for a CTE we first build a raw logical plan
```
'With Map(q1 -> 'Subquery q1
                   'Project ['key]
                      'UnresolvedRelation [src], None)
 'Project [*]
  'Filter ('key = 5)
   'UnresolvedRelation [q1], None
```
The `With` logical plan stores a map of CTE definitions (`q1 -> subquery`). We first want to strip off the `With` node and substitute the `UnresolvedRelation` named `q1` with the corresponding `subquery`.

2. Another example is window functions: in a query that uses window functions the user may define named windows, and we also need to substitute the window names referenced in the child plan with the concrete window definitions. This should also be done in the `Substitution` batch.

Author: wangfei <wa...@huawei.com>

Closes #5776 from scwf/addbatch and squashes the following commits:

d4b962f [wangfei] added WindowsSubstitution
70f6932 [wangfei] Merge branch 'master' of https://github.com/apache/spark into addbatch
ecaeafb [wangfei] address yhuai's comments
553005a [wangfei] fix test case
0c54798 [wangfei] address comments
29aaaaf [wangfei] fix compile
1c9a092 [wangfei] added Substitution bastch


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f496bf3c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f496bf3c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f496bf3c

Branch: refs/heads/master
Commit: f496bf3c539a873ffdf3aa803847ef7b50135bd7
Parents: 714db2e
Author: wangfei <wa...@huawei.com>
Authored: Thu May 7 22:55:42 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu May 7 22:55:42 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 98 ++++++++++++--------
 1 file changed, 60 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f496bf3c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7e46ad8..bb7913e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -55,6 +55,10 @@ class Analyzer(
   val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil
 
   lazy val batches: Seq[Batch] = Seq(
+    Batch("Substitution", fixedPoint,
+      CTESubstitution ::
+      WindowsSubstitution ::
+      Nil : _*),
     Batch("Resolution", fixedPoint,
       ResolveRelations ::
       ResolveReferences ::
@@ -72,6 +76,55 @@ class Analyzer(
   )
 
   /**
+   * Substitute child plan with cte definitions
+   */
+  object CTESubstitution extends Rule[LogicalPlan] {
+    // TODO allow subquery to define CTE
+    def apply(plan: LogicalPlan): LogicalPlan = plan match {
+      case With(child, relations) => substituteCTE(child, relations)
+      case other => other
+    }
+
+    def substituteCTE(plan: LogicalPlan, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
+      plan transform {
+        // In hive, if there is same table name in database and CTE definition,
+        // hive will use the table in database, not the CTE one.
+        // Taking into account the reasonableness and the implementation complexity,
+        // here use the CTE definition first, check table name only and ignore database name
+        // see https://github.com/apache/spark/pull/4929#discussion_r27186638 for more info
+        case u : UnresolvedRelation =>
+          val substituted = cteRelations.get(u.tableIdentifier.last).map { relation =>
+            val withAlias = u.alias.map(Subquery(_, relation))
+            withAlias.getOrElse(relation)
+          }
+          substituted.getOrElse(u)
+      }
+    }
+  }
+
+  /**
+   * Substitute child plan with WindowSpecDefinitions.
+   */
+  object WindowsSubstitution extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+      // Lookup WindowSpecDefinitions. This rule works with unresolved children.
+      case WithWindowDefinition(windowDefinitions, child) =>
+        child.transform {
+          case plan => plan.transformExpressions {
+            case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
+              val errorMessage =
+                s"Window specification $windowName is not defined in the WINDOW clause."
+              val windowSpecDefinition =
+                windowDefinitions
+                  .get(windowName)
+                  .getOrElse(failAnalysis(errorMessage))
+              WindowExpression(c, windowSpecDefinition)
+          }
+        }
+    }
+  }
+
+  /**
    * Removes no-op Alias expressions from the plan.
    */
   object TrimGroupingAliases extends Rule[LogicalPlan] {
@@ -172,36 +225,20 @@ class Analyzer(
    * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
    */
   object ResolveRelations extends Rule[LogicalPlan] {
-    def getTable(u: UnresolvedRelation, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
+    def getTable(u: UnresolvedRelation): LogicalPlan = {
       try {
-        // In hive, if there is same table name in database and CTE definition,
-        // hive will use the table in database, not the CTE one.
-        // Taking into account the reasonableness and the implementation complexity,
-        // here use the CTE definition first, check table name only and ignore database name
-        cteRelations.get(u.tableIdentifier.last)
-          .map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation))
-          .getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias))
+        catalog.lookupRelation(u.tableIdentifier, u.alias)
       } catch {
         case _: NoSuchTableException =>
           u.failAnalysis(s"no such table ${u.tableName}")
       }
     }
 
-    def apply(plan: LogicalPlan): LogicalPlan = {
-      val (realPlan, cteRelations) = plan match {
-        // TODO allow subquery to define CTE
-        // Add cte table to a temp relation map,drop `with` plan and keep its child
-        case With(child, relations) => (child, relations)
-        case other => (other, Map.empty[String, LogicalPlan])
-      }
-
-      realPlan transform {
-        case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
-          i.copy(
-            table = EliminateSubQueries(getTable(u, cteRelations)))
-        case u: UnresolvedRelation =>
-          getTable(u, cteRelations)
-      }
+    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+      case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
+        i.copy(table = EliminateSubQueries(getTable(u)))
+      case u: UnresolvedRelation =>
+        getTable(u)
     }
   }
 
@@ -664,21 +701,6 @@ class Analyzer(
     // We have to use transformDown at here to make sure the rule of
     // "Aggregate with Having clause" will be triggered.
     def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
-      // Lookup WindowSpecDefinitions. This rule works with unresolved children.
-      case WithWindowDefinition(windowDefinitions, child) =>
-        child.transform {
-          case plan => plan.transformExpressions {
-            case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
-              val errorMessage =
-                s"Window specification $windowName is not defined in the WINDOW clause."
-              val windowSpecDefinition =
-                windowDefinitions
-                  .get(windowName)
-                  .getOrElse(failAnalysis(errorMessage))
-              WindowExpression(c, windowSpecDefinition)
-          }
-        }
-
       // Aggregate with Having clause. This rule works with an unresolved Aggregate because
       // a resolved Aggregate will not have Window Functions.
       case f @ Filter(condition, a @ Aggregate(groupingExprs, aggregateExprs, child))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org