Posted to issues@spark.apache.org by "Matthew Fishkin (JIRA)" <ji...@apache.org> on 2018/01/02 21:00:00 UTC

[jira] [Updated] (SPARK-22942) Spark Sql UDF throwing NullPointer when adding a filter on a columns that uses that UDF

     [ https://issues.apache.org/jira/browse/SPARK-22942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthew Fishkin updated SPARK-22942:
------------------------------------
    Description: 
I ran into an issue when applying a `filter` to a dataframe that has a column added using a UDF. I am able to reproduce the problem with a small set of data.

Given the dummy case classes:

{code:java}
case class Info(number: Int, color: String)
case class Record(name: String, infos: Seq[Info])
{code}

And the following data:
{code:java}
val blue = Info(1, "blue")
val black = Info(2, "black")
val yellow = Info(3, "yellow")
val orange = Info(4, "orange")
val white = Info(5, "white")

val a  = Record("a", Seq(blue, black, white))
val a2 = Record("a", Seq(yellow, white, orange))
val b = Record("b", Seq(blue, black))
val c = Record("c", Seq(white, orange))
val d = Record("d", Seq(orange, black))
{code}

Create two dataframes (we will call them left and right):
{code:java}
val left = Seq(a, b).toDF
val right = Seq(a2, c, d).toDF
{code}

Join those two dataframes with a full outer join (so both `infos` columns are now nullable):
{code:java}
val joined = left.alias("l").join(right.alias("r"), Seq("name"), "full_outer")
joined.show(false)

res0:
+----+--------------------------------+-----------------------------------+
|name|infos                           |infos                              |
+----+--------------------------------+-----------------------------------+
|b   |[[1,blue], [2,black]]           |null                               |
|a   |[[1,blue], [2,black], [5,white]]|[[3,yellow], [5,white], [4,orange]]|
|c   |null                            |[[5,white], [4,orange]]            |
|d   |null                            |[[4,orange], [2,black]]            |
+----+--------------------------------+-----------------------------------+
{code}

Then, take only the `name`s that exist only in the right dataframe (the rows where `l.infos` is null):
{code:java}
val rightOnly = joined.filter("l.infos is null").select($"name", $"r.infos".as("r_infos"))

rightOnly.show(false)

res1:
+----+-----------------------+
|name|r_infos                |
+----+-----------------------+
|c   |[[5,white], [4,orange]]|
|d   |[[4,orange], [2,black]]|
+----+-----------------------+
{code}

Now, add a new column called `has_black`, which is true if `r_infos` contains _black_ as a color:

{code:java}
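import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// True if any (number, color) struct in the array has the color "black".
def hasBlack = (s: Seq[Row]) => {
  s.exists{ case Row(num: Int, color: String) =>
    color == "black"
  }
}

// `rightOnly` is the dataframe defined in the previous step.
val rightBreakdown = rightOnly.withColumn("has_black", udf(hasBlack).apply($"r_infos"))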

rightBreakdown.show(false)

res2:
+----+-----------------------+---------+
|name|r_infos                |has_black|
+----+-----------------------+---------+
|c   |[[5,white], [4,orange]]|false    |
|d   |[[4,orange], [2,black]]|true     |
+----+-----------------------+---------+
{code}

So far, this is *exactly* what we expected. However, when I try to filter `rightBreakdown` on the new column, it fails with a `NullPointerException` inside the UDF:
{code:java}
rightBreakdown.filter("has_black == true").show(false)

org.apache.spark.SparkException: Failed to execute user defined function($anonfun$hasBlack$1: (array<struct<number:int,color:string>>) => boolean)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1075)
  at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:411)
  at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(joins.scala:127)
  at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$rightHasNonNullPredicate$lzycompute$1$1.apply(joins.scala:138)
  at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$rightHasNonNullPredicate$lzycompute$1$1.apply(joins.scala:138)
  at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
  at scala.collection.immutable.List.exists(List.scala:84)
  at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.rightHasNonNullPredicate$lzycompute$1(joins.scala:138)
  at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.rightHasNonNullPredicate$1(joins.scala:138)
  at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$buildNewJoinType(joins.scala:145)
  at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:152)
  at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:150)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
  at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.apply(joins.scala:150)
  at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.apply(joins.scala:116)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
  at scala.collection.immutable.List.foldLeft(List.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2832)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:646)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:623)
  ... 58 elided
Caused by: java.lang.NullPointerException
  at $anonfun$hasBlack$1.apply(<console>:41)
  at $anonfun$hasBlack$1.apply(<console>:40)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:92)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:91)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1072)
  ... 114 more
{code}
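
The trace shows the optimizer rule `EliminateOuterJoin.canFilterOutNull` evaluating the filter predicate during planning, with the UDF throwing from inside that call. This suggests the UDF is being invoked with a null `r_infos`, even though no null rows reach it at runtime. A possible workaround, sketched here under that assumption, is to make the function tolerate a null array. The names `hasBlackSafe` and `hasBlackUdf` are hypothetical and not part of the original report.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Hypothetical null-safe variant of hasBlack (the name is an assumption).
// Option(null) is None, so a null array yields false instead of an NPE.
val hasBlackSafe: Seq[Row] => Boolean = (s: Seq[Row]) =>
  Option(s).exists(_.exists {
    case Row(_: Int, color: String) => color == "black"
    // Anything else, e.g. a struct whose color is null, counts as not black.
    case _ => false
  })

// Wrap the function as a UDF for use with withColumn.
val hasBlackUdf = udf(hasBlackSafe)
```

It could then replace the original UDF, as in `rightOnly.withColumn("has_black", hasBlackUdf($"r_infos"))`. Whether the subsequent filter then succeeds on 2.2.0 has not been verified here.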


> Spark Sql UDF throwing NullPointer when adding a filter on a columns that uses that UDF
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-22942
>                 URL: https://issues.apache.org/jira/browse/SPARK-22942
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell, SQL
>    Affects Versions: 2.2.0
>            Reporter: Matthew Fishkin


