You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/10/21 19:45:25 UTC
spark git commit: [SPARK-11179] [SQL] Push filters through aggregate
Repository: spark
Updated Branches:
refs/heads/master 8e82e5983 -> f62e32608
[SPARK-11179] [SQL] Push filters through aggregate
Push conjunctive predicates through Aggregate operators when their references are a subset of the groupingExpressions.
Query plan before optimisation :-
Filter ((c#138L = 2) && (a#0 = 3))
Aggregate [a#0], [a#0,count(b#1) AS c#138L]
Project [a#0,b#1]
LocalRelation [a#0,b#1,c#2]
Query plan after optimisation :-
Filter (c#138L = 2)
Aggregate [a#0], [a#0,count(b#1) AS c#138L]
Filter (a#0 = 3)
Project [a#0,b#1]
LocalRelation [a#0,b#1,c#2]
Author: nitin goyal <ni...@guavus.com>
Author: nitin.goyal <ni...@guavus.com>
Closes #9167 from nitin2goyal/master.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f62e3260
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f62e3260
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f62e3260
Branch: refs/heads/master
Commit: f62e3260889d67256d335fd0dd38f114ae4e3eca
Parents: 8e82e59
Author: nitin goyal <ni...@guavus.com>
Authored: Wed Oct 21 10:45:21 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed Oct 21 10:45:21 2015 -0700
----------------------------------------------------------------------
.../sql/catalyst/optimizer/Optimizer.scala | 24 +++++++++++
.../optimizer/FilterPushdownSuite.scala | 45 ++++++++++++++++++++
2 files changed, 69 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f62e3260/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6557c70..0139b9e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -46,6 +46,7 @@ object DefaultOptimizer extends Optimizer {
PushPredicateThroughJoin,
PushPredicateThroughProject,
PushPredicateThroughGenerate,
+ PushPredicateThroughAggregate,
ColumnPruning,
// Operator combine
ProjectCollapsing,
@@ -675,6 +676,29 @@ object PushPredicateThroughGenerate extends Rule[LogicalPlan] with PredicateHelp
}
/**
+ * Push [[Filter]] operators through [[Aggregate]] operators. Conjuncts of the predicate whose
+ * references are a subset of the [[Aggregate]]'s group-by attribute set will be pushed beneath,
+ * and the rest will remain above.
+ */
+object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHelper {
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case filter @ Filter(condition,
+ aggregate @ Aggregate(groupingExpressions, aggregateExpressions, grandChild)) =>
+ val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition {
+ conjunct => conjunct.references subsetOf AttributeSet(groupingExpressions)
+ }
+ if (pushDown.nonEmpty) {
+ val pushDownPredicate = pushDown.reduce(And)
+ val withPushdown = aggregate.copy(child = Filter(pushDownPredicate, grandChild))
+ stayUp.reduceOption(And).map(Filter(_, withPushdown)).getOrElse(withPushdown)
+ } else {
+ filter
+ }
+ }
+}
+
+/**
* Pushes down [[Filter]] operators where the `condition` can be
* evaluated using only the attributes of the left or right side of a join. Other
* [[Filter]] conditions are moved into the `condition` of the [[Join]].
http://git-wip-us.apache.org/repos/asf/spark/blob/f62e3260/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 0f1fde2..ed810a1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -40,6 +40,7 @@ class FilterPushdownSuite extends PlanTest {
BooleanSimplification,
PushPredicateThroughJoin,
PushPredicateThroughGenerate,
+ PushPredicateThroughAggregate,
ColumnPruning,
ProjectCollapsing) :: Nil
}
@@ -652,4 +653,48 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctAnswer.analyze)
}
+
+ test("aggregate: push down filter when filter on group by expression") {
+ val originalQuery = testRelation
+ .groupBy('a)('a, Count('b) as 'c)
+ .select('a, 'c)
+ .where('a === 2)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+
+ val correctAnswer = testRelation
+ .where('a === 2)
+ .groupBy('a)('a, Count('b) as 'c)
+ .analyze
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("aggregate: don't push down filter when filter not on group by expression") {
+ val originalQuery = testRelation
+ .select('a, 'b)
+ .groupBy('a)('a, Count('b) as 'c)
+ .where('c === 2L)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+
+ comparePlans(optimized, originalQuery.analyze)
+ }
+
+ test("aggregate: push down filters partially which are subset of group by expressions") {
+ val originalQuery = testRelation
+ .select('a, 'b)
+ .groupBy('a)('a, Count('b) as 'c)
+ .where('c === 2L && 'a === 3)
+
+ val optimized = Optimize.execute(originalQuery.analyze)
+
+ val correctAnswer = testRelation
+ .select('a, 'b)
+ .where('a === 3)
+ .groupBy('a)('a, Count('b) as 'c)
+ .where('c === 2L)
+ .analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org