You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2021/03/24 17:25:00 UTC
[jira] [Assigned] (SPARK-34829) transform_values return identical
values when it's used with udf that returns reference type
[ https://issues.apache.org/jira/browse/SPARK-34829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-34829:
------------------------------------
Assignee: Apache Spark
> transform_values return identical values when it's used with udf that returns reference type
> --------------------------------------------------------------------------------------------
>
> Key: SPARK-34829
> URL: https://issues.apache.org/jira/browse/SPARK-34829
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.1.1
> Reporter: Pavel Chernikov
> Assignee: Apache Spark
> Priority: Major
>
> If return value of an {{udf}} that is passed to {{transform_values}} is an {{AnyRef}}, then the transformation returns identical new values for each map key (to be more precise, each newly obtained value overrides values for all previously processed keys).
> Consider following examples:
> {code:java}
> case class Bar(i: Int)
> val square = udf((b: Bar) => b.i * b.i)
> val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map")
> df.withColumn("map_square", transform_values(col("map"), (_, v) => square(v))).show(truncate = false)
> +------------------------------+------------------------+
> |map |map_square |
> +------------------------------+------------------------+
> |{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> 1, 2 -> 4, 3 -> 9}|
> +------------------------------+------------------------+
> {code}
> vs
> {code:java}
> case class Bar(i: Int)
> case class BarSquare(i: Int)
> val square = udf((b: Bar) => BarSquare(b.i * b.i))
> val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map")
> df.withColumn("map_square", transform_values(col("map"), (_, v) => square(v))).show(truncate = false)
> +------------------------------+------------------------------+
> |map |map_square |
> +------------------------------+------------------------------+
> |{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> {9}, 2 -> {9}, 3 -> {9}}|
> +------------------------------+------------------------------+
> {code}
> or even just this one
> {code:java}
> case class Foo(s: String)
> val reverse = udf((f: Foo) => f.s.reverse)
> val df = Seq(Map(1 -> Foo("abc"), 2 -> Foo("klm"), 3 -> Foo("xyz"))).toDF("map")
> df.withColumn("map_reverse", transform_values(col("map"), (_, v) => reverse(v))).show(truncate = false)
> +------------------------------------+------------------------------+
> |map |map_reverse |
> +------------------------------------+------------------------------+
> |{1 -> {abc}, 2 -> {klm}, 3 -> {xyz}}|{1 -> zyx, 2 -> zyx, 3 -> zyx}|
> +------------------------------------+------------------------------+
> {code}
> After playing with {{org.apache.spark.sql.catalyst.expressions.TransformValues}} it looks like something wrong is happening while executing this line:
> {code:java}
> resultValues.update(i, functionForEval.eval(inputRow)){code}
> To be more precise , it's all about {{functionForEval.eval(inputRow)}} , because if you do something like this:
> {code:java}
> println(s"RESULTS PRIOR TO EVALUATION - $resultValues")
> val resultValue = functionForEval.eval(inputRow)
> println(s"RESULT - $resultValue")
> println(s"RESULTS PRIOR TO UPDATE - $resultValues")
> resultValues.update(i, resultValue)
> println(s"RESULTS AFTER UPDATE - $resultValues"){code}
> You'll see in the logs, something like:
> {code:java}
> RESULTS PRIOR TO EVALUATION - [null,null,null]
> RESULT - [0,1]
> RESULTS PRIOR TO UPDATE - [null,null,null]
> RESULTS AFTER UPDATE - [[0,1],null,null]
> ------
> RESULTS PRIOR TO EVALUATION - [[0,1],null,null]
> RESULT - [0,4]
> RESULTS PRIOR TO UPDATE - [[0,4],null,null]
> RESULTS AFTER UPDATE - [[0,4],[0,4],null]
> ------
> RESULTS PRIOR TO EVALUATION - [[0,4],[0,4],null]
> RESULT - [0,9]
> RESULTS PRIOR TO UPDATE - [[0,9],[0,9],null]
> RESULTS AFTER UPDATE - [[0,9],[0,9],[0,9]
> {code}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org