Posted to commits@spark.apache.org by da...@apache.org on 2016/04/13 22:01:19 UTC

spark git commit: [SPARK-14581] [SQL] push predicates through more logical plans

Repository: spark
Updated Branches:
  refs/heads/master f9d578eaa -> dbbe14907


[SPARK-14581] [SQL] push predicates through more logical plans

## What changes were proposed in this pull request?

Right now, filter pushdown only works with Project, Aggregate, Generate and Join; predicates can't be pushed through many other plans.

This PR adds support for pushing predicates through Union, Intersect, Except and all unary plans.
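
For illustration, a hypothetical end-to-end query (plain DataFrame API; the relation names and values are made up, not part of this patch) showing the new behavior: the deterministic conjunct is pushed into each branch of the union, while the non-deterministic one stays above.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.rand

object UnionPushdownExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("pushdown"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val left  = sc.parallelize(Seq((1, "x"), (2, "y"))).toDF("a", "b")
    val right = sc.parallelize(Seq((2, "p"), (3, "q"))).toDF("a", "b")

    // `a === 2` is deterministic, so PushDownPredicate rewrites the plan to
    // Union(Filter(a = 2, left), Filter(a = 2, right)); the rand() conjunct
    // is non-deterministic and must stay in a Filter above the Union.
    val q = left.unionAll(right).where($"a" === 2 && rand(10) > 0.5)
    q.explain(true)
  }
}
```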

## How was this patch tested?

Added tests.

Author: Davies Liu <da...@databricks.com>

Closes #12342 from davies/filter_hint.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dbbe1490
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dbbe1490
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dbbe1490

Branch: refs/heads/master
Commit: dbbe149070052af5cda04f7b110d65de73766ded
Parents: f9d578e
Author: Davies Liu <da...@databricks.com>
Authored: Wed Apr 13 13:01:13 2016 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Wed Apr 13 13:01:13 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 111 ++++++++++++-------
 .../spark/sql/catalyst/planning/patterns.scala  |   3 +
 .../catalyst/optimizer/ColumnPruningSuite.scala |   2 +-
 .../optimizer/FilterPushdownSuite.scala         |  76 +++++++++++--
 .../optimizer/JoinOptimizationSuite.scala       |   4 +-
 .../catalyst/optimizer/PruneFiltersSuite.scala  |   2 +-
 6 files changed, 146 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dbbe1490/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index bad115d..438cbab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -66,9 +66,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
       ReorderJoin,
       OuterJoinElimination,
       PushPredicateThroughJoin,
-      PushPredicateThroughProject,
-      PushPredicateThroughGenerate,
-      PushPredicateThroughAggregate,
+      PushDownPredicate,
       LimitPushDown,
       ColumnPruning,
       InferFiltersFromConstraints,
@@ -917,12 +915,13 @@ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper {
 }
 
 /**
- * Pushes [[Filter]] operators through [[Project]] operators, in-lining any [[Alias Aliases]]
- * that were defined in the projection.
+ * Pushes [[Filter]] operators through many operators iff:
+ * 1) the operator is deterministic
+ * 2) the predicate is deterministic and the operator will not change any rows.
  *
  * This heuristic is valid assuming the expression evaluation cost is minimal.
  */
-object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelper {
+object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // SPARK-13473: We can't push the predicate down when the underlying projection output non-
     // deterministic field(s).  Non-deterministic expressions are essentially stateful. This
@@ -939,41 +938,7 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelpe
       })
 
       project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
-  }
-
-}
-
-/**
- * Push [[Filter]] operators through [[Generate]] operators. Parts of the predicate that reference
- * attributes generated in [[Generate]] will remain above, and the rest should be pushed beneath.
- */
-object PushPredicateThroughGenerate extends Rule[LogicalPlan] with PredicateHelper {
-
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case filter @ Filter(condition, g: Generate) =>
-      // Predicates that reference attributes produced by the `Generate` operator cannot
-      // be pushed below the operator.
-      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
-        cond.references.subsetOf(g.child.outputSet) && cond.deterministic
-      }
-      if (pushDown.nonEmpty) {
-        val pushDownPredicate = pushDown.reduce(And)
-        val newGenerate = Generate(g.generator, join = g.join, outer = g.outer,
-          g.qualifier, g.generatorOutput, Filter(pushDownPredicate, g.child))
-        if (stayUp.isEmpty) newGenerate else Filter(stayUp.reduce(And), newGenerate)
-      } else {
-        filter
-      }
-  }
-}
 
-/**
- * Push [[Filter]] operators through [[Aggregate]] operators, iff the filters reference only
- * non-aggregate attributes (typically literals or grouping expressions).
- */
-object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHelper {
-
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case filter @ Filter(condition, aggregate: Aggregate) =>
       // Find all the aliased expressions in the aggregate list that don't include any actual
       // AggregateExpression, and create a map from the alias to the expression
@@ -999,6 +964,72 @@ object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHel
       } else {
         filter
       }
+
+    case filter @ Filter(condition, child)
+      if child.isInstanceOf[Union] || child.isInstanceOf[Intersect] =>
+      // Union/Intersect could change the rows, so non-deterministic predicates can't be pushed down
+      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
+        cond.deterministic
+      }
+      if (pushDown.nonEmpty) {
+        val pushDownCond = pushDown.reduceLeft(And)
+        val output = child.output
+        val newGrandChildren = child.children.map { grandchild =>
+          val newCond = pushDownCond transform {
+            case e if output.exists(_.semanticEquals(e)) =>
+              grandchild.output(output.indexWhere(_.semanticEquals(e)))
+          }
+          assert(newCond.references.subsetOf(grandchild.outputSet))
+          Filter(newCond, grandchild)
+        }
+        val newChild = child.withNewChildren(newGrandChildren)
+        if (stayUp.nonEmpty) {
+          Filter(stayUp.reduceLeft(And), newChild)
+        } else {
+          newChild
+        }
+      } else {
+        filter
+      }
+
+    case filter @ Filter(condition, e @ Except(left, _)) =>
+      pushDownPredicate(filter, e.left) { predicate =>
+        e.copy(left = Filter(predicate, left))
+      }
+
+    // two adjacent filters should be combined by other rules (e.g. CombineFilters)
+    case filter @ Filter(_, f: Filter) => filter
+    // do not push predicates through Sample; it would generate different results.
+    case filter @ Filter(_, s: Sample) => filter
+    // TODO: push predicates through expand
+    case filter @ Filter(_, e: Expand) => filter
+
+    case filter @ Filter(condition, u: UnaryNode) if u.expressions.forall(_.deterministic) =>
+      pushDownPredicate(filter, u.child) { predicate =>
+        u.withNewChildren(Seq(Filter(predicate, u.child)))
+      }
+  }
+
+  private def pushDownPredicate(
+      filter: Filter,
+      grandchild: LogicalPlan)(insertFilter: Expression => LogicalPlan): LogicalPlan = {
+    // Only push down predicates that are deterministic and whose referenced attributes
+    // all come from the grandchild.
+    // TODO: non-deterministic predicates could be pushed through some operators that do not change
+    // the rows.
+    val (pushDown, stayUp) = splitConjunctivePredicates(filter.condition).partition { cond =>
+      cond.deterministic && cond.references.subsetOf(grandchild.outputSet)
+    }
+    if (pushDown.nonEmpty) {
+      val newChild = insertFilter(pushDown.reduceLeft(And))
+      if (stayUp.nonEmpty) {
+        Filter(stayUp.reduceLeft(And), newChild)
+      } else {
+        newChild
+      }
+    } else {
+      filter
+    }
   }
 }
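
The subtlest step in the Union/Intersect case above is rewriting the predicate's attribute references: the set operator's output attributes differ from each child's and only line up positionally. A minimal sketch of that remapping, pasteable into a Scala REPL (Attr and the id comparison are simplified stand-ins for Catalyst's AttributeReference and semanticEquals):

```scala
// Simplified model: an attribute is just a name plus a unique id.
case class Attr(name: String, id: Int)

// The parent's output and each child's output line up positionally,
// so a reference to parentOutput(i) becomes childOutput(i).
def remap(parentOutput: Seq[Attr], childOutput: Seq[Attr], ref: Attr): Attr = {
  val i = parentOutput.indexWhere(_.id == ref.id) // semanticEquals stand-in
  require(i >= 0, s"$ref is not produced by the parent operator")
  childOutput(i)
}

// Example: a Union outputs (a#1, b#2); its second child outputs (d#3, e#4).
// A predicate on a#1 is rewritten to reference d#3 before being pushed down,
// matching the `union` test case where 'a === 2 becomes 'd === 2.
val unionOut = Seq(Attr("a", 1), Attr("b", 2))
val childOut = Seq(Attr("d", 3), Attr("e", 4))
assert(remap(unionOut, childOut, Attr("a", 1)) == Attr("d", 3))
```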
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dbbe1490/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 6f35d87..0065619 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -69,6 +69,9 @@ object PhysicalOperation extends PredicateHelper {
         val substitutedCondition = substitute(aliases)(condition)
         (fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases)
 
+      case BroadcastHint(child) =>
+        collectProjectsAndFilters(child)
+
       case other =>
         (None, Nil, other, Map.empty)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/dbbe1490/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index 2248e03..52b574c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -34,7 +34,7 @@ class ColumnPruningSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches = Batch("Column pruning", FixedPoint(100),
-      PushPredicateThroughProject,
+      PushDownPredicate,
       ColumnPruning,
       CollapseProject) :: Nil
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/dbbe1490/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index b84ae7c..df7529d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -33,14 +33,12 @@ class FilterPushdownSuite extends PlanTest {
     val batches =
       Batch("Subqueries", Once,
         EliminateSubqueryAliases) ::
-      Batch("Filter Pushdown", Once,
+      Batch("Filter Pushdown", FixedPoint(10),
         SamplePushDown,
         CombineFilters,
-        PushPredicateThroughProject,
+        PushDownPredicate,
         BooleanSimplification,
         PushPredicateThroughJoin,
-        PushPredicateThroughGenerate,
-        PushPredicateThroughAggregate,
         CollapseProject) :: Nil
   }
 
@@ -620,8 +618,8 @@ class FilterPushdownSuite extends PlanTest {
     val optimized = Optimize.execute(originalQuery.analyze)
 
     val correctAnswer = testRelation
-                        .select('a, 'b)
                         .where('a === 3)
+                        .select('a, 'b)
                         .groupBy('a)('a, count('b) as 'c)
                         .where('c === 2L)
                         .analyze
@@ -638,8 +636,8 @@ class FilterPushdownSuite extends PlanTest {
     val optimized = Optimize.execute(originalQuery.analyze)
 
     val correctAnswer = testRelation
-      .select('a, 'b)
       .where('a + 1 < 3)
+      .select('a, 'b)
       .groupBy('a)(('a + 1) as 'aa, count('b) as 'c)
       .where('c === 2L || 'aa > 4)
       .analyze
@@ -656,8 +654,8 @@ class FilterPushdownSuite extends PlanTest {
     val optimized = Optimize.execute(originalQuery.analyze)
 
     val correctAnswer = testRelation
-      .select('a, 'b)
       .where("s" === "s")
+      .select('a, 'b)
       .groupBy('a)('a, count('b) as 'c, "s" as 'd)
       .where('c === 2L)
       .analyze
@@ -681,4 +679,68 @@ class FilterPushdownSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
+
+  test("broadcast hint") {
+    val originalQuery = BroadcastHint(testRelation)
+      .where('a === 2L && 'b + Rand(10).as("rnd") === 3)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val correctAnswer = BroadcastHint(testRelation.where('a === 2L))
+      .where('b + Rand(10).as("rnd") === 3)
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("union") {
+    val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)
+
+    val originalQuery = Union(Seq(testRelation, testRelation2))
+      .where('a === 2L && 'b + Rand(10).as("rnd") === 3)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val correctAnswer = Union(Seq(
+      testRelation.where('a === 2L),
+      testRelation2.where('d === 2L)))
+      .where('b + Rand(10).as("rnd") === 3)
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("intersect") {
+    val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)
+
+    val originalQuery = Intersect(testRelation, testRelation2)
+      .where('a === 2L && 'b + Rand(10).as("rnd") === 3)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val correctAnswer = Intersect(
+      testRelation.where('a === 2L),
+      testRelation2.where('d === 2L))
+      .where('b + Rand(10).as("rnd") === 3)
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("except") {
+    val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)
+
+    val originalQuery = Except(testRelation, testRelation2)
+      .where('a === 2L && 'b + Rand(10).as("rnd") === 3)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val correctAnswer = Except(
+      testRelation.where('a === 2L),
+      testRelation2)
+      .where('b + Rand(10).as("rnd") === 3)
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
 }
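
Each of the new tests above follows the same pattern: the conjunction 'a === 2L && 'b + Rand(10) === 3 is split into conjuncts, and only the deterministic half moves below the set operator. A REPL-pasteable sketch of that split-and-partition step (a toy expression model, not Catalyst's):

```scala
sealed trait Expr { def deterministic: Boolean }
case class Pred(sql: String, deterministic: Boolean = true) extends Expr
case class And(l: Expr, r: Expr) extends Expr {
  def deterministic: Boolean = l.deterministic && r.deterministic
}

// splitConjunctivePredicates analogue: flatten a tree of ANDs into conjuncts.
def split(e: Expr): Seq[Expr] = e match {
  case And(l, r) => split(l) ++ split(r)
  case other     => Seq(other)
}

val cond = And(Pred("a = 2"), Pred("b + rand(10) = 3", deterministic = false))
val (pushDown, stayUp) = split(cond).partition(_.deterministic)
// pushDown: List(Pred(a = 2,true))            -- goes below Union/Intersect/Except
// stayUp:   List(Pred(b + rand(10) = 3,false)) -- remains in a Filter on top
```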

http://git-wip-us.apache.org/repos/asf/spark/blob/dbbe1490/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index e2f8146..c1ebf8b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -36,12 +36,10 @@ class JoinOptimizationSuite extends PlanTest {
         EliminateSubqueryAliases) ::
       Batch("Filter Pushdown", FixedPoint(100),
         CombineFilters,
-        PushPredicateThroughProject,
+        PushDownPredicate,
         BooleanSimplification,
         ReorderJoin,
         PushPredicateThroughJoin,
-        PushPredicateThroughGenerate,
-        PushPredicateThroughAggregate,
         ColumnPruning,
         CollapseProject) :: Nil
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dbbe1490/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
index 14fb72a..d8cfec5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
@@ -34,7 +34,7 @@ class PruneFiltersSuite extends PlanTest {
       Batch("Filter Pushdown and Pruning", Once,
         CombineFilters,
         PruneFilters,
-        PushPredicateThroughProject,
+        PushDownPredicate,
         PushPredicateThroughJoin) :: Nil
   }
 

