You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2016/09/29 19:04:15 UTC
spark git commit: [SPARK-16343][SQL] Improve the PushDownPredicate
rule to pushdown predicates correctly in non-deterministic condition.
Repository: spark
Updated Branches:
refs/heads/branch-2.0 ca8130050 -> 7ffafa3bf
[SPARK-16343][SQL] Improve the PushDownPredicate rule to pushdown predicates correctly in non-deterministic condition.
## What changes were proposed in this pull request?
Currently our Optimizer may reorder predicates to run them more efficiently, but when a condition contains non-deterministic parts, changing the order between the deterministic parts and the non-deterministic parts may change the number of input rows. For example:
```SELECT a FROM t WHERE rand() < 0.1 AND a = 1```
And
```SELECT a FROM t WHERE a = 1 AND rand() < 0.1```
may call rand() a different number of times, and therefore the output rows may differ.
This PR improves the handling of this case by checking whether a predicate is placed before any non-deterministic predicates; only such predicates are considered for pushdown.
## How was this patch tested?
Expanded related testcases in FilterPushdownSuite.
Author: 蒋星博 <ji...@meituan.com>
Closes #14012 from jiangxb1987/ppd.
(cherry picked from commit f376c37268848dbb4b2fb57677e22ef2bf207b49)
Signed-off-by: Josh Rosen <jo...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ffafa3b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ffafa3b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ffafa3b
Branch: refs/heads/branch-2.0
Commit: 7ffafa3bfecb8bc92b79eddea1ca18166efd3385
Parents: ca81300
Author: 蒋星博 <ji...@meituan.com>
Authored: Thu Jul 14 00:21:27 2016 +0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Sep 29 11:44:00 2016 -0700
----------------------------------------------------------------------
.../sql/catalyst/optimizer/Optimizer.scala | 44 +++++++++++++-------
.../optimizer/FilterPushdownSuite.scala | 8 ++--
2 files changed, 33 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7ffafa3b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d824c2e..35b122d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1031,19 +1031,23 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
// Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
- // pushed beneath must satisfy the following two conditions:
+ // pushed beneath must satisfy the following conditions:
// 1. All the expressions are part of window partitioning key. The expressions can be compound.
- // 2. Deterministic
+ // 2. Deterministic.
+ // 3. Placed before any non-deterministic predicates.
case filter @ Filter(condition, w: Window)
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))
- val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
- cond.references.subsetOf(partitionAttrs) && cond.deterministic &&
- // This is for ensuring all the partitioning expressions have been converted to alias
- // in Analyzer. Thus, we do not need to check if the expressions in conditions are
- // the same as the expressions used in partitioning columns.
- partitionAttrs.forall(_.isInstanceOf[Attribute])
+
+ val (candidates, containingNonDeterministic) =
+ splitConjunctivePredicates(condition).span(_.deterministic)
+
+ val (pushDown, rest) = candidates.partition { cond =>
+ cond.references.subsetOf(partitionAttrs)
}
+
+ val stayUp = rest ++ containingNonDeterministic
+
if (pushDown.nonEmpty) {
val pushDownPredicate = pushDown.reduce(And)
val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
@@ -1062,11 +1066,16 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
// For each filter, expand the alias and check if the filter can be evaluated using
// attributes produced by the aggregate operator's child operator.
- val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
+ val (candidates, containingNonDeterministic) =
+ splitConjunctivePredicates(condition).span(_.deterministic)
+
+ val (pushDown, rest) = candidates.partition { cond =>
val replaced = replaceAlias(cond, aliasMap)
- replaced.references.subsetOf(aggregate.child.outputSet) && replaced.deterministic
+ replaced.references.subsetOf(aggregate.child.outputSet)
}
+ val stayUp = rest ++ containingNonDeterministic
+
if (pushDown.nonEmpty) {
val pushDownPredicate = pushDown.reduce(And)
val replaced = replaceAlias(pushDownPredicate, aliasMap)
@@ -1080,9 +1089,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
case filter @ Filter(condition, union: Union) =>
// Union could change the rows, so non-deterministic predicate can't be pushed down
- val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
- cond.deterministic
- }
+ val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(_.deterministic)
+
if (pushDown.nonEmpty) {
val pushDownCond = pushDown.reduceLeft(And)
val output = union.output
@@ -1133,9 +1141,15 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
// come from grandchild.
// TODO: non-deterministic predicates could be pushed through some operators that do not change
// the rows.
- val (pushDown, stayUp) = splitConjunctivePredicates(filter.condition).partition { cond =>
- cond.deterministic && cond.references.subsetOf(grandchild.outputSet)
+ val (candidates, containingNonDeterministic) =
+ splitConjunctivePredicates(filter.condition).span(_.deterministic)
+
+ val (pushDown, rest) = candidates.partition { cond =>
+ cond.references.subsetOf(grandchild.outputSet)
}
+
+ val stayUp = rest ++ containingNonDeterministic
+
if (pushDown.nonEmpty) {
val newChild = insertFilter(pushDown.reduceLeft(And))
if (stayUp.nonEmpty) {
http://git-wip-us.apache.org/repos/asf/spark/blob/7ffafa3b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index ea868d1..55836f9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -536,14 +536,14 @@ class FilterPushdownSuite extends PlanTest {
val originalQuery = {
testRelationWithArrayType
.generate(Explode('c_arr), true, false, Some("arr"))
- .where(('b >= 5) && ('a + Rand(10).as("rnd") > 6))
+ .where(('b >= 5) && ('a + Rand(10).as("rnd") > 6) && ('c > 6))
}
val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = {
testRelationWithArrayType
.where('b >= 5)
.generate(Explode('c_arr), true, false, Some("arr"))
- .where('a + Rand(10).as("rnd") > 6)
+ .where('a + Rand(10).as("rnd") > 6 && 'c > 6)
.analyze
}
@@ -704,14 +704,14 @@ class FilterPushdownSuite extends PlanTest {
val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)
val originalQuery = Union(Seq(testRelation, testRelation2))
- .where('a === 2L && 'b + Rand(10).as("rnd") === 3)
+ .where('a === 2L && 'b + Rand(10).as("rnd") === 3 && 'c > 5L)
val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = Union(Seq(
testRelation.where('a === 2L),
testRelation2.where('d === 2L)))
- .where('b + Rand(10).as("rnd") === 3)
+ .where('b + Rand(10).as("rnd") === 3 && 'c > 5L)
.analyze
comparePlans(optimized, correctAnswer)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org