You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/01/31 04:06:01 UTC

spark git commit: [SPARK-23274][SQL] Fix ReplaceExceptWithFilter when the right's Filter contains the references that are not in the left output

Repository: spark
Updated Branches:
  refs/heads/master 778661673 -> ca04c3ff2


[SPARK-23274][SQL] Fix ReplaceExceptWithFilter when the right's Filter contains the references that are not in the left output

## What changes were proposed in this pull request?
This PR fixes the `ReplaceExceptWithFilter` rule for the case where the right side's Filter contains references that are not in the left side's output.

Before this PR, we got an error like:
```
java.util.NoSuchElementException: key not found: a
  at scala.collection.MapLike$class.default(MapLike.scala:228)
  at scala.collection.AbstractMap.default(Map.scala:59)
  at scala.collection.MapLike$class.apply(MapLike.scala:141)
  at scala.collection.AbstractMap.apply(Map.scala:59)
```

After this PR, `ReplaceExceptWithFilter` does not take effect in this case; the `Except` node is left unchanged by this rule.

## How was this patch tested?
Added tests

Author: gatorsmile <ga...@gmail.com>

Closes #20444 from gatorsmile/fixReplaceExceptWithFilter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca04c3ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca04c3ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca04c3ff

Branch: refs/heads/master
Commit: ca04c3ff2387bf0a4308a4b010154e6761827278
Parents: 7786616
Author: gatorsmile <ga...@gmail.com>
Authored: Tue Jan 30 20:05:57 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Jan 30 20:05:57 2018 -0800

----------------------------------------------------------------------
 .../optimizer/ReplaceExceptWithFilter.scala        | 17 +++++++++++++----
 .../catalyst/optimizer/ReplaceOperatorSuite.scala  | 15 +++++++++++++++
 .../org/apache/spark/sql/DataFrameSuite.scala      |  8 ++++++++
 3 files changed, 36 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ca04c3ff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
index 89bfcee..45edf26 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
@@ -46,18 +46,27 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
     }
 
     plan.transform {
-      case Except(left, right) if isEligible(left, right) =>
-        Distinct(Filter(Not(transformCondition(left, skipProject(right))), left))
+      case e @ Except(left, right) if isEligible(left, right) =>
+        val newCondition = transformCondition(left, skipProject(right))
+        newCondition.map { c =>
+          Distinct(Filter(Not(c), left))
+        }.getOrElse {
+          e
+        }
     }
   }
 
-  private def transformCondition(left: LogicalPlan, right: LogicalPlan): Expression = {
+  private def transformCondition(left: LogicalPlan, right: LogicalPlan): Option[Expression] = {
     val filterCondition =
       InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition
 
     val attributeNameMap: Map[String, Attribute] = left.output.map(x => (x.name, x)).toMap
 
-    filterCondition.transform { case a : AttributeReference => attributeNameMap(a.name) }
+    if (filterCondition.references.forall(r => attributeNameMap.contains(r.name))) {
+      Some(filterCondition.transform { case a: AttributeReference => attributeNameMap(a.name) })
+    } else {
+      None
+    }
   }
 
   // TODO: This can be further extended in the future.

http://git-wip-us.apache.org/repos/asf/spark/blob/ca04c3ff/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
index e9701ff..52dc2e9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
@@ -168,6 +168,21 @@ class ReplaceOperatorSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("replace Except with Filter when only right filter can be applied to the left") {
+    val table = LocalRelation(Seq('a.int, 'b.int))
+    val left = table.where('b < 1).select('a).as("left")
+    val right = table.where('b < 3).select('a).as("right")
+
+    val query = Except(left, right)
+    val optimized = Optimize.execute(query.analyze)
+
+    val correctAnswer =
+      Aggregate(left.output, right.output,
+        Join(left, right, LeftAnti, Option($"left.a" <=> $"right.a"))).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("replace Distinct with Aggregate") {
     val input = LocalRelation('a.int, 'b.int)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ca04c3ff/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 3370708..8b66f77 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -589,6 +589,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       Nil)
   }
 
+  test("SPARK-23274: except between two projects without references used in filter") {
+    val df = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
+    val df1 = df.filter($"a" === 1)
+    val df2 = df.filter($"a" === 2)
+    checkAnswer(df1.select("b").except(df2.select("b")), Row(3) :: Nil)
+    checkAnswer(df1.select("b").except(df2.select("c")), Row(2) :: Nil)
+  }
+
   test("except distinct - SQL compliance") {
     val df_left = Seq(1, 2, 2, 3, 3, 4).toDF("id")
     val df_right = Seq(1, 3).toDF("id")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org