You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/08/25 12:19:04 UTC

spark git commit: [SPARK-16991][SPARK-17099][SPARK-17120][SQL] Fix Outer Join Elimination when Filter's isNotNull Constraints Unable to Filter Out All Null-supplying Rows

Repository: spark
Updated Branches:
  refs/heads/master 2b0cc4e0d -> d2ae6399e


[SPARK-16991][SPARK-17099][SPARK-17120][SQL] Fix Outer Join Elimination when Filter's isNotNull Constraints Unable to Filter Out All Null-supplying Rows

### What changes were proposed in this pull request?
This PR fixes an incorrect outer join elimination that occurs when the filter's `isNotNull` constraints are unable to filter out all null-supplying rows, for example `isnotnull(coalesce(b#227, c#238))`.

Users can hit this error when they use a `using`/`natural` outer join, which is converted to a normal outer join with a `coalesce` expression on the `using` columns. For example:
```Scala
    val a = Seq((1, 2), (2, 3)).toDF("a", "b")
    val b = Seq((2, 5), (3, 4)).toDF("a", "c")
    val c = Seq((3, 1)).toDF("a", "d")
    val ab = a.join(b, Seq("a"), "fullouter")
    ab.join(c, "a").explain(true)
```
The DataFrame `ab` performs a `using` full outer join, which is converted to a normal outer join with a `coalesce` expression. Constraint inference then generates a `Filter` with the constraint `isnotnull(coalesce(a#226, a#236))`. Because that constraint references columns from both sides of the join, it cannot filter out all null-supplying rows, yet it still triggers an outer join elimination, which produces a wrong result.
```
Project [a#251, b#227, c#237, d#247]
+- Join Inner, (a#251 = a#246)
   :- Project [coalesce(a#226, a#236) AS a#251, b#227, c#237]
   :  +- Join FullOuter, (a#226 = a#236)
   :     :- Project [_1#223 AS a#226, _2#224 AS b#227]
   :     :  +- LocalRelation [_1#223, _2#224]
   :     +- Project [_1#233 AS a#236, _2#234 AS c#237]
   :        +- LocalRelation [_1#233, _2#234]
   +- Project [_1#243 AS a#246, _2#244 AS d#247]
      +- LocalRelation [_1#243, _2#244]

== Optimized Logical Plan ==
Project [a#251, b#227, c#237, d#247]
+- Join Inner, (a#251 = a#246)
   :- Project [coalesce(a#226, a#236) AS a#251, b#227, c#237]
   :  +- Filter isnotnull(coalesce(a#226, a#236))
   :     +- Join FullOuter, (a#226 = a#236)
   :        :- LocalRelation [a#226, b#227]
   :        +- LocalRelation [a#236, c#237]
   +- LocalRelation [a#246, d#247]
```

**A note to the committer:** please also give credit to dongjoon-hyun, who submitted another PR to fix this issue: https://github.com/apache/spark/pull/14580

### How was this patch tested?
Added test cases

Author: gatorsmile <ga...@gmail.com>

Closes #14661 from gatorsmile/fixOuterJoinElimination.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2ae6399
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2ae6399
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2ae6399

Branch: refs/heads/master
Commit: d2ae6399ee2f0524b88262735adbbcb2035de8fd
Parents: 2b0cc4e
Author: gatorsmile <ga...@gmail.com>
Authored: Thu Aug 25 14:18:58 2016 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Aug 25 14:18:58 2016 +0200

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 18 ++---
 .../optimizer/OuterJoinEliminationSuite.scala   | 39 +++++++++++
 .../resources/sql-tests/inputs/outer-join.sql   | 36 ++++++++++
 .../sql-tests/results/outer-join.sql.out        | 72 ++++++++++++++++++++
 .../apache/spark/sql/DataFrameJoinSuite.scala   |  8 +++
 5 files changed, 161 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d2ae6399/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 9a0ff8a..82ad0fb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1343,18 +1343,12 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
