Posted to issues@spark.apache.org by "Josh Rosen (Jira)" <ji...@apache.org> on 2022/06/07 01:34:00 UTC

[jira] [Commented] (SPARK-38485) Non-deterministic UDF executed multiple times when combined with withField

    [ https://issues.apache.org/jira/browse/SPARK-38485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17550737#comment-17550737 ] 

Josh Rosen commented on SPARK-38485:
------------------------------------

It looks like the child expression of the `UpdateFields` plan node is evaluated multiple times, once per field. If the child is non-deterministic, this re-evaluation leads to the correctness issue seen here.
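To see why this duplication is a correctness problem rather than just a performance one, here is a minimal stand-alone sketch in plain Scala (no Spark involved; `nondeterministic` is a made-up stand-in for the UDF): inlining a non-deterministic call at each use site draws a fresh value per site, while evaluating it once and reusing the result gives consistent fields.

```scala
import scala.util.Random

// Stand-in for the non-deterministic UDF: both tuple components come
// from a single evaluation, so they should always agree.
def nondeterministic(): (Int, Int) = {
  val r = Random.nextInt()
  (r, r)
}

// Duplicated evaluation: analogous to UpdateFields re-evaluating its
// child once per extracted field. `a` and `b` come from two separate
// calls and agree only by coincidence (~2^-32 chance).
val a = nondeterministic()._1
val b = nondeterministic()._2

// Single evaluation, result reused: what materializing the UDF output
// in its own Project and referencing the attribute achieves.
val once = nondeterministic()
assert(once._1 == once._2) // always holds
println(s"single-evaluation fields agree: ${once._1 == once._2}")
```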

Other parts of Spark contain logic to avoid duplicating non-deterministic expressions. For example, Spark won't collapse adjacent projections if doing so would duplicate a non-deterministic expression. We can exploit that safeguard in the existing project-collapsing rule to work around this bug:
{code:java}
val df3 = spark
  .range(5)
  .select(nondeterministicUDF($"id").as("udfOutput"))
  .select($"udfOutput".withField("new", lit(7))) {code}
This generates the following plan:
{code:java}
== Parsed Logical Plan ==
'Project [unresolvedalias(update_fields('udfOutput, WithField(new, 7)), Some(org.apache.spark.sql.Column$$Lambda$9491/376740337@3cd91041))]
+- Project [UDF(cast(id#9499682L as int)) AS udfOutput#9499684]
   +- Range (0, 5, step=1, splits=Some(4))

== Analyzed Logical Plan ==
update_fields(udfOutput, WithField(7)): struct<a:int,b:int,new:int>
Project [update_fields(udfOutput#9499684, WithField(new, 7)) AS update_fields(udfOutput, WithField(7))#9499689]
+- Project [UDF(cast(id#9499682L as int)) AS udfOutput#9499684]
   +- Range (0, 5, step=1, splits=Some(4))

== Optimized Logical Plan ==
Project [if (isnull(udfOutput#9499684)) null else named_struct(a, udfOutput#9499684.a, b, udfOutput#9499684.b, new, 7) AS update_fields(udfOutput, WithField(7))#9499689]
+- Project [UDF(cast(id#9499682L as int)) AS udfOutput#9499684]
   +- Range (0, 5, step=1, splits=Some(4))

== Physical Plan ==
*(1) Project [if (isnull(udfOutput#9499684)) null else named_struct(a, udfOutput#9499684.a, b, udfOutput#9499684.b, new, 7) AS update_fields(udfOutput, WithField(7))#9499689]
+- *(1) Project [UDF(cast(id#9499682L as int)) AS udfOutput#9499684]
   +- *(1) ColumnarToRow
      +- PhotonResultStage
         +- PhotonRange Range (0, 5, step=1, splits=4) {code}
Here the UDF is evaluated exactly once, in the inner Project, and the resulting `udfOutput` attribute is reused by the withField expression in the outer Project.

Given this, one potential fix would be an optimizer rule that performs effectively the opposite of project collapsing: if we see a non-deterministic child of UpdateFields, introduce a new projection so the child is evaluated only once and the duplicated references all point at the materialized result.
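The idea behind that rule can be sketched with a toy expression tree in plain Scala. None of these are real Catalyst classes (`NaiveUpdateFields`, `ProjectedUpdateFields`, etc. are made up for illustration); the point is only the semantic difference between re-evaluating the child per field and materializing it once, as an inserted Project would.

```scala
import scala.util.Random

// Toy model of an expression evaluating to a struct (a row of named ints).
sealed trait Expr { def eval(): Map[String, Int] }

// Non-deterministic leaf: one evaluation fills both fields with the
// same fresh random value.
case class NondeterministicStruct() extends Expr {
  def eval(): Map[String, Int] = {
    val r = Random.nextInt()
    Map("a" -> r, "b" -> r)
  }
}

// Reproduces the bug: `child` is re-evaluated once per copied field,
// so "a" and "b" may come from different evaluations.
case class NaiveUpdateFields(child: Expr, field: String, value: Int) extends Expr {
  def eval(): Map[String, Int] =
    Map("a" -> child.eval()("a"), "b" -> child.eval()("b"), field -> value)
}

// Models the proposed rule: the child is evaluated once (as if
// materialized by an inserted Project) and the single result is
// reused for every field.
case class ProjectedUpdateFields(child: Expr, field: String, value: Int) extends Expr {
  def eval(): Map[String, Int] = {
    val row = child.eval() // single evaluation
    row + (field -> value)
  }
}

val fixed = ProjectedUpdateFields(NondeterministicStruct(), "new", 7).eval()
assert(fixed("a") == fixed("b")) // fields agree: child evaluated once
assert(fixed("new") == 7)
println("fields agree after single evaluation: " + (fixed("a") == fixed("b")))
```

In the real optimizer the rewrite would of course operate on plan nodes rather than evaluate anything, splitting the non-deterministic child out into its own Project below the one containing UpdateFields.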

> Non-deterministic UDF executed multiple times when combined with withField
> --------------------------------------------------------------------------
>
>                 Key: SPARK-38485
>                 URL: https://issues.apache.org/jira/browse/SPARK-38485
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.3.0
>            Reporter: Tanel Kiis
>            Priority: Major
>              Labels: Correctness
>
> When adding fields to the result of a non-deterministic UDF that returns a struct, the UDF is executed multiple times (once per field) for each row.
> In this UT df1 passes, but df2 fails with something like:
> "279751724 did not equal -1023188908"
> {code}
>   test("SPARK-XXXXX: non-deterministic UDF should be called once when adding fields") {
>     val nondeterministicUDF = udf((s: Int) => {
>       val r = Random.nextInt()
>       // Both values should be the same
>       GroupByKey(r, r)
>     }).asNondeterministic()
>     val df1 = spark.range(5).select(nondeterministicUDF($"id"))
>     df1.collect().foreach {
>       row => assert(row.getStruct(0).getInt(0) == row.getStruct(0).getInt(1))
>     }
>     val df2 = spark.range(5).select(nondeterministicUDF($"id").withField("new", lit(7)))
>     df2.collect().foreach {
>       row => assert(row.getStruct(0).getInt(0) == row.getStruct(0).getInt(1))
>     }
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
