You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Aoyuan Liao (Jira)" <ji...@apache.org> on 2020/10/23 00:29:00 UTC

[jira] [Comment Edited] (SPARK-33221) UDF in when operation applied in all the rows regardless the condition

    [ https://issues.apache.org/jira/browse/SPARK-33221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17219390#comment-17219390 ] 

Aoyuan Liao edited comment on SPARK-33221 at 10/23/20, 12:28 AM:
-----------------------------------------------------------------

[~marcodena] It is not a bug but intended. In _pyspark.sql.functions.when(condition, udf) , udf will be evaluated for all rows no matter what the return value of condition is. The function are not applied on a {{Row}} basis, but using batch mode, which means it will execute udf for the whole column. Just like how pandas works_


was (Author: eveliao):
[~marcodena] It is not a bug but intended. In _pyspark.sql.functions.when(condition, udf) , udf will be evaluated for all rows no matter what the return value of condition is. The function are not applied on a {{Row}} basis, but using batch mode, which means it will execute udf for the whole column._

> UDF in when operation applied in all the rows regardless the condition
> ----------------------------------------------------------------------
>
>                 Key: SPARK-33221
>                 URL: https://issues.apache.org/jira/browse/SPARK-33221
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.5, 3.0.1
>            Reporter: Marco De Nadai
>            Priority: Major
>
> Hi all,
> I think there is a bug or, at least, an undocumented behaviour of pyspark UDFs. The code here is trying to apply the UDF just for a subset of rows (convenient for long dataframes).
>  
> {code:java}
> @udf(returnType=BooleanType())
> def test_udf(x):
>   if x is None:
>     raise Exception(x)
>   return True
>   
> data = [(1,11,1),(1,22,2),(1,33,3),(2,44,1),(3,55,1),(4,66,1)]
> dataColumns = ["uid","price","day"]
> test = spark.createDataFrame(data=data, schema = dataColumns)w = Window.partitionBy('uid').orderBy('uid','day')
> test = test.withColumn('lag_price', F.lead(F.col('price')).over(w))
> print(test.dtypes)
> test = test.withColumn('condition', F.col('lag_price').isNotNull())
> test.withColumn('appliedUDF', F.when(F.col('condition'), test_udf(F.col('lag_price'))).otherwise(False)).show()
> {code}
> It throws this error:
> {code:java}
> File "<command-3513778682084612>", line 4, in test_udf
> Exception: None
> {code}
> Is it normal? Am I missing something?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org