You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/02/25 13:43:56 UTC
spark git commit: [SPARK-13473][SQL] Don't push predicate through
project with nondeterministic field(s)
Repository: spark
Updated Branches:
refs/heads/master 2e44031fa -> 3fa6491be
[SPARK-13473][SQL] Don't push predicate through project with nondeterministic field(s)
## What changes were proposed in this pull request?
Predicates shouldn't be pushed through project with nondeterministic field(s).
See https://github.com/graphframes/graphframes/pull/23 and SPARK-13473 for more details.
This PR targets master, branch-1.6, and branch-1.5.
## How was this patch tested?
A test case is added in `FilterPushdownSuite`. It constructs a query plan where a filter is over a project with a nondeterministic field. The optimized query plan shouldn't change in this case.
Author: Cheng Lian <li...@databricks.com>
Closes #11348 from liancheng/spark-13473-no-ppd-through-nondeterministic-project-field.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3fa6491b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3fa6491b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3fa6491b
Branch: refs/heads/master
Commit: 3fa6491be66dad690ca5329dd32e7c82037ae8c1
Parents: 2e44031
Author: Cheng Lian <li...@databricks.com>
Authored: Thu Feb 25 20:43:03 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Feb 25 20:43:03 2016 +0800
----------------------------------------------------------------------
.../sql/catalyst/optimizer/Optimizer.scala | 9 ++++++-
.../optimizer/FilterPushdownSuite.scala | 27 +++-----------------
2 files changed, 11 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3fa6491b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2b80497..2aeb957 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -792,7 +792,14 @@ object SimplifyFilters extends Rule[LogicalPlan] {
*/
object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case filter @ Filter(condition, project @ Project(fields, grandChild)) =>
+ // SPARK-13473: We can't push the predicate down when the underlying projection outputs non-
+ // deterministic field(s). Non-deterministic expressions are essentially stateful. This
+ // implies that, for a given input row, the output is determined by the expression's initial
+ // state and all the input rows processed before. In other words, the order of input rows
+ // matters for non-deterministic expressions, while pushing down predicates changes the order.
+ case filter @ Filter(condition, project @ Project(fields, grandChild))
+ if fields.forall(_.deterministic) =>
+
// Create a map of Aliases to their values from the child projection.
// e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
val aliasMap = AttributeMap(fields.collect {
http://git-wip-us.apache.org/repos/asf/spark/blob/3fa6491b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 7d60862..1292aa0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -98,7 +98,7 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}
- test("nondeterministic: can't push down filter through project") {
+ test("nondeterministic: can't push down filter with nondeterministic condition through project") {
val originalQuery = testRelation
.select(Rand(10).as('rand), 'a)
.where('rand > 5 || 'a > 5)
@@ -109,36 +109,15 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, originalQuery)
}
- test("nondeterministic: push down part of filter through project") {
+ test("nondeterministic: can't push down filter through project with nondeterministic field") {
val originalQuery = testRelation
.select(Rand(10).as('rand), 'a)
- .where('rand > 5 && 'a > 5)
- .analyze
-
- val optimized = Optimize.execute(originalQuery)
-
- val correctAnswer = testRelation
.where('a > 5)
- .select(Rand(10).as('rand), 'a)
- .where('rand > 5)
- .analyze
-
- comparePlans(optimized, correctAnswer)
- }
-
- test("nondeterministic: push down filter through project") {
- val originalQuery = testRelation
- .select(Rand(10).as('rand), 'a)
- .where('a > 5 && 'a < 10)
.analyze
val optimized = Optimize.execute(originalQuery)
- val correctAnswer = testRelation
- .where('a > 5 && 'a < 10)
- .select(Rand(10).as('rand), 'a)
- .analyze
- comparePlans(optimized, correctAnswer)
+ comparePlans(optimized, originalQuery)
}
test("filters: combines filters") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org