You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/11/04 20:18:20 UTC
spark git commit: [SPARK-17337][SQL] Do not pushdown predicates
through filters with predicate subqueries
Repository: spark
Updated Branches:
refs/heads/master a42d738c5 -> 550cd56e8
[SPARK-17337][SQL] Do not pushdown predicates through filters with predicate subqueries
## What changes were proposed in this pull request?
The `PushDownPredicate` rule can create a wrong result if we try to push a filter containing a predicate subquery through a project when the subquery and the project share attributes (have the same source).
The current PR fixes this by making sure that we do not push down when there is a predicate subquery that outputs the same attributes as the filter's new child plan.
## How was this patch tested?
Added a test to `SubquerySuite`. nsyca has done previous work on this. I have taken the test from his initial PR.
Author: Herman van Hovell <hv...@databricks.com>
Closes #15761 from hvanhovell/SPARK-17337.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/550cd56e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/550cd56e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/550cd56e
Branch: refs/heads/master
Commit: 550cd56e8b6addb26efe3ce16976c9c34fa0c832
Parents: a42d738
Author: Herman van Hovell <hv...@databricks.com>
Authored: Fri Nov 4 21:18:13 2016 +0100
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Fri Nov 4 21:18:13 2016 +0100
----------------------------------------------------------------------
.../sql/catalyst/optimizer/Optimizer.scala | 16 ++++++++++++-
.../org/apache/spark/sql/SubquerySuite.scala | 24 ++++++++++++++++----
2 files changed, 35 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/550cd56e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index b6ad5db..6ba8b33 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -689,7 +689,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
// state and all the input rows processed before. In another word, the order of input rows
// matters for non-deterministic expressions, while pushing down predicates changes the order.
case filter @ Filter(condition, project @ Project(fields, grandChild))
- if fields.forall(_.deterministic) =>
+ if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>
// Create a map of Aliases to their values from the child projection.
// e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b).
@@ -830,6 +830,20 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
filter
}
}
+
+ /**
+ * Check if we can safely push a filter through a projection, by making sure that predicate
+ * subqueries in the condition do not contain the same attributes as the plan they are moved
+ * into. This can happen when the plan and predicate subquery have the same source.
+ */
+ private def canPushThroughCondition(plan: LogicalPlan, condition: Expression): Boolean = {
+ val attributes = plan.outputSet
+ val matched = condition.find {
+ case PredicateSubquery(p, _, _, _) => p.outputSet.intersect(attributes).nonEmpty
+ case _ => false
+ }
+ matched.isEmpty
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/550cd56e/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index eab4505..8934866 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -608,8 +608,8 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
| where exists (select 1 from onerow t2 where t1.c1=t2.c1)
| and exists (select 1 from onerow LIMIT 1)""".stripMargin),
Row(1) :: Nil)
- }
- }
+ }
+ }
test("SPARK-16804: Correlated subqueries containing LIMIT - 2") {
withTempView("onerow") {
@@ -623,6 +623,22 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
| from (select 1 from onerow t2 LIMIT 1)
| where t1.c1=t2.c1)""".stripMargin),
Row(1) :: Nil)
- }
- }
+ }
+ }
+
+ test("SPARK-17337: Incorrect column resolution leads to incorrect results") {
+ withTempView("t1", "t2") {
+ Seq(1, 2).toDF("c1").createOrReplaceTempView("t1")
+ Seq(1).toDF("c2").createOrReplaceTempView("t2")
+
+ checkAnswer(
+ sql(
+ """
+ | select *
+ | from (select t2.c2+1 as c3
+ | from t1 left join t2 on t1.c1=t2.c2) t3
+ | where c3 not in (select c2 from t2)""".stripMargin),
+ Row(2) :: Nil)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org