You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/10/21 19:45:25 UTC

spark git commit: [SPARK-11179] [SQL] Push filters through aggregate

Repository: spark
Updated Branches:
  refs/heads/master 8e82e5983 -> f62e32608


[SPARK-11179] [SQL] Push filters through aggregate

Push conjunctive predicates though Aggregate operators when their references are a subset of the groupingExpressions.

Query plan before optimisation :-
Filter ((c#138L = 2) && (a#0 = 3))
 Aggregate [a#0], [a#0,count(b#1) AS c#138L]
  Project [a#0,b#1]
   LocalRelation [a#0,b#1,c#2]

Query plan after optimisation :-
Filter (c#138L = 2)
 Aggregate [a#0], [a#0,count(b#1) AS c#138L]
  Filter (a#0 = 3)
   Project [a#0,b#1]
    LocalRelation [a#0,b#1,c#2]

Author: nitin goyal <ni...@guavus.com>
Author: nitin.goyal <ni...@guavus.com>

Closes #9167 from nitin2goyal/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f62e3260
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f62e3260
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f62e3260

Branch: refs/heads/master
Commit: f62e3260889d67256d335fd0dd38f114ae4e3eca
Parents: 8e82e59
Author: nitin goyal <ni...@guavus.com>
Authored: Wed Oct 21 10:45:21 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed Oct 21 10:45:21 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 24 +++++++++++
 .../optimizer/FilterPushdownSuite.scala         | 45 ++++++++++++++++++++
 2 files changed, 69 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f62e3260/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6557c70..0139b9e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -46,6 +46,7 @@ object DefaultOptimizer extends Optimizer {
       PushPredicateThroughJoin,
       PushPredicateThroughProject,
       PushPredicateThroughGenerate,
+      PushPredicateThroughAggregate,
       ColumnPruning,
       // Operator combine
       ProjectCollapsing,
@@ -675,6 +676,29 @@ object PushPredicateThroughGenerate extends Rule[LogicalPlan] with PredicateHelp
 }
 
 /**
+ * Push [[Filter]] operators through [[Aggregate]] operators. Parts of the predicate that reference
+ * attributes which are subset of group by attribute set of [[Aggregate]] will be pushed beneath,
+ * and the rest should remain above.
+ */
+object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHelper {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case filter @ Filter(condition,
+        aggregate @ Aggregate(groupingExpressions, aggregateExpressions, grandChild)) =>
+      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition {
+        conjunct => conjunct.references subsetOf AttributeSet(groupingExpressions)
+      }
+      if (pushDown.nonEmpty) {
+        val pushDownPredicate = pushDown.reduce(And)
+        val withPushdown = aggregate.copy(child = Filter(pushDownPredicate, grandChild))
+        stayUp.reduceOption(And).map(Filter(_, withPushdown)).getOrElse(withPushdown)
+      } else {
+        filter
+      }
+  }
+}
+
+/**
  * Pushes down [[Filter]] operators where the `condition` can be
  * evaluated using only the attributes of the left or right side of a join.  Other
  * [[Filter]] conditions are moved into the `condition` of the [[Join]].

http://git-wip-us.apache.org/repos/asf/spark/blob/f62e3260/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 0f1fde2..ed810a1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -40,6 +40,7 @@ class FilterPushdownSuite extends PlanTest {
         BooleanSimplification,
         PushPredicateThroughJoin,
         PushPredicateThroughGenerate,
+        PushPredicateThroughAggregate,
         ColumnPruning,
         ProjectCollapsing) :: Nil
   }
@@ -652,4 +653,48 @@ class FilterPushdownSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer.analyze)
   }
+
+  test("aggregate: push down filter when filter on group by expression") {
+    val originalQuery = testRelation
+                        .groupBy('a)('a, Count('b) as 'c)
+                        .select('a, 'c)
+                        .where('a === 2)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val correctAnswer = testRelation
+                        .where('a === 2)
+                        .groupBy('a)('a, Count('b) as 'c)
+                        .analyze
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("aggregate: don't push down filter when filter not on group by expression") {
+    val originalQuery = testRelation
+                        .select('a, 'b)
+                        .groupBy('a)('a, Count('b) as 'c)
+                        .where('c === 2L)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    comparePlans(optimized, originalQuery.analyze)
+  }
+
+  test("aggregate: push down filters partially which are subset of group by expressions") {
+    val originalQuery = testRelation
+                        .select('a, 'b)
+                        .groupBy('a)('a, Count('b) as 'c)
+                        .where('c === 2L && 'a === 3)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val correctAnswer = testRelation
+                        .select('a, 'b)
+                        .where('a === 3)
+                        .groupBy('a)('a, Count('b) as 'c)
+                        .where('c === 2L)
+                        .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org