Posted to issues@spark.apache.org by "Vladimir Matveev (JIRA)" <ji...@apache.org> on 2019/07/09 20:37:00 UTC

[jira] [Updated] (SPARK-28321) functions.udf(UDF0, DataType) produces unexpected results

     [ https://issues.apache.org/jira/browse/SPARK-28321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vladimir Matveev updated SPARK-28321:
-------------------------------------
    Description: 
It looks like the `f.udf(UDF0, DataType)` variant of the UDF Column-creating methods is wrong ([https://github.com/apache/spark/blob/c3e32bf06c35ba2580d46150923abfa795b4446a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L4061]):

{code:java}
def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = {
  val func = f.asInstanceOf[UDF0[Any]].call()
  SparkUserDefinedFunction.create(() => func, returnType, inputSchemas = Seq.fill(0)(None))
}
{code}
Here the UDF passed as the first argument is called *right inside the `udf` method* on the driver, rather than at dataframe computation time on the executors. One major consequence is that non-deterministic UDFs (e.g. ones generating a random value) produce unexpected results:

{code:java}
// Assumes a spark-shell-like environment with a SparkSession named `spark` in scope.
import org.apache.spark.sql.{functions => f}
import org.apache.spark.sql.api.java.UDF0
import org.apache.spark.sql.types.IntegerType
import spark.implicits._ // for (1 to 100).toDF()

val scalaudf = f.udf { () => scala.util.Random.nextInt() }.asNondeterministic()
val javaudf = f.udf(new UDF0[Int] { override def call(): Int = scala.util.Random.nextInt() }, IntegerType).asNondeterministic()

(1 to 100).toDF().select(scalaudf().as("scala"), javaudf().as("java")).show()

// prints

+-----------+---------+
|      scala|     java|
+-----------+---------+
|  934190385|478543809|
|-1082102515|478543809|
|  774466710|478543809|
| 1883582103|478543809|
|-1959743031|478543809|
| 1534685218|478543809|
| 1158899264|478543809|
|-1572590653|478543809|
| -309451364|478543809|
| -906574467|478543809|
| -436584308|478543809|
| 1598340674|478543809|
|-1331343156|478543809|
|-1804177830|478543809|
|-1682906106|478543809|
| -197444289|478543809|
|  260603049|478543809|
|-1993515667|478543809|
|-1304685845|478543809|
|  481017016|478543809|
+-----------+---------+
{code}
Note that the Scala version, which goes through a different overload of the `functions.udf` method, works correctly.
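
For illustration only, here is a minimal sketch of how the Java overload could defer the call until row evaluation, by closing over the `UDF0` instance itself rather than over its result (a sketch of the idea, not necessarily the actual patch):
{code:java}
def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = {
  // Close over `f` and invoke call() lazily, so it runs on the executors
  // for every row instead of once on the driver inside udf().
  val func = () => f.asInstanceOf[UDF0[Any]].call()
  SparkUserDefinedFunction.create(func, returnType, inputSchemas = Seq.fill(0)(None))
}
{code}
With this shape, a non-deterministic `UDF0` would behave like the Scala closure overload in the example above, producing a fresh value per row.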

 


> functions.udf(UDF0, DataType) produces unexpected results
> ---------------------------------------------------------
>
>                 Key: SPARK-28321
>                 URL: https://issues.apache.org/jira/browse/SPARK-28321
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.2, 2.4.3
>            Reporter: Vladimir Matveev
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org