You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2017/02/07 21:29:08 UTC

spark git commit: [SPARK-18609][SPARK-18841][SQL] Fix redundant Alias removal in the optimizer

Repository: spark
Updated Branches:
  refs/heads/master b7277e03d -> 73ee73945


[SPARK-18609][SPARK-18841][SQL] Fix redundant Alias removal in the optimizer

## What changes were proposed in this pull request?
The optimizer tries to remove redundant alias-only projections from the query plan using the `RemoveAliasOnlyProject` rule. The current rule identifies and removes such a project and rewrites the project's attributes in the **entire** tree. This causes problems when parts of the tree are duplicated (for instance a self join on a temporary view/CTE) and the duplicated part contains the alias-only project; in this case the rewrite will break the tree.

This PR fixes these problems by using a blacklist for attributes that are not to be moved, and by making sure that attribute remapping is only done for the parent tree, and not for unrelated parts of the query plan.

The current tree transformation infrastructure works very well if the transformation at hand requires little or global contextual information. In this case we need to know both the attributes that are not to be moved and which child attributes were modified. This cannot be done easily using the current infrastructure, and solutions typically involve traversing the query plan multiple times (which is super slow). I have moved around some code in `TreeNode`, `QueryPlan` and `LogicalPlan` to make this much more straightforward; this basically allows you to manually traverse the tree.

This PR subsumes the following PRs by windpiger:
Closes https://github.com/apache/spark/pull/16267
Closes https://github.com/apache/spark/pull/16255

## How was this patch tested?
I have added unit tests to `RemoveRedundantAliasAndProjectSuite` and I have added integration tests to the `SQLQueryTestSuite.union` and `SQLQueryTestSuite.cte` test cases.

Author: Herman van Hovell <hv...@databricks.com>

Closes #16757 from hvanhovell/SPARK-18609.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73ee7394
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73ee7394
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73ee7394

Branch: refs/heads/master
Commit: 73ee73945e369a862480ef4ac64e55c797bd7d90
Parents: b7277e0
Author: Herman van Hovell <hv...@databricks.com>
Authored: Tue Feb 7 22:28:59 2017 +0100
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Tue Feb 7 22:28:59 2017 +0100

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 125 +++++++++++++------
 .../spark/sql/catalyst/plans/QueryPlan.scala    |  42 ++-----
 .../catalyst/plans/logical/LogicalPlan.scala    |   2 +-
 .../spark/sql/catalyst/trees/TreeNode.scala     |  46 ++-----
 .../optimizer/RemoveAliasOnlyProjectSuite.scala |  77 ------------
 .../RemoveRedundantAliasAndProjectSuite.scala   | 119 ++++++++++++++++++
 .../src/test/resources/sql-tests/inputs/cte.sql |  15 +++
 .../test/resources/sql-tests/inputs/union.sql   |  16 +++
 .../resources/sql-tests/results/cte.sql.out     |  49 +++++++-
 .../resources/sql-tests/results/union.sql.out   |  70 ++++++++++-
 10 files changed, 374 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/73ee7394/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d8e9d30..0c13e3e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -110,7 +110,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       SimplifyCaseConversionExpressions,
       RewriteCorrelatedScalarSubquery,
       EliminateSerialization,
-      RemoveAliasOnlyProject,
+      RemoveRedundantAliases,
+      RemoveRedundantProject,
       SimplifyCreateStructOps,
       SimplifyCreateArrayOps,
       SimplifyCreateMapOps) ::
@@ -157,56 +158,98 @@ class SimpleTestOptimizer extends Optimizer(
   new SimpleCatalystConf(caseSensitiveAnalysis = true))
 
 /**
- * Removes the Project only conducting Alias of its child node.
- * It is created mainly for removing extra Project added in EliminateSerialization rule,
- * but can also benefit other operators.
+ * Remove redundant aliases from a query plan. A redundant alias is an alias that does not change
+ * the name or metadata of a column, and does not deduplicate it.
  */
