Posted to issues@spark.apache.org by "L. C. Hsieh (Jira)" <ji...@apache.org> on 2021/05/12 00:41:00 UTC

[jira] [Commented] (SPARK-35371) Scala UDF returning string or complex type applied to array members returns wrong data

    [ https://issues.apache.org/jira/browse/SPARK-35371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17342936#comment-17342936 ] 

L. C. Hsieh commented on SPARK-35371:
-------------------------------------

I just ran the example on both the current master branch and branch-3.1. Both produced the correct results. Would you like to test it too?

> Scala UDF returning string or complex type applied to array members returns wrong data
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-35371
>                 URL: https://issues.apache.org/jira/browse/SPARK-35371
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: David Benedeki
>            Priority: Major
>
> When a UDF returning a string or complex type (Struct) is applied to array members, every element of the resulting array holds the UDF result of the last array member.
> h3. *Example code:*
> {code:scala}
> import org.apache.spark.sql.{Column, SparkSession}
> import org.apache.spark.sql.functions.{callUDF, col, transform, udf}
> val sparkBuilder: SparkSession.Builder = SparkSession.builder()
>   .master("local[*]")
>   .appName("Udf Bug Demo")
>   .config("spark.ui.enabled", "false")
>   .config("spark.debug.maxToStringFields", 100)
> val spark: SparkSession = sparkBuilder
>   .config("spark.driver.bindAddress", "127.0.0.1")
>   .config("spark.driver.host", "127.0.0.1")
>   .getOrCreate()
> import spark.implicits._
> case class Foo(num: Int, s: String)
> val src  = Seq(
>   (1, 2, Array(1, 2, 3)),
>   (2, 2, Array(2, 2, 2)),
>   (3, 4, Array(3, 4, 3, 4))
> ).toDF("A", "B", "C")
> val udfStringName = "UdfString"
> val udfIntName = "UdfInt"
> val udfStructName = "UdfStruct"
> val udfString = udf((num: Int) => {
>   (num + 1).toString
> })
> spark.udf.register(udfStringName, udfString)
> val udfInt = udf((num: Int) => {
>   num + 1
> })
> spark.udf.register(udfIntName, udfInt)
> val udfStruct = udf((num: Int) => {
>   Foo(num + 1, (num + 1).toString)
> })
> spark.udf.register(udfStructName, udfStruct)
> val lambdaString = (forCol: Column) => callUDF(udfStringName, forCol)
> val lambdaInt = (forCol: Column) => callUDF(udfIntName, forCol)
> val lambdaStruct = (forCol: Column) => callUDF(udfStructName, forCol)
> val cA = callUDF(udfStringName, col("A"))
> val cB = callUDF(udfStringName, col("B"))
> val cCString: Column = transform(col("C"), lambdaString)
> val cCInt: Column = transform(col("C"), lambdaInt)
> val cCStruc: Column = transform(col("C"), lambdaStruct)
> val dest = src.withColumn("AStr", cA)
>   .withColumn("BStr", cB)
>   .withColumn("CString (Wrong)", cCString)
>   .withColumn("CInt (OK)", cCInt)
>   .withColumn("CStruct (Wrong)", cCStruc)
> dest.show(false)
> dest.printSchema()
> {code}
> h3. *Expected:*
> {noformat}
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> |A  |B  |C           |AStr|BStr|CString        |CInt        |CStruct                         |
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> |1  |2  |[1, 2, 3]   |2   |3   |[2, 3, 4]      |[2, 3, 4]   |[{2, 2}, {3, 3}, {4, 4}]        |
> |2  |2  |[2, 2, 2]   |3   |3   |[3, 3, 3]      |[3, 3, 3]   |[{3, 3}, {3, 3}, {3, 3}]        |
> |3  |4  |[3, 4, 3, 4]|4   |5   |[4, 5, 4, 5]   |[4, 5, 4, 5]|[{4, 4}, {5, 5}, {4, 4}, {5, 5}]|
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> {noformat}
> h3. *Got:*
> {noformat}
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> |A  |B  |C           |AStr|BStr|CString (Wrong)|CInt (OK)   |CStruct (Wrong)                 |
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> |1  |2  |[1, 2, 3]   |2   |3   |[4, 4, 4]      |[2, 3, 4]   |[{4, 4}, {4, 4}, {4, 4}]        |
> |2  |2  |[2, 2, 2]   |3   |3   |[3, 3, 3]      |[3, 3, 3]   |[{3, 3}, {3, 3}, {3, 3}]        |
> |3  |4  |[3, 4, 3, 4]|4   |5   |[5, 5, 5, 5]   |[4, 5, 4, 5]|[{5, 5}, {5, 5}, {5, 5}, {5, 5}]|
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> {noformat}
> h3. *Observation*
>  * Works correctly on Spark 3.0.2.
>  * When the UDF is registered as a Java UDF, it works as expected.
>  * The UDF is called the appropriate number of times (regardless of whether the UDF is marked as deterministic or non-deterministic).
>  * When debugged, the correct value is initially saved into the result array, but processing each subsequent item overwrites the previously stored values as well. As a result, the final array is filled with the value computed for the last item.
>  * When the UDF returns NULL/None, it neither "overwrites" the prior array values nor is "overwritten" by subsequent non-NULL values. See the following UDF implementation:
> {code:scala}
> val udfString = udf((num: Int) => {
>   if (num == 3) {
>     None
>   } else {
>     Some((num + 1).toString)
>   }
> })
> {code}
>  
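
The overwrite behavior described in the observations above is what one would typically see when every array slot holds a reference to the same mutable object instead of its own copy. The following is a minimal plain-Scala sketch of that general aliasing pattern, under that assumption. It is not Spark's internal code and does not establish the root cause of this issue; `BufferReuseSketch`, `MutableCell`, `transformShared` and `transformCopied` are hypothetical names introduced only for illustration.

```scala
// Hypothetical illustration only: none of these names come from Spark.
object BufferReuseSketch {
  // A mutable holder standing in for a result object that gets reused.
  final class MutableCell(var value: String)

  // Aliasing variant: a single cell is shared by all non-empty slots,
  // so each write is visible through every slot stored earlier.
  def transformShared(input: Seq[Int], f: Int => Option[String]): Seq[Option[String]] = {
    val shared = new MutableCell("")
    val slots: Vector[Option[MutableCell]] = input.toVector.map { n =>
      f(n).map { s =>
        shared.value = s // overwrites what earlier slots will report
        shared           // stores a reference to the cell, not a copy
      }
    }
    // Reading happens after all writes, so every slot sees the last value.
    slots.map(_.map(_.value))
  }

  // Copying variant: the value is read out of the cell before it is reused.
  def transformCopied(input: Seq[Int], f: Int => Option[String]): Seq[Option[String]] = {
    val shared = new MutableCell("")
    input.toVector.map { n =>
      f(n).map { s =>
        shared.value = s
        shared.value // String is immutable, so keeping this reference is a safe copy
      }
    }
  }
}
```

In this sketch, `BufferReuseSketch.transformShared(Seq(1, 2, 3), n => Some((n + 1).toString))` yields `Some("4")` three times, mirroring the `[4, 4, 4]` row in the report, while `transformCopied` with the same arguments yields `Some("2")`, `Some("3")`, `Some("4")`. With a function that returns `None` for 3, the `None` slots stay untouched in both variants because no cell reference is stored for them, which matches the NULL/None observation.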



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
