Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2021/03/24 17:25:00 UTC

[jira] [Assigned] (SPARK-34829) transform_values return identical values when it's used with udf that returns reference type

     [ https://issues.apache.org/jira/browse/SPARK-34829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-34829:
------------------------------------

    Assignee: Apache Spark

> transform_values return identical values when it's used with udf that returns reference type
> --------------------------------------------------------------------------------------------
>
>                 Key: SPARK-34829
>                 URL: https://issues.apache.org/jira/browse/SPARK-34829
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: Pavel Chernikov
>            Assignee: Apache Spark
>            Priority: Major
>
> If the return value of a {{udf}} passed to {{transform_values}} is an {{AnyRef}}, the transformation returns identical new values for every map key (more precisely, each newly computed value overwrites the values of all previously processed keys).
> Consider the following examples:
> {code:java}
> case class Bar(i: Int)
> val square = udf((b: Bar) => b.i * b.i)
> val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map")
> df.withColumn("map_square", transform_values(col("map"), (_, v) => square(v))).show(truncate = false)
> +------------------------------+------------------------+
> |map                           |map_square              |
> +------------------------------+------------------------+
> |{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> 1, 2 -> 4, 3 -> 9}|
> +------------------------------+------------------------+
> {code}
> vs 
> {code:java}
> case class Bar(i: Int)
> case class BarSquare(i: Int)
> val square = udf((b: Bar) => BarSquare(b.i * b.i))
> val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map")
> df.withColumn("map_square", transform_values(col("map"), (_, v) => square(v))).show(truncate = false)
> +------------------------------+------------------------------+
> |map                           |map_square                    |
> +------------------------------+------------------------------+
> |{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> {9}, 2 -> {9}, 3 -> {9}}|
> +------------------------------+------------------------------+
> {code}
> or even just this one:
> {code:java}
> case class Foo(s: String)
> val reverse = udf((f: Foo) => f.s.reverse)
> val df = Seq(Map(1 -> Foo("abc"), 2 -> Foo("klm"), 3 -> Foo("xyz"))).toDF("map")
> df.withColumn("map_reverse", transform_values(col("map"), (_, v) => reverse(v))).show(truncate = false)
> +------------------------------------+------------------------------+
> |map                                 |map_reverse                   |
> +------------------------------------+------------------------------+
> |{1 -> {abc}, 2 -> {klm}, 3 -> {xyz}}|{1 -> zyx, 2 -> zyx, 3 -> zyx}|
> +------------------------------------+------------------------------+
> {code}
> After experimenting with {{org.apache.spark.sql.catalyst.expressions.TransformValues}}, it looks like the problem occurs while executing this line:
> {code:java}
> resultValues.update(i, functionForEval.eval(inputRow)){code}
> More precisely, the problem lies in {{functionForEval.eval(inputRow)}}, because if you instrument the code like this:
> {code:java}
> println(s"RESULTS PRIOR TO EVALUATION - $resultValues")
> val resultValue = functionForEval.eval(inputRow)
> println(s"RESULT - $resultValue")
> println(s"RESULTS PRIOR TO UPDATE - $resultValues")
> resultValues.update(i, resultValue)
> println(s"RESULTS AFTER UPDATE - $resultValues"){code}
> You'll see something like the following in the logs:
> {code:java}
> RESULTS PRIOR TO EVALUATION - [null,null,null]
> RESULT - [0,1]
> RESULTS PRIOR TO UPDATE - [null,null,null]
> RESULTS AFTER UPDATE - [[0,1],null,null]
> ------
> RESULTS PRIOR TO EVALUATION - [[0,1],null,null]
> RESULT - [0,4]
> RESULTS PRIOR TO UPDATE - [[0,4],null,null]
> RESULTS AFTER UPDATE - [[0,4],[0,4],null]
> ------
> RESULTS PRIOR TO EVALUATION - [[0,4],[0,4],null]
> RESULT - [0,9]
> RESULTS PRIOR TO UPDATE - [[0,9],[0,9],null]
> RESULTS AFTER UPDATE - [[0,9],[0,9],[0,9]]
> {code}
>  
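
One possible reading of the log above, offered as an assumption rather than a confirmed diagnosis: entries already stored in the result array change as soon as the function is evaluated again, before the next update call, which is what you would see if every evaluation returned the same mutable object. The sketch below reproduces that pattern in plain Scala without Spark. MutableRow, ReusingEvaluator and AliasingSketch are hypothetical names invented for illustration and are not Spark classes; the copyResult flag only shows how a defensive copy would break the aliasing in this toy model.

```scala
// Hypothetical stand-in for a mutable row; NOT a Spark class.
final class MutableRow(var value: Int) {
  def copy(): MutableRow = new MutableRow(value)
  override def toString: String = s"[0,$value]"
}

// Hypothetical evaluator that reuses one output buffer across calls.
// This mirrors the ASSUMED behavior behind the log, not Spark's actual code.
final class ReusingEvaluator {
  private val buffer = new MutableRow(0)

  // Overwrites the shared buffer and returns the same instance every time.
  def eval(input: Int): MutableRow = {
    buffer.value = input * input
    buffer
  }
}

object AliasingSketch {
  // Stores one result per input, optionally copying each result first.
  def transform(inputs: Seq[Int], copyResult: Boolean): Vector[String] = {
    val evaluator = new ReusingEvaluator
    val results = new Array[MutableRow](inputs.length)
    for ((in, i) <- inputs.zipWithIndex) {
      val r = evaluator.eval(in)
      // Without the copy, every slot ends up referencing the same object.
      results(i) = if (copyResult) r.copy() else r
    }
    results.map(_.toString).toVector
  }

  def main(args: Array[String]): Unit = {
    println(transform(Seq(1, 2, 3), copyResult = false)) // Vector([0,9], [0,9], [0,9])
    println(transform(Seq(1, 2, 3), copyResult = true))  // Vector([0,1], [0,4], [0,9])
  }
}
```

In this toy model, storing the returned reference directly yields the same "last value wins" result as the map_square and map_reverse outputs in the report, while copying before storing keeps each entry independent.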



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
