Posted to issues@spark.apache.org by "Vladimir Matveev (JIRA)" <ji...@apache.org> on 2019/07/09 20:37:00 UTC
[jira] [Updated] (SPARK-28321) functions.udf(UDF0, DataType) produces unexpected results
[ https://issues.apache.org/jira/browse/SPARK-28321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vladimir Matveev updated SPARK-28321:
-------------------------------------
Description:
It looks like the `f.udf(UDF0, DataType)` variant of the UDF Column-creating methods is broken ([https://github.com/apache/spark/blob/c3e32bf06c35ba2580d46150923abfa795b4446a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L4061]):
{code:java}
def udf(f: UDF0[_], returnType: DataType): UserDefinedFunction = {
  val func = f.asInstanceOf[UDF0[Any]].call()
  SparkUserDefinedFunction.create(() => func, returnType, inputSchemas = Seq.fill(0)(None))
}
{code}
Here the UDF passed as the first argument is called *right inside the `udf` method* on the driver, rather than at DataFrame computation time on the executors. One of the major issues is that non-deterministic UDFs (e.g. ones generating a random value) produce unexpected results:
{code:java}
val scalaudf = f.udf { () => scala.util.Random.nextInt() }.asNondeterministic()
val javaudf = f.udf(new UDF0[Int] { override def call(): Int = scala.util.Random.nextInt() }, IntegerType).asNondeterministic()
(1 to 100).toDF().select(scalaudf().as("scala"), javaudf().as("java")).show()
// prints
+-----------+---------+
| scala| java|
+-----------+---------+
| 934190385|478543809|
|-1082102515|478543809|
| 774466710|478543809|
| 1883582103|478543809|
|-1959743031|478543809|
| 1534685218|478543809|
| 1158899264|478543809|
|-1572590653|478543809|
| -309451364|478543809|
| -906574467|478543809|
| -436584308|478543809|
| 1598340674|478543809|
|-1331343156|478543809|
|-1804177830|478543809|
|-1682906106|478543809|
| -197444289|478543809|
| 260603049|478543809|
|-1993515667|478543809|
|-1304685845|478543809|
| 481017016|478543809|
+-----------+---------+
{code}
Note that the Scala variant above, which goes through a different overload of the `functions.udf` method, works correctly.
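The root cause can be illustrated without Spark at all: the buggy overload evaluates the function once at wrap time and captures the resulting value, while keeping the call inside the closure defers evaluation to every invocation. Below is a minimal self-contained sketch of that difference (the `UDF0` trait mimics Spark's Java interface; the `eagerWrap`/`lazyWrap` names are invented for illustration, not Spark API):

```scala
object UdfEvalSketch {
  // Stand-in for org.apache.spark.sql.api.java.UDF0
  trait UDF0[R] { def call(): R }

  // Mirrors the buggy overload: f.call() runs once, immediately,
  // and the closure just returns the captured value forever after.
  def eagerWrap(f: UDF0[Int]): () => Int = {
    val func = f.call() // evaluated here, "on the driver"
    () => func
  }

  // The fix: keep f.call() inside the closure so it runs per invocation.
  def lazyWrap(f: UDF0[Int]): () => Int =
    () => f.call()

  def main(args: Array[String]): Unit = {
    var counter = 0
    val udf = new UDF0[Int] { override def call(): Int = { counter += 1; counter } }

    val eager = eagerWrap(udf)
    println(Seq(eager(), eager(), eager()))    // prints List(1, 1, 1) — frozen at wrap time

    val deferred = lazyWrap(udf)
    println(Seq(deferred(), deferred(), deferred())) // prints List(2, 3, 4) — fresh each call
  }
}
```

The same one-line change (moving the `call()` inside the closure passed to `SparkUserDefinedFunction.create`) is all the real fix in `functions.udf` would need.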
> functions.udf(UDF0, DataType) produces unexpected results
> ---------------------------------------------------------
>
> Key: SPARK-28321
> URL: https://issues.apache.org/jira/browse/SPARK-28321
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.2, 2.4.3
> Reporter: Vladimir Matveev
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org