You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Dongjoon Hyun (JIRA)" <ji...@apache.org> on 2018/09/26 01:59:00 UTC

[jira] [Resolved] (SPARK-25511) Map with "null" key not working in spark 2.3

     [ https://issues.apache.org/jira/browse/SPARK-25511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun resolved SPARK-25511.
-----------------------------------
    Resolution: Invalid

Hi, [~ravi_b_shankar].

Historically, it's designed and documented like that since Apache Spark 1.4.0 and marked as a stable interface since 2.1.0.

- https://github.com/apache/spark/blob/v1.4.0/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala#L26

And, now we are releasing Spark 2.4. Given that, your request is invalid for general Spark users.

In addition, Spark cannot change the behavior, too. So, sorry, but I'll close this as `Invalid`.

> Map with "null" key not working in spark 2.3
> --------------------------------------------
>
>                 Key: SPARK-25511
>                 URL: https://issues.apache.org/jira/browse/SPARK-25511
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.3.1
>            Reporter: Ravi Shankar
>            Priority: Major
>
> I had a use case where i was creating a histogram of column values through a UDAF in a Map data type. It is basically just a group by count on a column's value that is returned as a Map<ColumnDataType, LongType>. I needed to plugin all invalid values for the column as a "null -> count" in the map that was returned. In 2.1.x, this was working fine and i could create a Map with "null" being a key. This is not working in 2.3 and wondering if this is expected and if i have to change my application code: 
>  
> {code:java}
> val myList = List(("a", "1"), ("b", "2"), ("a", "3"), (null, "4"))
> val map = myList.toMap
> val data = List(List("sublime", map))
> val rdd = sc.parallelize(data).map(l ⇒ Row.fromSeq(l.toSeq))
> val datasetSchema = StructType(List(StructField("name", StringType, true), StructField("songs", MapType(StringType, StringType, true), true)))
> val df = spark.createDataFrame(rdd, datasetSchema)
> df.take(5).foreach(println)
> {code}
> Output in spark 2.1.x:
> {code:java}
> scala> df.take(5).foreach(println)
> [sublime,Map(a -> 3, b -> 2, null -> 4)]
> {code}
> Output in spark 2.3.x:
> {code:java}
> 2018-09-21 15:35:25 ERROR Executor:91 - Exception in task 2.0 in stage 14.0 (TID 39)
> java.lang.RuntimeException: Error while encoding: java.lang.NullPointerException: Null value appeared in non-nullable field:
> If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, name), StringType), true, false) AS name#38
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) AS songs#39
> 	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291)
> 	at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
> 	at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
> 	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> 	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:109)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException: Null value appeared in non-nullable field:
> If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects$(Unknown Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If$(Unknown Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
> 	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288)
> 	... 23 more
> 2018-09-21 15:35:25 WARN  TaskSetManager:66 - Lost task 2.0 in stage 14.0 (TID 39, localhost, executor driver): java.lang.RuntimeException: Error while encoding: java.lang.NullPointerException: Null value appeared in non-nullable field:
> If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, name), StringType), true, false) AS name#38
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) AS songs#39
> 	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291)
> 	at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
> 	at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
> 	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> 	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:109)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException: Null value appeared in non-nullable field:
> If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects$(Unknown Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If$(Unknown Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
> 	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288)
> 	... 23 more
> 2018-09-21 15:35:25 ERROR TaskSetManager:70 - Task 2 in stage 14.0 failed 1 times; aborting job
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 14.0 failed 1 times, most recent failure: Lost task 2.0 in stage 14.0 (TID 39, localhost, executor driver): java.lang.RuntimeException: Error while encoding: java.lang.NullPointerException: Null value appeared in non-nullable field:
> If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, name), StringType), true, false) AS name#38
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) AS songs#39
> 	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291)
> 	at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
> 	at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
> 	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> 	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:109)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException: Null value appeared in non-nullable field:
> If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects$(Unknown Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If$(Unknown Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
> 	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288)
> 	... 23 more
> Driver stacktrace:
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
>   at scala.Option.foreach(Option.scala:257)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
>   at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
>   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
>   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
>   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
>   at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
>   ... 55 elided
> Caused by: java.lang.RuntimeException: Error while encoding: java.lang.NullPointerException: Null value appeared in non-nullable field:
> If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, name), StringType), true, false) AS name#38
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) AS songs#39
>   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291)
>   at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
>   at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:109)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException: Null value appeared in non-nullable field:
> If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
>   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288)
>   ... 23 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org