-    val splitConjunctiveConditions: Seq[Expression] = splitConjunctivePredicates(filter.condition)
-    val leftConditions = splitConjunctiveConditions
-      .filter(_.references.subsetOf(join.left.outputSet))
-    val rightConditions = splitConjunctiveConditions
-      .filter(_.references.subsetOf(join.right.outputSet))
-
-    val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull) ||
-      filter.constraints.filter(_.isInstanceOf[IsNotNull])
-        .exists(expr => join.left.outputSet.intersect(expr.references).nonEmpty)
-    val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull) ||
-      filter.constraints.filter(_.isInstanceOf[IsNotNull])
-        .exists(expr => join.right.outputSet.intersect(expr.references).nonEmpty)
+    val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
+    val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
+    val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
+
+    val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
+    val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
 
     join.joinType match {
       case RightOuter if leftHasNonNullPredicate => Inner

http://git-wip-us.apache.org/repos/asf/spark/blob/d2ae6399/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
index 41754ad..c168a55 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Coalesce, IsNotNull}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -192,4 +193,42 @@ class OuterJoinEliminationSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
+
+  test("joins: no outer join elimination if the filter is not NULL eliminated") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
+        .where(Coalesce("y.e".attr :: "x.a".attr :: Nil))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val left = testRelation
+    val right = testRelation1
+    val correctAnswer =
+      left.join(right, FullOuter, Option("a".attr === "d".attr))
+        .where(Coalesce("e".attr :: "a".attr :: Nil)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("joins: no outer join elimination if the filter's constraints are not NULL eliminated") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
+        .where(IsNotNull(Coalesce("y.e".attr :: "x.a".attr :: Nil)))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val left = testRelation
+    val right = testRelation1
+    val correctAnswer =
+      left.join(right, FullOuter, Option("a".attr === "d".attr))
+        .where(IsNotNull(Coalesce("e".attr :: "a".attr :: Nil))).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d2ae6399/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql b/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql
new file mode 100644
index 0000000..f50f1eb
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql
@@ -0,0 +1,36 @@
+-- SPARK-17099: Incorrect result when HAVING clause is added to group by query
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+(-234), (145), (367), (975), (298)
+as t1(int_col1);
+
+CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
+(-769, -244), (-800, -409), (940, 86), (-507, 304), (-367, 158)
+as t2(int_col0, int_col1);
+
+SELECT
+  (SUM(COALESCE(t1.int_col1, t2.int_col0))),
+     ((COALESCE(t1.int_col1, t2.int_col0)) * 2)
+FROM t1
+RIGHT JOIN t2
+  ON (t2.int_col0) = (t1.int_col1)
+GROUP BY GREATEST(COALESCE(t2.int_col1, 109), COALESCE(t1.int_col1, -449)),
+         COALESCE(t1.int_col1, t2.int_col0)
+HAVING (SUM(COALESCE(t1.int_col1, t2.int_col0)))
+            > ((COALESCE(t1.int_col1, t2.int_col0)) * 2);
+
+
+-- SPARK-17120: Analyzer incorrectly optimizes plan to empty LocalRelation
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (97) as t1(int_col1);
+
+CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (0) as t2(int_col1);
+
+SELECT *
+FROM (
+SELECT
+    COALESCE(t2.int_col1, t1.int_col1) AS int_col
+    FROM t1
+    LEFT JOIN t2 ON false
+) t where (t.int_col) is not null;
+
+
+

http://git-wip-us.apache.org/repos/asf/spark/blob/d2ae6399/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
new file mode 100644
index 0000000..b39fdb0
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
@@ -0,0 +1,72 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 6
+
+
+-- !query 0
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+(-234), (145), (367), (975), (298)
+as t1(int_col1)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
+(-769, -244), (-800, -409), (940, 86), (-507, 304), (-367, 158)
+as t2(int_col0, int_col1)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+SELECT
+  (SUM(COALESCE(t1.int_col1, t2.int_col0))),
+     ((COALESCE(t1.int_col1, t2.int_col0)) * 2)
+FROM t1
+RIGHT JOIN t2
+  ON (t2.int_col0) = (t1.int_col1)
+GROUP BY GREATEST(COALESCE(t2.int_col1, 109), COALESCE(t1.int_col1, -449)),
+         COALESCE(t1.int_col1, t2.int_col0)
+HAVING (SUM(COALESCE(t1.int_col1, t2.int_col0)))
+            > ((COALESCE(t1.int_col1, t2.int_col0)) * 2)
+-- !query 2 schema
+struct<sum(coalesce(int_col1, int_col0)):bigint,(coalesce(int_col1, int_col0) * 2):int>
+-- !query 2 output
+-367	-734
+-507	-1014
+-769	-1538
+-800	-1600
+
+
+-- !query 3
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (97) as t1(int_col1)
+-- !query 3 schema
+struct<>
+-- !query 3 output
+
+
+
+-- !query 4
+CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (0) as t2(int_col1)
+-- !query 4 schema
+struct<>
+-- !query 4 output
+
+
+
+-- !query 5
+SELECT *
+FROM (
+SELECT
+    COALESCE(t2.int_col1, t1.int_col1) AS int_col
+    FROM t1
+    LEFT JOIN t2 ON false
+) t where (t.int_col) is not null
+-- !query 5 schema
+struct<int_col:int>
+-- !query 5 output
+97

http://git-wip-us.apache.org/repos/asf/spark/blob/d2ae6399/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 4342c03..4abf5e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -225,4 +225,12 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       Row(1, null) :: Row(null, 2) :: Nil
     )
   }
+
+  test("SPARK-16991: Full outer join followed by inner join produces wrong results") {
+    val a = Seq((1, 2), (2, 3)).toDF("a", "b")
+    val b = Seq((2, 5), (3, 4)).toDF("a", "c")
+    val c = Seq((3, 1)).toDF("a", "d")
+    val ab = a.join(b, Seq("a"), "fullouter")
+    checkAnswer(ab.join(c, "a"), Row(3, null, 4, 1) :: Nil)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org