You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Wenchen Fan (Jira)" <ji...@apache.org> on 2023/02/02 01:02:00 UTC

[jira] [Resolved] (SPARK-42115) Push down limit through Python UDFs

     [ https://issues.apache.org/jira/browse/SPARK-42115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan resolved SPARK-42115.
---------------------------------
    Fix Version/s: 3.4.0
       Resolution: Fixed

Issue resolved by pull request 39842
[https://github.com/apache/spark/pull/39842]

> Push down limit through Python UDFs
> -----------------------------------
>
>                 Key: SPARK-42115
>                 URL: https://issues.apache.org/jira/browse/SPARK-42115
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 3.4.0
>            Reporter: Hyukjin Kwon
>            Assignee: Hyukjin Kwon
>            Priority: Major
>             Fix For: 3.4.0
>
>
> {code}
> from pyspark.sql.functions import udf
> spark.range(10).write.mode("overwrite").parquet("/tmp/abc")
> @udf(returnType='string')
> def my_udf(arg):
>     return arg
> df = spark.read.parquet("/tmp/abc")
> df.limit(10).withColumn("prediction", my_udf(df["id"])).explain()
> {code}
> As an example. since Python UDFs are executed asynchronously, so pushing limits benefit the performance.
> {code}
> == Physical Plan ==
> CollectLimit 10
> +- *(2) Project [id#3L, pythonUDF0#10 AS prediction#6]
>    +- BatchEvalPython [my_udf(id#3L)#5], [pythonUDF0#10]
>       +- *(1) ColumnarToRow
>          +- FileScan parquet [id#3L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/abc], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
> {code}
> This is a regression from Spark 3.3.1:
> {code}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [id#3L, pythonUDF0#10 AS prediction#6]
>    +- BatchEvalPython [my_udf(id#3L)#5], [pythonUDF0#10]
>       +- GlobalLimit 10
>          +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30]
>             +- LocalLimit 10
>                +- FileScan parquet [id#3L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/abc], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org