You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/01/04 05:15:03 UTC

spark git commit: [SPARK-22944][SQL] improve FoldablePropagation

Repository: spark
Updated Branches:
  refs/heads/master b29702913 -> 7d045c5f0


[SPARK-22944][SQL] improve FoldablePropagation

## What changes were proposed in this pull request?

`FoldablePropagation` is a little tricky because it needs to handle attributes that are mis-derived from children, e.g. outer join outputs. The rule currently performs a kind of stoppable tree transform: it stops applying itself once it hits a node that may have mis-derived attributes.

Logically, we should be able to apply this rule above the unsupported nodes by simply treating them as leaf nodes. This PR improves the rule so that it no longer stops the tree transformation at such nodes; instead, it reduces the set of foldable expressions that we want to propagate.

## How was this patch tested?

Existing tests, plus a new test case ("Propagate above outer join") in `FoldablePropagationSuite`.

Author: Wenchen Fan <we...@databricks.com>

Closes #20139 from cloud-fan/foldable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d045c5f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d045c5f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d045c5f

Branch: refs/heads/master
Commit: 7d045c5f00e2c7c67011830e2169a4e130c3ace8
Parents: b297029
Author: Wenchen Fan <we...@databricks.com>
Authored: Thu Jan 4 13:14:52 2018 +0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Thu Jan 4 13:14:52 2018 +0800

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/expressions.scala    | 65 +++++++++++---------
 .../optimizer/FoldablePropagationSuite.scala    | 23 ++++++-
 2 files changed, 58 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7d045c5f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index 7d830bb..1c0b7bd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -506,18 +506,21 @@ object NullPropagation extends Rule[LogicalPlan] {
 
 
 /**
- * Propagate foldable expressions:
  * Replace attributes with aliases of the original foldable expressions if possible.
- * Other optimizations will take advantage of the propagated foldable expressions.
- *
+ * Other optimizations will take advantage of the propagated foldable expressions. For example,
+ * this rule can optimize
  * {{{
  *   SELECT 1.0 x, 'abc' y, Now() z ORDER BY x, y, 3
- *   ==>  SELECT 1.0 x, 'abc' y, Now() z ORDER BY 1.0, 'abc', Now()
  * }}}
+ * to
+ * {{{
+ *   SELECT 1.0 x, 'abc' y, Now() z ORDER BY 1.0, 'abc', Now()
+ * }}}
+ * and other rules can further optimize it and remove the ORDER BY operator.
  */
 object FoldablePropagation extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = {
-    val foldableMap = AttributeMap(plan.flatMap {
+    var foldableMap = AttributeMap(plan.flatMap {
       case Project(projectList, _) => projectList.collect {
         case a: Alias if a.child.foldable => (a.toAttribute, a)
       }
@@ -530,38 +533,44 @@ object FoldablePropagation extends Rule[LogicalPlan] {
     if (foldableMap.isEmpty) {
       plan
     } else {
-      var stop = false
       CleanupAliases(plan.transformUp {
-        // A leaf node should not stop the folding process (note that we are traversing up the
-        // tree, starting at the leaf nodes); so we are allowing it.
-        case l: LeafNode =>
-          l
-
         // We can only propagate foldables for a subset of unary nodes.
-        case u: UnaryNode if !stop && canPropagateFoldables(u) =>
+        case u: UnaryNode if foldableMap.nonEmpty && canPropagateFoldables(u) =>
           u.transformExpressions(replaceFoldable)
 
-        // Allow inner joins. We do not allow outer join, although its output attributes are
-        // derived from its children, they are actually different attributes: the output of outer
-        // join is not always picked from its children, but can also be null.
+        // Join derives the output attributes from its child while they are actually not the
+        // same attributes. For example, the output of outer join is not always picked from its
+        // children, but can also be null. We should exclude these miss-derived attributes when
+        // propagating the foldable expressions.
         // TODO(cloud-fan): It seems more reasonable to use new attributes as the output attributes
         // of outer join.
-        case j @ Join(_, _, Inner, _) if !stop =>
-          j.transformExpressions(replaceFoldable)
-
-        // We can fold the projections an expand holds. However expand changes the output columns
-        // and often reuses the underlying attributes; so we cannot assume that a column is still
-        // foldable after the expand has been applied.
-        // TODO(hvanhovell): Expand should use new attributes as the output attributes.
-        case expand: Expand if !stop =>
-          val newExpand = expand.copy(projections = expand.projections.map { projection =>
+        case j @ Join(left, right, joinType, _) if foldableMap.nonEmpty =>
+          val newJoin = j.transformExpressions(replaceFoldable)
+          val missDerivedAttrsSet: AttributeSet = AttributeSet(joinType match {
+            case _: InnerLike | LeftExistence(_) => Nil
+            case LeftOuter => right.output
+            case RightOuter => left.output
+            case FullOuter => left.output ++ right.output
+          })
+          foldableMap = AttributeMap(foldableMap.baseMap.values.filterNot {
+            case (attr, _) => missDerivedAttrsSet.contains(attr)
+          }.toSeq)
+          newJoin
+
+        // We can not replace the attributes in `Expand.output`. If there are other non-leaf
+        // operators that have the `output` field, we should put them here too.
+        case expand: Expand if foldableMap.nonEmpty =>
+          expand.copy(projections = expand.projections.map { projection =>
             projection.map(_.transform(replaceFoldable))
           })
-          stop = true
-          newExpand
 
-        case other =>
-          stop = true
+        // For other plans, they are not safe to apply foldable propagation, and they should not
+        // propagate foldable expressions from children.
+        case other if foldableMap.nonEmpty =>
+          val childrenOutputSet = AttributeSet(other.children.flatMap(_.output))
+          foldableMap = AttributeMap(foldableMap.baseMap.values.filterNot {
+            case (attr, _) => childrenOutputSet.contains(attr)
+          }.toSeq)
           other
       })
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d045c5f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
index dccb32f..c288446 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
@@ -147,8 +147,8 @@ class FoldablePropagationSuite extends PlanTest {
   test("Propagate in expand") {
     val c1 = Literal(1).as('a)
     val c2 = Literal(2).as('b)
-    val a1 = c1.toAttribute.withNullability(true)
-    val a2 = c2.toAttribute.withNullability(true)
+    val a1 = c1.toAttribute.newInstance().withNullability(true)
+    val a2 = c2.toAttribute.newInstance().withNullability(true)
     val expand = Expand(
       Seq(Seq(Literal(null), 'b), Seq('a, Literal(null))),
       Seq(a1, a2),
@@ -161,4 +161,23 @@ class FoldablePropagationSuite extends PlanTest {
     val correctAnswer = correctExpand.where(a1.isNotNull).select(a1, a2).analyze
     comparePlans(optimized, correctAnswer)
   }
+
+  test("Propagate above outer join") {
+    val left = LocalRelation('a.int).select('a, Literal(1).as('b))
+    val right = LocalRelation('c.int).select('c, Literal(1).as('d))
+
+    val join = left.join(
+      right,
+      joinType = LeftOuter,
+      condition = Some('a === 'c && 'b === 'd))
+    val query = join.select(('b + 3).as('res)).analyze
+    val optimized = Optimize.execute(query)
+
+    val correctAnswer = left.join(
+      right,
+      joinType = LeftOuter,
+      condition = Some('a === 'c && Literal(1) === Literal(1)))
+      .select((Literal(1) + 3).as('res)).analyze
+    comparePlans(optimized, correctAnswer)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org