Posted to commits@spark.apache.org by jo...@apache.org on 2016/09/29 19:04:15 UTC

spark git commit: [SPARK-16343][SQL] Improve the PushDownPredicate rule to push down predicates correctly in non-deterministic conditions.

Repository: spark
Updated Branches:
  refs/heads/branch-2.0 ca8130050 -> 7ffafa3bf


[SPARK-16343][SQL] Improve the PushDownPredicate rule to push down predicates correctly in non-deterministic conditions.

## What changes were proposed in this pull request?

Currently our Optimizer may reorder the predicates to run them more efficiently, but when a condition mixes deterministic and non-deterministic predicates, changing the order between the deterministic parts and the non-deterministic parts may change the number of rows the non-deterministic expressions are evaluated on. For example:
```SELECT a FROM t WHERE rand() < 0.1 AND a = 1```
and
```SELECT a FROM t WHERE a = 1 AND rand() < 0.1```
may call rand() a different number of times, and therefore their output rows may differ.

This PR improves the rule by only pushing a deterministic predicate down when it is placed before any non-deterministic predicate in the original condition.
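
As an illustration, here is a minimal, self-contained sketch of the new splitting logic, using a hypothetical `Pred` case class rather than Catalyst's real `Expression` type; the actual rule applies the same `span`/`partition` pattern to the output of `splitConjunctivePredicates`:

```scala
// Hypothetical stand-in for a Catalyst predicate: just enough state to
// model the two checks the rule performs.
case class Pred(sql: String, deterministic: Boolean, referencesChildOnly: Boolean)

def split(conjuncts: Seq[Pred]): (Seq[Pred], Seq[Pred]) = {
  // Only the deterministic *prefix* of the conjunction is eligible for
  // pushdown; everything from the first non-deterministic predicate on
  // must keep its original evaluation position.
  val (candidates, containingNonDeterministic) = conjuncts.span(_.deterministic)
  // Among the candidates, push only those evaluable against the child's output.
  val (pushDown, rest) = candidates.partition(_.referencesChildOnly)
  (pushDown, rest ++ containingNonDeterministic)
}

// `a = 1 AND rand() < 0.1`: `a = 1` precedes the non-deterministic
// predicate, so it can be pushed down.
split(Seq(Pred("a = 1", true, true), Pred("rand() < 0.1", false, false)))

// `rand() < 0.1 AND a = 1`: the deterministic prefix is empty, so nothing
// is pushed and rand() still sees every input row.
split(Seq(Pred("rand() < 0.1", false, false), Pred("a = 1", true, true)))
```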

## How was this patch tested?

Expanded the related test cases in FilterPushdownSuite.

Author: 蒋星博 <ji...@meituan.com>

Closes #14012 from jiangxb1987/ppd.

(cherry picked from commit f376c37268848dbb4b2fb57677e22ef2bf207b49)
Signed-off-by: Josh Rosen <jo...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ffafa3b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ffafa3b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ffafa3b

Branch: refs/heads/branch-2.0
Commit: 7ffafa3bfecb8bc92b79eddea1ca18166efd3385
Parents: ca81300
Author: 蒋星博 <ji...@meituan.com>
Authored: Thu Jul 14 00:21:27 2016 +0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Sep 29 11:44:00 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 44 +++++++++++++-------
 .../optimizer/FilterPushdownSuite.scala         |  8 ++--
 2 files changed, 33 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7ffafa3b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d824c2e..35b122d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1031,19 +1031,23 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
       project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
 
     // Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
-    // pushed beneath must satisfy the following two conditions:
+    // pushed beneath must satisfy the following conditions:
     // 1. All the expressions are part of window partitioning key. The expressions can be compound.
-    // 2. Deterministic
+    // 2. Deterministic.
+    // 3. Placed before any non-deterministic predicates.
     case filter @ Filter(condition, w: Window)
         if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
       val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))
-      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
-        cond.references.subsetOf(partitionAttrs) && cond.deterministic &&
-          // This is for ensuring all the partitioning expressions have been converted to alias
-          // in Analyzer. Thus, we do not need to check if the expressions in conditions are
-          // the same as the expressions used in partitioning columns.
-          partitionAttrs.forall(_.isInstanceOf[Attribute])
+
+      val (candidates, containingNonDeterministic) =
+        splitConjunctivePredicates(condition).span(_.deterministic)
+
+      val (pushDown, rest) = candidates.partition { cond =>
+        cond.references.subsetOf(partitionAttrs)
       }
+
+      val stayUp = rest ++ containingNonDeterministic
+
       if (pushDown.nonEmpty) {
         val pushDownPredicate = pushDown.reduce(And)
         val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
@@ -1062,11 +1066,16 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
 
       // For each filter, expand the alias and check if the filter can be evaluated using
       // attributes produced by the aggregate operator's child operator.
-      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
+      val (candidates, containingNonDeterministic) =
+        splitConjunctivePredicates(condition).span(_.deterministic)
+
+      val (pushDown, rest) = candidates.partition { cond =>
         val replaced = replaceAlias(cond, aliasMap)
-        replaced.references.subsetOf(aggregate.child.outputSet) && replaced.deterministic
+        replaced.references.subsetOf(aggregate.child.outputSet)
       }
 
+      val stayUp = rest ++ containingNonDeterministic
+
       if (pushDown.nonEmpty) {
         val pushDownPredicate = pushDown.reduce(And)
         val replaced = replaceAlias(pushDownPredicate, aliasMap)
@@ -1080,9 +1089,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
 
     case filter @ Filter(condition, union: Union) =>
       // Union could change the rows, so non-deterministic predicate can't be pushed down
-      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
-        cond.deterministic
-      }
+      val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(_.deterministic)
+
       if (pushDown.nonEmpty) {
         val pushDownCond = pushDown.reduceLeft(And)
         val output = union.output
@@ -1133,9 +1141,15 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
     // come from grandchild.
     // TODO: non-deterministic predicates could be pushed through some operators that do not change
     // the rows.
-    val (pushDown, stayUp) = splitConjunctivePredicates(filter.condition).partition { cond =>
-      cond.deterministic && cond.references.subsetOf(grandchild.outputSet)
+    val (candidates, containingNonDeterministic) =
+      splitConjunctivePredicates(filter.condition).span(_.deterministic)
+
+    val (pushDown, rest) = candidates.partition { cond =>
+      cond.references.subsetOf(grandchild.outputSet)
     }
+
+    val stayUp = rest ++ containingNonDeterministic
+
     if (pushDown.nonEmpty) {
       val newChild = insertFilter(pushDown.reduceLeft(And))
       if (stayUp.nonEmpty) {
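
To see the effect of the Optimizer change end to end, a rough check along these lines can be run in a spark-shell session (so `spark` and the `$` column syntax are already in scope); the exact plan shapes printed will vary, but the pushdown behavior should match the Union case above:

```scala
import org.apache.spark.sql.functions.rand

val u = spark.range(100).toDF("a").union(spark.range(100).toDF("a"))

// `a = 1` precedes rand(10): it can be pushed into both sides of the
// Union, while the rand() predicate stays above.
u.where($"a" === 1 && rand(10) < 0.5).explain(true)

// rand(10) comes first: nothing may be pushed, so the whole conjunction
// remains in a single Filter above the Union.
u.where(rand(10) < 0.5 && $"a" === 1).explain(true)
```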

http://git-wip-us.apache.org/repos/asf/spark/blob/7ffafa3b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index ea868d1..55836f9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -536,14 +536,14 @@ class FilterPushdownSuite extends PlanTest {
     val originalQuery = {
       testRelationWithArrayType
         .generate(Explode('c_arr), true, false, Some("arr"))
-        .where(('b >= 5) && ('a + Rand(10).as("rnd") > 6))
+        .where(('b >= 5) && ('a + Rand(10).as("rnd") > 6) && ('c > 6))
     }
     val optimized = Optimize.execute(originalQuery.analyze)
     val correctAnswer = {
       testRelationWithArrayType
         .where('b >= 5)
         .generate(Explode('c_arr), true, false, Some("arr"))
-        .where('a + Rand(10).as("rnd") > 6)
+        .where('a + Rand(10).as("rnd") > 6 && 'c > 6)
         .analyze
     }
 
@@ -704,14 +704,14 @@ class FilterPushdownSuite extends PlanTest {
     val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)
 
     val originalQuery = Union(Seq(testRelation, testRelation2))
-      .where('a === 2L && 'b + Rand(10).as("rnd") === 3)
+      .where('a === 2L && 'b + Rand(10).as("rnd") === 3 && 'c > 5L)
 
     val optimized = Optimize.execute(originalQuery.analyze)
 
     val correctAnswer = Union(Seq(
       testRelation.where('a === 2L),
       testRelation2.where('d === 2L)))
-      .where('b + Rand(10).as("rnd") === 3)
+      .where('b + Rand(10).as("rnd") === 3 && 'c > 5L)
       .analyze
 
     comparePlans(optimized, correctAnswer)
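
Spelling out the updated Union test: the condition now contains three conjuncts, and under the new rule the split works out as below. Note that 'c > 5L is deterministic, yet it stays above the Union because it is placed after the non-deterministic rand predicate (this is a paraphrase of the assertion, not code from the suite):

```scala
// Input condition: 'a === 2L && 'b + Rand(10).as("rnd") === 3 && 'c > 5L
//
// span(_.deterministic):
//   candidates                 = Seq('a === 2L)
//   containingNonDeterministic = Seq('b + Rand(10).as("rnd") === 3, 'c > 5L)
//
// Pushed into each Union child: 'a === 2L (rewritten as 'd === 2L on the right)
// Kept above the Union:         'b + Rand(10).as("rnd") === 3 && 'c > 5L
```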

