You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/02/25 13:43:56 UTC

spark git commit: [SPARK-13473][SQL] Don't push predicate through project with nondeterministic field(s)

Repository: spark
Updated Branches:
  refs/heads/master 2e44031fa -> 3fa6491be


[SPARK-13473][SQL] Don't push predicate through project with nondeterministic field(s)

## What changes were proposed in this pull request?

Predicates shouldn't be pushed through a project with nondeterministic field(s).

See https://github.com/graphframes/graphframes/pull/23 and SPARK-13473 for more details.

This PR targets master, branch-1.6, and branch-1.5.

## How was this patch tested?

A test case is added in `FilterPushdownSuite`. It constructs a query plan where a filter sits on top of a project with a nondeterministic field. The optimized query plan shouldn't change in this case.

Author: Cheng Lian <li...@databricks.com>

Closes #11348 from liancheng/spark-13473-no-ppd-through-nondeterministic-project-field.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3fa6491b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3fa6491b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3fa6491b

Branch: refs/heads/master
Commit: 3fa6491be66dad690ca5329dd32e7c82037ae8c1
Parents: 2e44031
Author: Cheng Lian <li...@databricks.com>
Authored: Thu Feb 25 20:43:03 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Feb 25 20:43:03 2016 +0800

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |  9 ++++++-
 .../optimizer/FilterPushdownSuite.scala         | 27 +++-----------------
 2 files changed, 11 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3fa6491b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2b80497..2aeb957 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -792,7 +792,14 @@ object SimplifyFilters extends Rule[LogicalPlan] {
  */
 object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case filter @ Filter(condition, project @ Project(fields, grandChild)) =>
+    // SPARK-13473: We can't push the predicate down when the underlying projection outputs non-
+    // deterministic field(s).  Non-deterministic expressions are essentially stateful. This
+    // implies that, for a given input row, the output is determined by the expression's initial
+    // state and all the input rows processed before. In other words, the order of input rows
+    // matters for non-deterministic expressions, while pushing down predicates changes the order.
+    case filter @ Filter(condition, project @ Project(fields, grandChild))
+      if fields.forall(_.deterministic) =>
+
       // Create a map of Aliases to their values from the child projection.
       // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
       val aliasMap = AttributeMap(fields.collect {

http://git-wip-us.apache.org/repos/asf/spark/blob/3fa6491b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 7d60862..1292aa0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -98,7 +98,7 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
-  test("nondeterministic: can't push down filter through project") {
+  test("nondeterministic: can't push down filter with nondeterministic condition through project") {
     val originalQuery = testRelation
       .select(Rand(10).as('rand), 'a)
       .where('rand > 5 || 'a > 5)
@@ -109,36 +109,15 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, originalQuery)
   }
 
-  test("nondeterministic: push down part of filter through project") {
+  test("nondeterministic: can't push down filter through project with nondeterministic field") {
     val originalQuery = testRelation
       .select(Rand(10).as('rand), 'a)
-      .where('rand > 5 && 'a > 5)
-      .analyze
-
-    val optimized = Optimize.execute(originalQuery)
-
-    val correctAnswer = testRelation
       .where('a > 5)
-      .select(Rand(10).as('rand), 'a)
-      .where('rand > 5)
-      .analyze
-
-    comparePlans(optimized, correctAnswer)
-  }
-
-  test("nondeterministic: push down filter through project") {
-    val originalQuery = testRelation
-      .select(Rand(10).as('rand), 'a)
-      .where('a > 5 && 'a < 10)
       .analyze
 
     val optimized = Optimize.execute(originalQuery)
-    val correctAnswer = testRelation
-      .where('a > 5 && 'a < 10)
-      .select(Rand(10).as('rand), 'a)
-      .analyze
 
-    comparePlans(optimized, correctAnswer)
+    comparePlans(optimized, originalQuery)
   }
 
   test("filters: combines filters") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org