You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/09/07 05:15:28 UTC
spark git commit: [SPARK-21835][SQL][FOLLOW-UP]
RewritePredicateSubquery should not produce unresolved query plans
Repository: spark
Updated Branches:
refs/heads/master aad212547 -> ce7293c15
[SPARK-21835][SQL][FOLLOW-UP] RewritePredicateSubquery should not produce unresolved query plans
## What changes were proposed in this pull request?
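This is a follow-up to #19050 that extends the de-duplication of conflicting join attributes to the `ExistenceJoin` case. That case arises when an `EXISTS`/`IN` subquery appears in a predicate that is rewritten into an existence join rather than a semi/anti join, for example inside an `OR` as in the added test.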
## How was this patch tested?
Added a regression test to `SubquerySuite`.
Author: Liang-Chi Hsieh <vi...@gmail.com>
Closes #19151 from viirya/SPARK-21835-followup.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce7293c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce7293c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce7293c1
Branch: refs/heads/master
Commit: ce7293c150c71a872d20beda44b12dec9deca18d
Parents: aad2125
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Wed Sep 6 22:15:25 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Sep 6 22:15:25 2017 -0700
----------------------------------------------------------------------
.../apache/spark/sql/catalyst/optimizer/subquery.scala | 11 +++++++----
.../test/scala/org/apache/spark/sql/SubquerySuite.scala | 12 ++++++++++++
2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ce7293c1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index 7ff8915..64b2856 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -49,12 +49,12 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
}
}
- private def dedupJoin(joinPlan: Join): Join = joinPlan match {
+ private def dedupJoin(joinPlan: LogicalPlan): LogicalPlan = joinPlan match {
// SPARK-21835: It is possibly that the two sides of the join have conflicting attributes,
// the produced join then becomes unresolved and break structural integrity. We should
// de-duplicate conflicting attributes. We don't use transformation here because we only
// care about the most top join converted from correlated predicate subquery.
- case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti), joinCond) =>
+ case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti | ExistenceJoin(_)), joinCond) =>
val duplicates = right.outputSet.intersect(left.outputSet)
if (duplicates.nonEmpty) {
val aliasMap = AttributeMap(duplicates.map { dup =>
@@ -145,13 +145,16 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
e transformUp {
case Exists(sub, conditions, _) =>
val exists = AttributeReference("exists", BooleanType, nullable = false)()
- newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
+ // Deduplicate conflicting attributes if any.
+ newPlan = dedupJoin(
+ Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)))
exists
case In(value, Seq(ListQuery(sub, conditions, _, _))) =>
val exists = AttributeReference("exists", BooleanType, nullable = false)()
val inConditions = getValueExpression(value).zip(sub.output).map(EqualTo.tupled)
val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
- newPlan = Join(newPlan, sub, ExistenceJoin(exists), newConditions)
+ // Deduplicate conflicting attributes if any.
+ newPlan = dedupJoin(Join(newPlan, sub, ExistenceJoin(exists), newConditions))
exists
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ce7293c1/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index ee6905e..8673dc1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -938,4 +938,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
}
}
}
+
+ test("SPARK-21835: Join in correlated subquery should be duplicateResolved: case 3") {
+ val sqlText =
+ """
+ |SELECT * FROM l, r WHERE l.a = r.c + 1 AND
+ |(EXISTS (SELECT * FROM r) OR l.a = r.c)
+ """.stripMargin
+ val optimizedPlan = sql(sqlText).queryExecution.optimizedPlan
+ val join = optimizedPlan.collectFirst { case j: Join => j }.get
+ assert(join.duplicateResolved)
+ assert(optimizedPlan.resolved)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org