You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/04/19 07:52:54 UTC

spark git commit: [SPARK-20359][SQL] Avoid unnecessary execution in EliminateOuterJoin optimization that can lead to NPE

Repository: spark
Updated Branches:
  refs/heads/master 702d85af2 -> 608bf30f0


[SPARK-20359][SQL] Avoid unnecessary execution in EliminateOuterJoin optimization that can lead to NPE

Avoid unnecessary execution that can lead to an NPE in EliminateOuterJoin, and add a test in DataFrameSuite to confirm the NPE is no longer thrown.

## What changes were proposed in this pull request?
Change leftHasNonNullPredicate and rightHasNonNullPredicate to lazy vals so they are only evaluated when needed.

## How was this patch tested?

Added a test in DataFrameSuite that failed before this fix and now succeeds. Note that a test in the catalyst project would be better, but I am unsure how to do this.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Koert Kuipers <ko...@tresata.com>

Closes #17660 from koertkuipers/feat-catch-npe-in-eliminate-outer-join.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/608bf30f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/608bf30f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/608bf30f

Branch: refs/heads/master
Commit: 608bf30f0b9759fd0b9b9f33766295550996a9eb
Parents: 702d85a
Author: Koert Kuipers <ko...@tresata.com>
Authored: Wed Apr 19 15:52:47 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Apr 19 15:52:47 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/catalyst/optimizer/joins.scala   |  4 ++--
 .../test/scala/org/apache/spark/sql/DataFrameSuite.scala  | 10 ++++++++++
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/608bf30f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index c3ab587..2fe3039 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -134,8 +134,8 @@ case class EliminateOuterJoin(conf: SQLConf) extends Rule[LogicalPlan] with Pred
     val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
     val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
 
-    val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
-    val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
+    lazy val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
+    lazy val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
 
     join.joinType match {
       case RightOuter if leftHasNonNullPredicate => Inner

http://git-wip-us.apache.org/repos/asf/spark/blob/608bf30f/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 52bd4e1..b4893b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1722,4 +1722,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         "Cannot have map type columns in DataFrame which calls set operations"))
     }
   }
+
+  test("SPARK-20359: catalyst outer join optimization should not throw npe") {
+    val df1 = Seq("a", "b", "c").toDF("x")
+      .withColumn("y", udf{ (x: String) => x.substring(0, 1) + "!" }.apply($"x"))
+    val df2 = Seq("a", "b").toDF("x1")
+    df1
+      .join(df2, df1("x") === df2("x1"), "left_outer")
+      .filter($"x1".isNotNull || !$"y".isin("a!"))
+      .count
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org