-object RemoveAliasOnlyProject extends Rule[LogicalPlan] {
+object RemoveRedundantAliases extends Rule[LogicalPlan] {
+
   /**
-   * Returns true if the project list is semantically same as child output, after strip alias on
-   * attribute.
+   * Create an attribute mapping from the old to the new attributes. This function will only
+   * return the attribute pairs that have changed.
    */
-  private def isAliasOnly(
-      projectList: Seq[NamedExpression],
-      childOutput: Seq[Attribute]): Boolean = {
-    if (projectList.length != childOutput.length) {
-      false
-    } else {
-      stripAliasOnAttribute(projectList).zip(childOutput).forall {
-        case (a: Attribute, o) if a semanticEquals o => true
-        case _ => false
-      }
+  private def createAttributeMapping(current: LogicalPlan, next: LogicalPlan)
+      : Seq[(Attribute, Attribute)] = {
+    current.output.zip(next.output).filterNot {
+      case (a1, a2) => a1.semanticEquals(a2)
     }
   }
 
-  private def stripAliasOnAttribute(projectList: Seq[NamedExpression]) = {
-    projectList.map {
-      // Alias with metadata can not be stripped, or the metadata will be lost.
-      // If the alias name is different from attribute name, we can't strip it either, or we may
-      // accidentally change the output schema name of the root plan.
-      case a @ Alias(attr: Attribute, name) if a.metadata == Metadata.empty && name == attr.name =>
-        attr
-      case other => other
-    }
+  /**
+   * Remove the top-level alias from an expression when it is redundant.
+   */
+  private def removeRedundantAlias(e: Expression, blacklist: AttributeSet): Expression = e match {
+    // Alias with metadata can not be stripped, or the metadata will be lost.
+    // If the alias name is different from attribute name, we can't strip it either, or we
+    // may accidentally change the output schema name of the root plan.
+    case a @ Alias(attr: Attribute, name)
+      if a.metadata == Metadata.empty && name == attr.name && !blacklist.contains(attr) =>
+      attr
+    case a => a
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = {
-    val aliasOnlyProject = plan.collectFirst {
-      case p @ Project(pList, child) if isAliasOnly(pList, child.output) => p
-    }
+  /**
+   * Remove redundant alias expression from a LogicalPlan and its subtree. A blacklist is used to
+   * prevent the removal of seemingly redundant aliases used to deduplicate the input for a (self)
+   * join.
+   */
+  private def removeRedundantAliases(plan: LogicalPlan, blacklist: AttributeSet): LogicalPlan = {
+    plan match {
+      // A join has to be treated differently, because the left and the right side of the join are
+      // not allowed to use the same attributes. We use a blacklist to prevent us from creating a
+      // situation in which this happens; the rule will only remove an alias if its child
+      // attribute is not on the black list.
+      case Join(left, right, joinType, condition) =>
+        val newLeft = removeRedundantAliases(left, blacklist ++ right.outputSet)
+        val newRight = removeRedundantAliases(right, blacklist ++ newLeft.outputSet)
+        val mapping = AttributeMap(
+          createAttributeMapping(left, newLeft) ++
+          createAttributeMapping(right, newRight))
+        val newCondition = condition.map(_.transform {
+          case a: Attribute => mapping.getOrElse(a, a)
+        })
+        Join(newLeft, newRight, joinType, newCondition)
+
+      case _ =>
+        // Remove redundant aliases in the subtree(s).
+        val currentNextAttrPairs = mutable.Buffer.empty[(Attribute, Attribute)]
+        val newNode = plan.mapChildren { child =>
+          val newChild = removeRedundantAliases(child, blacklist)
+          currentNextAttrPairs ++= createAttributeMapping(child, newChild)
+          newChild
+        }
 
-    aliasOnlyProject.map { case proj =>
-      val attributesToReplace = proj.output.zip(proj.child.output).filterNot {
-        case (a1, a2) => a1 semanticEquals a2
-      }
-      val attrMap = AttributeMap(attributesToReplace)
-      plan transform {
-        case plan: Project if plan eq proj => plan.child
-        case plan => plan transformExpressions {
-          case a: Attribute if attrMap.contains(a) => attrMap(a)
+        // Create the attribute mapping. Note that the currentNextAttrPairs can contain duplicate
+        // keys in case of Union (this is caused by the PushProjectionThroughUnion rule); in this
+        // case we use the the first mapping (which should be provided by the first child).
+        val mapping = AttributeMap(currentNextAttrPairs)
+
+        // Create a an expression cleaning function for nodes that can actually produce redundant
+        // aliases, use identity otherwise.
+        val clean: Expression => Expression = plan match {
+          case _: Project => removeRedundantAlias(_, blacklist)
+          case _: Aggregate => removeRedundantAlias(_, blacklist)
+          case _: Window => removeRedundantAlias(_, blacklist)
+          case _ => identity[Expression]
         }
-      }
-    }.getOrElse(plan)
+
+        // Transform the expressions.
+        newNode.mapExpressions { expr =>
+          clean(expr.transform {
+            case a: Attribute => mapping.getOrElse(a, a)
+          })
+        }
+    }
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = removeRedundantAliases(plan, AttributeSet.empty)
+}
+
+/**
+ * Remove projections from the query plan that do not make any modifications.
+ */
+object RemoveRedundantProject extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case p @ Project(_, child) if p.output == child.output => child
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/73ee7394/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index b108017..a576170 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -242,31 +242,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
    * @param rule the rule to be applied to every expression in this operator.
    */
   def transformExpressionsDown(rule: PartialFunction[Expression, Expression]): this.type = {
-    var changed = false
-
-    @inline def transformExpressionDown(e: Expression): Expression = {
-      val newE = e.transformDown(rule)
-      if (newE.fastEquals(e)) {
-        e
-      } else {
-        changed = true
-        newE
-      }
-    }
-
-    def recursiveTransform(arg: Any): AnyRef = arg match {
-      case e: Expression => transformExpressionDown(e)
-      case Some(e: Expression) => Some(transformExpressionDown(e))
-      case m: Map[_, _] => m
-      case d: DataType => d // Avoid unpacking Structs
-      case seq: Traversable[_] => seq.map(recursiveTransform)
-      case other: AnyRef => other
-      case null => null
-    }
-
-    val newArgs = mapProductIterator(recursiveTransform)
-
-    if (changed) makeCopy(newArgs).asInstanceOf[this.type] else this
+    mapExpressions(_.transformDown(rule))
   }
 
   /**
@@ -276,10 +252,18 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
    * @return
    */
   def transformExpressionsUp(rule: PartialFunction[Expression, Expression]): this.type = {
+    mapExpressions(_.transformUp(rule))
+  }
+
+  /**
+   * Apply a map function to each expression present in this query operator, and return a new
+   * query operator based on the mapped expressions.
+   */
+  def mapExpressions(f: Expression => Expression): this.type = {
     var changed = false
 
-    @inline def transformExpressionUp(e: Expression): Expression = {
-      val newE = e.transformUp(rule)
+    @inline def transformExpression(e: Expression): Expression = {
+      val newE = f(e)
       if (newE.fastEquals(e)) {
         e
       } else {
@@ -289,8 +273,8 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
     }
 
     def recursiveTransform(arg: Any): AnyRef = arg match {
-      case e: Expression => transformExpressionUp(e)
-      case Some(e: Expression) => Some(transformExpressionUp(e))
+      case e: Expression => transformExpression(e)
+      case Some(e: Expression) => Some(transformExpression(e))
       case m: Map[_, _] => m
       case d: DataType => d // Avoid unpacking Structs
       case seq: Traversable[_] => seq.map(recursiveTransform)

http://git-wip-us.apache.org/repos/asf/spark/blob/73ee7394/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 93550e1..0937825 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -56,7 +56,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
    */
   def resolveOperators(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
     if (!analyzed) {
-      val afterRuleOnChildren = transformChildren(rule, (t, r) => t.resolveOperators(r))
+      val afterRuleOnChildren = mapChildren(_.resolveOperators(rule))
       if (this fastEquals afterRuleOnChildren) {
         CurrentOrigin.withOrigin(origin) {
           rule.applyOrElse(this, identity[LogicalPlan])

http://git-wip-us.apache.org/repos/asf/spark/blob/73ee7394/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 8fec9dd..f37661c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -191,26 +191,6 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   }
 
   /**
-   * Returns a copy of this node where `f` has been applied to all the nodes children.
-   */
-  def mapChildren(f: BaseType => BaseType): BaseType = {
-    var changed = false
-    val newArgs = mapProductIterator {
-      case arg: TreeNode[_] if containsChild(arg) =>
-        val newChild = f(arg.asInstanceOf[BaseType])
-        if (newChild fastEquals arg) {
-          arg
-        } else {
-          changed = true
-          newChild
-        }
-      case nonChild: AnyRef => nonChild
-      case null => null
-    }
-    if (changed) makeCopy(newArgs) else this
-  }
-
-  /**
    * Returns a copy of this node with the children replaced.
    * TODO: Validate somewhere (in debug mode?) that children are ordered correctly.
    */
@@ -289,9 +269,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
 
     // Check if unchanged and then possibly return old copy to avoid gc churn.
     if (this fastEquals afterRule) {
-      transformChildren(rule, (t, r) => t.transformDown(r))
+      mapChildren(_.transformDown(rule))
     } else {
-      afterRule.transformChildren(rule, (t, r) => t.transformDown(r))
+      afterRule.mapChildren(_.transformDown(rule))
     }
   }
 
@@ -303,7 +283,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
    * @param rule the function use to transform this nodes children
    */
   def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType = {
-    val afterRuleOnChildren = transformChildren(rule, (t, r) => t.transformUp(r))
+    val afterRuleOnChildren = mapChildren(_.transformUp(rule))
     if (this fastEquals afterRuleOnChildren) {
       CurrentOrigin.withOrigin(origin) {
         rule.applyOrElse(this, identity[BaseType])
@@ -316,18 +296,14 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   }
 
   /**
-   * Returns a copy of this node where `rule` has been recursively applied to all the children of
-   * this node.  When `rule` does not apply to a given node it is left unchanged.
-   * @param rule the function used to transform this nodes children
+   * Returns a copy of this node where `f` has been applied to all the nodes children.
    */
-  protected def transformChildren(
-      rule: PartialFunction[BaseType, BaseType],
-      nextOperation: (BaseType, PartialFunction[BaseType, BaseType]) => BaseType): BaseType = {
+  def mapChildren(f: BaseType => BaseType): BaseType = {
     if (children.nonEmpty) {
       var changed = false
       val newArgs = mapProductIterator {
         case arg: TreeNode[_] if containsChild(arg) =>
-          val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
+          val newChild = f(arg.asInstanceOf[BaseType])
           if (!(newChild fastEquals arg)) {
             changed = true
             newChild
@@ -335,7 +311,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
             arg
           }
         case Some(arg: TreeNode[_]) if containsChild(arg) =>
-          val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
+          val newChild = f(arg.asInstanceOf[BaseType])
           if (!(newChild fastEquals arg)) {
             changed = true
             Some(newChild)
@@ -344,7 +320,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
           }
         case m: Map[_, _] => m.mapValues {
           case arg: TreeNode[_] if containsChild(arg) =>
-            val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
+            val newChild = f(arg.asInstanceOf[BaseType])
             if (!(newChild fastEquals arg)) {
               changed = true
               newChild
@@ -356,7 +332,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
         case d: DataType => d // Avoid unpacking Structs
         case args: Traversable[_] => args.map {
           case arg: TreeNode[_] if containsChild(arg) =>
-            val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
+            val newChild = f(arg.asInstanceOf[BaseType])
             if (!(newChild fastEquals arg)) {
               changed = true
               newChild
@@ -364,8 +340,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
               arg
             }
           case tuple@(arg1: TreeNode[_], arg2: TreeNode[_]) =>
-            val newChild1 = nextOperation(arg1.asInstanceOf[BaseType], rule)
-            val newChild2 = nextOperation(arg2.asInstanceOf[BaseType], rule)
+            val newChild1 = f(arg1.asInstanceOf[BaseType])
+            val newChild2 = f(arg2.asInstanceOf[BaseType])
             if (!(newChild1 fastEquals arg1) || !(newChild2 fastEquals arg2)) {
               changed = true
               (newChild1, newChild2)

http://git-wip-us.apache.org/repos/asf/spark/blob/73ee7394/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveAliasOnlyProjectSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveAliasOnlyProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveAliasOnlyProjectSuite.scala
deleted file mode 100644
index 7c26cb5..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveAliasOnlyProjectSuite.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.optimizer
-
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.types.MetadataBuilder
-
-class RemoveAliasOnlyProjectSuite extends PlanTest with PredicateHelper {
-
-  object Optimize extends RuleExecutor[LogicalPlan] {
-    val batches = Batch("RemoveAliasOnlyProject", FixedPoint(50), RemoveAliasOnlyProject) :: Nil
-  }
-
-  test("all expressions in project list are aliased child output") {
-    val relation = LocalRelation('a.int, 'b.int)
-    val query = relation.select('a as 'a, 'b as 'b).analyze
-    val optimized = Optimize.execute(query)
-    comparePlans(optimized, relation)
-  }
-
-  test("all expressions in project list are aliased child output but with different order") {
-    val relation = LocalRelation('a.int, 'b.int)
-    val query = relation.select('b as 'b, 'a as 'a).analyze
-    val optimized = Optimize.execute(query)
-    comparePlans(optimized, query)
-  }
-
-  test("some expressions in project list are aliased child output") {
-    val relation = LocalRelation('a.int, 'b.int)
-    val query = relation.select('a as 'a, 'b).analyze
-    val optimized = Optimize.execute(query)
-    comparePlans(optimized, relation)
-  }
-
-  test("some expressions in project list are aliased child output but with different order") {
-    val relation = LocalRelation('a.int, 'b.int)
-    val query = relation.select('b as 'b, 'a).analyze
-    val optimized = Optimize.execute(query)
-    comparePlans(optimized, query)
-  }
-
-  test("some expressions in project list are not Alias or Attribute") {
-    val relation = LocalRelation('a.int, 'b.int)
-    val query = relation.select('a as 'a, 'b + 1).analyze
-    val optimized = Optimize.execute(query)
-    comparePlans(optimized, query)
-  }
-
-  test("some expressions in project list are aliased child output but with metadata") {
-    val relation = LocalRelation('a.int, 'b.int)
-    val metadata = new MetadataBuilder().putString("x", "y").build()
-    val aliasWithMeta = Alias('a, "a")(explicitMetadata = Some(metadata))
-    val query = relation.select(aliasWithMeta, 'b).analyze
-    val optimized = Optimize.execute(query)
-    comparePlans(optimized, query)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/73ee7394/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
new file mode 100644
index 0000000..c01ea01
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.types.MetadataBuilder
+
+class RemoveRedundantAliasAndProjectSuite extends PlanTest with PredicateHelper {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch(
+      "RemoveAliasOnlyProject",
+      FixedPoint(50),
+      PushProjectionThroughUnion,
+      RemoveRedundantAliases,
+      RemoveRedundantProject) :: Nil
+  }
+
+  test("all expressions in project list are aliased child output") {
+    val relation = LocalRelation('a.int, 'b.int)
+    val query = relation.select('a as 'a, 'b as 'b).analyze
+    val optimized = Optimize.execute(query)
+    comparePlans(optimized, relation)
+  }
+
+  test("all expressions in project list are aliased child output but with different order") {
+    val relation = LocalRelation('a.int, 'b.int)
+    val query = relation.select('b as 'b, 'a as 'a).analyze
+    val optimized = Optimize.execute(query)
+    val expected = relation.select('b, 'a).analyze
+    comparePlans(optimized, expected)
+  }
+
+  test("some expressions in project list are aliased child output") {
+    val relation = LocalRelation('a.int, 'b.int)
+    val query = relation.select('a as 'a, 'b).analyze
+    val optimized = Optimize.execute(query)
+    comparePlans(optimized, relation)
+  }
+
+  test("some expressions in project list are aliased child output but with different order") {
+    val relation = LocalRelation('a.int, 'b.int)
+    val query = relation.select('b as 'b, 'a).analyze
+    val optimized = Optimize.execute(query)
+    val expected = relation.select('b, 'a).analyze
+    comparePlans(optimized, expected)
+  }
+
+  test("some expressions in project list are not Alias or Attribute") {
+    val relation = LocalRelation('a.int, 'b.int)
+    val query = relation.select('a as 'a, 'b + 1).analyze
+    val optimized = Optimize.execute(query)
+    val expected = relation.select('a, 'b + 1).analyze
+    comparePlans(optimized, expected)
+  }
+
+  test("some expressions in project list are aliased child output but with metadata") {
+    val relation = LocalRelation('a.int, 'b.int)
+    val metadata = new MetadataBuilder().putString("x", "y").build()
+    val aliasWithMeta = Alias('a, "a")(explicitMetadata = Some(metadata))
+    val query = relation.select(aliasWithMeta, 'b).analyze
+    val optimized = Optimize.execute(query)
+    comparePlans(optimized, query)
+  }
+
+  test("retain deduplicating alias in self-join") {
+    val relation = LocalRelation('a.int)
+    val fragment = relation.select('a as 'a)
+    val query = fragment.select('a as 'a).join(fragment.select('a as 'a)).analyze
+    val optimized = Optimize.execute(query)
+    val expected = relation.join(relation.select('a as 'a)).analyze
+    comparePlans(optimized, expected)
+  }
+
+  test("alias removal should not break after push project through union") {
+    val r1 = LocalRelation('a.int)
+    val r2 = LocalRelation('b.int)
+    val query = r1.select('a as 'a).union(r2.select('b as 'b)).select('a).analyze
+    val optimized = Optimize.execute(query)
+    val expected = r1.union(r2)
+    comparePlans(optimized, expected)
+  }
+
+  test("remove redundant alias from aggregate") {
+    val relation = LocalRelation('a.int, 'b.int)
+    val query = relation.groupBy('a as 'a)('a as 'a, sum('b)).analyze
+    val optimized = Optimize.execute(query)
+    val expected = relation.groupBy('a)('a, sum('b)).analyze
+    comparePlans(optimized, expected)
+  }
+
+  test("remove redundant alias from window") {
+    val relation = LocalRelation('a.int, 'b.int)
+    val query = relation.window(Seq('b as 'b), Seq('a as 'a), Seq()).analyze
+    val optimized = Optimize.execute(query)
+    val expected = relation.window(Seq('b), Seq('a), Seq()).analyze
+    comparePlans(optimized, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/73ee7394/sql/core/src/test/resources/sql-tests/inputs/cte.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte.sql b/sql/core/src/test/resources/sql-tests/inputs/cte.sql
index 3914db2..d34d89f 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/cte.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/cte.sql
@@ -12,3 +12,18 @@ WITH s1 AS (SELECT 1 FROM s2), s2 AS (SELECT 1 FROM s1) SELECT * FROM s1, s2;
 
 -- WITH clause should reference the previous CTE
 WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1 cross join t2;
+
+-- SPARK-18609 CTE with self-join
+WITH CTE1 AS (
+  SELECT b.id AS id
+  FROM   T2 a
+         CROSS JOIN (SELECT id AS id FROM T2) b
+)
+SELECT t1.id AS c1,
+       t2.id AS c2
+FROM   CTE1 t1
+       CROSS JOIN CTE1 t2;
+
+-- Clean up
+DROP VIEW IF EXISTS t;
+DROP VIEW IF EXISTS t2;

http://git-wip-us.apache.org/repos/asf/spark/blob/73ee7394/sql/core/src/test/resources/sql-tests/inputs/union.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/union.sql b/sql/core/src/test/resources/sql-tests/inputs/union.sql
index 1f4780a..e57d69e 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/union.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/union.sql
@@ -22,6 +22,22 @@ FROM (SELECT 0 a, 0 b
       SELECT SUM(1) a, CAST(0 AS BIGINT) b
       UNION ALL SELECT 0 a, 0 b) T;
 
+-- Regression test for SPARK-18841 Push project through union should not be broken by redundant alias removal.
+CREATE OR REPLACE TEMPORARY VIEW p1 AS VALUES 1 T(col);
+CREATE OR REPLACE TEMPORARY VIEW p2 AS VALUES 1 T(col);
+CREATE OR REPLACE TEMPORARY VIEW p3 AS VALUES 1 T(col);
+SELECT 1 AS x,
+       col
+FROM   (SELECT col AS col
+        FROM (SELECT p1.col AS col
+              FROM   p1 CROSS JOIN p2
+              UNION ALL
+              SELECT col
+              FROM p3) T1) T2;
+
 -- Clean-up
 DROP VIEW IF EXISTS t1;
 DROP VIEW IF EXISTS t2;
+DROP VIEW IF EXISTS p1;
+DROP VIEW IF EXISTS p2;
+DROP VIEW IF EXISTS p3;

http://git-wip-us.apache.org/repos/asf/spark/blob/73ee7394/sql/core/src/test/resources/sql-tests/results/cte.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/cte.sql.out b/sql/core/src/test/resources/sql-tests/results/cte.sql.out
index 9fbad8f..a446c2c 100644
--- a/sql/core/src/test/resources/sql-tests/results/cte.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/cte.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 6
+-- Number of queries: 9
 
 
 -- !query 0
@@ -55,3 +55,50 @@ struct<id:int,2:int>
 0	2
 1	2
 1	2
+
+
+-- !query 6
+WITH CTE1 AS (
+  SELECT b.id AS id
+  FROM   T2 a
+         CROSS JOIN (SELECT id AS id FROM T2) b
+)
+SELECT t1.id AS c1,
+       t2.id AS c2
+FROM   CTE1 t1
+       CROSS JOIN CTE1 t2
+-- !query 6 schema
+struct<c1:int,c2:int>
+-- !query 6 output
+0	0
+0	0
+0	0
+0	0
+0	1
+0	1
+0	1
+0	1
+1	0
+1	0
+1	0
+1	0
+1	1
+1	1
+1	1
+1	1
+
+
+-- !query 7
+DROP VIEW IF EXISTS t
+-- !query 7 schema
+struct<>
+-- !query 7 output
+
+
+
+-- !query 8
+DROP VIEW IF EXISTS t2
+-- !query 8 schema
+struct<>
+-- !query 8 output
+

http://git-wip-us.apache.org/repos/asf/spark/blob/73ee7394/sql/core/src/test/resources/sql-tests/results/union.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/union.sql.out b/sql/core/src/test/resources/sql-tests/results/union.sql.out
index c57028c..d123b7f 100644
--- a/sql/core/src/test/resources/sql-tests/results/union.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/union.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 7
+-- Number of queries: 14
 
 
 -- !query 0
@@ -65,7 +65,7 @@ struct<a:bigint>
 
 
 -- !query 5
-DROP VIEW IF EXISTS t1
+CREATE OR REPLACE TEMPORARY VIEW p1 AS VALUES 1 T(col)
 -- !query 5 schema
 struct<>
 -- !query 5 output
@@ -73,8 +73,72 @@ struct<>
 
 
 -- !query 6
-DROP VIEW IF EXISTS t2
+CREATE OR REPLACE TEMPORARY VIEW p2 AS VALUES 1 T(col)
 -- !query 6 schema
 struct<>
 -- !query 6 output
 
+
+
+-- !query 7
+CREATE OR REPLACE TEMPORARY VIEW p3 AS VALUES 1 T(col)
+-- !query 7 schema
+struct<>
+-- !query 7 output
+
+
+
+-- !query 8
+SELECT 1 AS x,
+       col
+FROM   (SELECT col AS col
+        FROM (SELECT p1.col AS col
+              FROM   p1 CROSS JOIN p2
+              UNION ALL
+              SELECT col
+              FROM p3) T1) T2
+-- !query 8 schema
+struct<x:int,col:int>
+-- !query 8 output
+1	1
+1	1
+
+
+-- !query 9
+DROP VIEW IF EXISTS t1
+-- !query 9 schema
+struct<>
+-- !query 9 output
+
+
+
+-- !query 10
+DROP VIEW IF EXISTS t2
+-- !query 10 schema
+struct<>
+-- !query 10 output
+
+
+
+-- !query 11
+DROP VIEW IF EXISTS p1
+-- !query 11 schema
+struct<>
+-- !query 11 output
+
+
+
+-- !query 12
+DROP VIEW IF EXISTS p2
+-- !query 12 schema
+struct<>
+-- !query 12 output
+
+
+
+-- !query 13
+DROP VIEW IF EXISTS p3
+-- !query 13 schema
+struct<>
+-- !query 13 output
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org