You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2023/02/01 08:18:00 UTC
[jira] [Commented] (SPARK-42115) Push down limit through Python UDFs
[ https://issues.apache.org/jira/browse/SPARK-42115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682905#comment-17682905 ]
Apache Spark commented on SPARK-42115:
--------------------------------------
User 'kelvinjian-db' has created a pull request for this issue:
https://github.com/apache/spark/pull/39842
> Push down limit through Python UDFs
> -----------------------------------
>
> Key: SPARK-42115
> URL: https://issues.apache.org/jira/browse/SPARK-42115
> Project: Spark
> Issue Type: Bug
> Components: PySpark, SQL
> Affects Versions: 3.4.0
> Reporter: Hyukjin Kwon
> Priority: Major
>
> {code}
> from pyspark.sql.functions import udf
> spark.range(10).write.mode("overwrite").parquet("/tmp/abc")
> @udf(returnType='string')
> def my_udf(arg):
> return arg
> df = spark.read.parquet("/tmp/abc")
> df.limit(10).withColumn("prediction", my_udf(df["id"])).explain()
> {code}
> As an example. since Python UDFs are executed asynchronously, so pushing limits benefit the performance.
> {code}
> == Physical Plan ==
> CollectLimit 10
> +- *(2) Project [id#3L, pythonUDF0#10 AS prediction#6]
> +- BatchEvalPython [my_udf(id#3L)#5], [pythonUDF0#10]
> +- *(1) ColumnarToRow
> +- FileScan parquet [id#3L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/abc], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
> {code}
> This is a regression from Spark 3.3.1:
> {code}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [id#3L, pythonUDF0#10 AS prediction#6]
> +- BatchEvalPython [my_udf(id#3L)#5], [pythonUDF0#10]
> +- GlobalLimit 10
> +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30]
> +- LocalLimit 10
> +- FileScan parquet [id#3L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/abc], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org