Posted to commits@spark.apache.org by li...@apache.org on 2018/01/31 04:06:01 UTC
spark git commit: [SPARK-23274][SQL] Fix ReplaceExceptWithFilter when the right's Filter contains the references that are not in the left output
Repository: spark
Updated Branches:
refs/heads/master 778661673 -> ca04c3ff2
[SPARK-23274][SQL] Fix ReplaceExceptWithFilter when the right's Filter contains the references that are not in the left output
## What changes were proposed in this pull request?
This PR fixes the `ReplaceExceptWithFilter` rule for the case where the right side's Filter contains references that are not in the left output.
Before this PR, we got an error like:
```
java.util.NoSuchElementException: key not found: a
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
```
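A minimal way to reproduce it (the same shape as the `DataFrameSuite` test added below; the data and column names come from that test):
```scala
val df = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
val df1 = df.filter($"a" === 1)
val df2 = df.filter($"a" === 2)
// The filter inferred for the right side still references `a`, but the
// left side of the except only outputs `b`; the old code looked `a` up
// in a map built from the left output and threw NoSuchElementException.
df1.select("b").except(df2.select("c")).collect()
```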
After this PR, `ReplaceExceptWithFilter` does not take effect in this case.
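Instead, the `Except` falls through to the default rewrite into a distinct aggregate over a left-anti join. A sketch of the expected plan, based on the `ReplaceOperatorSuite` test added in this patch (the imports are an assumption about what that suite has in scope via the catalyst test DSL):
```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.LeftAnti
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Except, Join, LocalRelation}

val table = LocalRelation(Seq('a.int, 'b.int))
val left = table.where('b < 1).select('a).as("left")
val right = table.where('b < 3).select('a).as("right")
val query = Except(left, right)

// The right side's filter ('b < 3) references 'b, which the projected
// left output ('a only) lacks, so the rule declines and the optimized
// plan is the usual anti-join form of Except:
val expected =
  Aggregate(left.output, right.output,
    Join(left, right, LeftAnti, Option($"left.a" <=> $"right.a"))).analyze
```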
## How was this patch tested?
Added tests.
Author: gatorsmile <ga...@gmail.com>
Closes #20444 from gatorsmile/fixReplaceExceptWithFilter.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca04c3ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca04c3ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca04c3ff
Branch: refs/heads/master
Commit: ca04c3ff2387bf0a4308a4b010154e6761827278
Parents: 7786616
Author: gatorsmile <ga...@gmail.com>
Authored: Tue Jan 30 20:05:57 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Jan 30 20:05:57 2018 -0800
----------------------------------------------------------------------
.../optimizer/ReplaceExceptWithFilter.scala | 17 +++++++++++++----
.../catalyst/optimizer/ReplaceOperatorSuite.scala | 15 +++++++++++++++
.../org/apache/spark/sql/DataFrameSuite.scala | 8 ++++++++
3 files changed, 36 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ca04c3ff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
index 89bfcee..45edf26 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
@@ -46,18 +46,27 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
     }
 
     plan.transform {
-      case Except(left, right) if isEligible(left, right) =>
-        Distinct(Filter(Not(transformCondition(left, skipProject(right))), left))
+      case e @ Except(left, right) if isEligible(left, right) =>
+        val newCondition = transformCondition(left, skipProject(right))
+        newCondition.map { c =>
+          Distinct(Filter(Not(c), left))
+        }.getOrElse {
+          e
+        }
     }
   }
 
-  private def transformCondition(left: LogicalPlan, right: LogicalPlan): Expression = {
+  private def transformCondition(left: LogicalPlan, right: LogicalPlan): Option[Expression] = {
     val filterCondition =
       InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition
 
     val attributeNameMap: Map[String, Attribute] = left.output.map(x => (x.name, x)).toMap
 
-    filterCondition.transform { case a : AttributeReference => attributeNameMap(a.name) }
+    if (filterCondition.references.forall(r => attributeNameMap.contains(r.name))) {
+      Some(filterCondition.transform { case a: AttributeReference => attributeNameMap(a.name) })
+    } else {
+      None
+    }
   }
 
   // TODO: This can be further extended in the future.
http://git-wip-us.apache.org/repos/asf/spark/blob/ca04c3ff/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
index e9701ff..52dc2e9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
@@ -168,6 +168,21 @@ class ReplaceOperatorSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("replace Except with Filter when only right filter can be applied to the left") {
+    val table = LocalRelation(Seq('a.int, 'b.int))
+    val left = table.where('b < 1).select('a).as("left")
+    val right = table.where('b < 3).select('a).as("right")
+
+    val query = Except(left, right)
+    val optimized = Optimize.execute(query.analyze)
+
+    val correctAnswer =
+      Aggregate(left.output, right.output,
+        Join(left, right, LeftAnti, Option($"left.a" <=> $"right.a"))).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("replace Distinct with Aggregate") {
     val input = LocalRelation('a.int, 'b.int)
 
http://git-wip-us.apache.org/repos/asf/spark/blob/ca04c3ff/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 3370708..8b66f77 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -589,6 +589,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       Nil)
   }
 
+  test("SPARK-23274: except between two projects without references used in filter") {
+    val df = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
+    val df1 = df.filter($"a" === 1)
+    val df2 = df.filter($"a" === 2)
+    checkAnswer(df1.select("b").except(df2.select("b")), Row(3) :: Nil)
+    checkAnswer(df1.select("b").except(df2.select("c")), Row(2) :: Nil)
+  }
+
   test("except distinct - SQL compliance") {
     val df_left = Seq(1, 2, 2, 3, 3, 4).toDF("id")
     val df_right = Seq(1, 3).toDF("id")