Posted to issues@spark.apache.org by "Matthew Fishkin (JIRA)" <ji...@apache.org> on 2018/01/02 23:26:00 UTC

[jira] [Comment Edited] (SPARK-22942) Spark SQL UDF throwing NullPointer when adding a filter on a column that uses that UDF

    [ https://issues.apache.org/jira/browse/SPARK-22942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308882#comment-16308882 ] 

Matthew Fishkin edited comment on SPARK-22942 at 1/2/18 11:25 PM:
------------------------------------------------------------------

I would expect that to work too. I'm more curious about why the NullPointerException occurs when none of the data is null.

Interestingly, I found the following: when you change from
{code:java}
val rightOnly = joined.filter("l.infos is null").select($"name", $"r.infos".as("r_infos"))
{code}
to
{code:java}
val rightOnly = joined.filter("l.infos is null and r.infos is not null").select($"name", $"r.infos".as("r_infos"))
{code}

the rest of the code above works. 

But I am pretty sure "l.infos is null and r.infos is not null" is equivalent to "l.infos is null" here: if l.infos is null after the full outer join, that row can only have come from the right side, so r.infos must be defined (none of the underlying data is null).

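A quick way to check that claim (a minimal sketch, assuming the `joined` DataFrame from the issue description below is in scope, e.g. in the same spark-shell session; the variable names are illustrative):
{code:java}
// Sketch: verify the two filter conditions select the same rows on this data.
// Comparing only the name column keeps the set difference simple.
val withoutCheck = joined.filter("l.infos is null").select("name")
val withCheck    = joined.filter("l.infos is null and r.infos is not null").select("name")
assert(withoutCheck.except(withCheck).count() == 0)
assert(withCheck.except(withoutCheck).count() == 0)
{code}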


> Spark SQL UDF throwing NullPointer when adding a filter on a column that uses that UDF
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-22942
>                 URL: https://issues.apache.org/jira/browse/SPARK-22942
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell, SQL
>    Affects Versions: 2.2.0
>            Reporter: Matthew Fishkin
>
> I ran into an interesting issue when trying to `filter` a DataFrame on a column that was added using a UDF. I am able to replicate the problem with a smaller set of data.
> Given the dummy case classes:
> {code:java}
> case class Info(number: Int, color: String)
> case class Record(name: String, infos: Seq[Info])
> {code}
> And the following data:
> {code:java}
> val blue = Info(1, "blue")
> val black = Info(2, "black")
> val yellow = Info(3, "yellow")
> val orange = Info(4, "orange")
> val white = Info(5, "white")
> val a  = Record("a", Seq(blue, black, white))
> val a2 = Record("a", Seq(yellow, white, orange))
> val b = Record("b", Seq(blue, black))
> val c = Record("c", Seq(white, orange))
> val d = Record("d", Seq(orange, black))
> {code}
> Create two DataFrames (we will call them `left` and `right`):
> {code:java}
> val left = Seq(a, b).toDF
> val right = Seq(a2, c, d).toDF
> {code}
> Join those two DataFrames with a full outer join (so two of our `infos` columns are nullable now):
> {code:java}
> val joined = left.alias("l").join(right.alias("r"), Seq("name"), "full_outer")
> joined.show(false)
> res0:
> +----+--------------------------------+-----------------------------------+
> |name|infos                           |infos                              |
> +----+--------------------------------+-----------------------------------+
> |b   |[[1,blue], [2,black]]           |null                               |
> |a   |[[1,blue], [2,black], [5,white]]|[[3,yellow], [5,white], [4,orange]]|
> |c   |null                            |[[5,white], [4,orange]]            |
> |d   |null                            |[[4,orange], [2,black]]            |
> +----+--------------------------------+-----------------------------------+
> {code}
> Then, take only the `name`s that exist only in the right DataFrame:
> {code:java}
> val rightOnly = joined.filter("l.infos is null").select($"name", $"r.infos".as("r_infos"))
> rightOnly.show(false)
> res1:
> +----+-----------------------+
> |name|r_infos                |
> +----+-----------------------+
> |c   |[[5,white], [4,orange]]|
> |d   |[[4,orange], [2,black]]|
> +----+-----------------------+
> {code}
> Now, add a new column called `has_black`, which will be true if `r_infos` contains _black_ as a color:
> {code:java}
> def hasBlack = (s: Seq[Row]) => {
>   s.exists{ case Row(num: Int, color: String) =>
>     color == "black"
>   }
> }
> val rightBreakdown = rightOnly.withColumn("has_black", udf(hasBlack).apply($"r_infos"))
> rightBreakdown.show(false)
> res2:
> +----+-----------------------+---------+
> |name|r_infos                |has_black|
> +----+-----------------------+---------+
> |c   |[[5,white], [4,orange]]|false    |
> |d   |[[4,orange], [2,black]]|true     |
> +----+-----------------------+---------+
> {code}
> So far, _exactly_ what we expected. 
> *However*, when I try to filter `rightBreakdown`, it fails.
> {code:java}
> rightBreakdown.filter("has_black == true").show(false)
> org.apache.spark.SparkException: Failed to execute user defined function($anonfun$hasBlack$1: (array<struct<number:int,color:string>>) => boolean)
>   at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1075)
>   at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:411)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(joins.scala:127)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$rightHasNonNullPredicate$lzycompute$1$1.apply(joins.scala:138)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$rightHasNonNullPredicate$lzycompute$1$1.apply(joins.scala:138)
>   at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
>   at scala.collection.immutable.List.exists(List.scala:84)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.rightHasNonNullPredicate$lzycompute$1(joins.scala:138)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.rightHasNonNullPredicate$1(joins.scala:138)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$buildNewJoinType(joins.scala:145)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:152)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:150)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.apply(joins.scala:150)
>   at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.apply(joins.scala:116)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
>   at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
>   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
>   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
>   at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
>   at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2832)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:646)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:623)
>   ... 58 elided
> Caused by: java.lang.NullPointerException
>   at $anonfun$hasBlack$1.apply(<console>:41)
>   at $anonfun$hasBlack$1.apply(<console>:40)
>   at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:92)
>   at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:91)
>   at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1072)
>   ... 114 more
> {code}
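
For what it's worth, the stack trace above shows the optimizer's EliminateOuterJoin rule evaluating the UDF while it checks whether the filter can discard null rows (ScalaUDF.eval called from canFilterOutNull at joins.scala:127), so the UDF can be handed a null input even though the data itself contains no nulls. A minimal sketch of a possible workaround, assuming a null-tolerant variant of the UDF is acceptable (the names below are hypothetical, not from the report):
{code:java}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Hypothetical null-tolerant variant of hasBlack: the null guard covers the
// all-null probe row the optimizer may pass to the UDF while evaluating the filter.
def hasBlackSafe = (s: Seq[Row]) =>
  s != null && s.exists { case Row(_: Int, color: String) => color == "black" }

val rightBreakdownSafe = rightOnly.withColumn("has_black", udf(hasBlackSafe).apply($"r_infos"))
rightBreakdownSafe.filter("has_black == true").show(false)
{code}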


