Posted to issues@spark.apache.org by "Timothy Dijamco (Jira)" <ji...@apache.org> on 2022/06/15 16:13:00 UTC
[jira] [Created] (SPARK-39481) Pandas UDF executed twice if used in projection followed by filter
Timothy Dijamco created SPARK-39481:
---------------------------------------
Summary: Pandas UDF executed twice if used in projection followed by filter
Key: SPARK-39481
URL: https://issues.apache.org/jira/browse/SPARK-39481
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 3.2.1
Reporter: Timothy Dijamco
In this scenario, a Pandas UDF will be executed twice:
# Projection that applies a Pandas UDF
# Filter
In the {{explain}} output of the example below, the Optimized Logical Plan and Physical Plan contain {{ArrowEvalPython}} twice:
{code:python}
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master('local[1]').getOrCreate()

df = spark.createDataFrame(
    [
        [1, 'one'],
        [2, 'two'],
        [3, 'three'],
    ],
    'int_col int, string_col string',
)

@F.pandas_udf('int')
def copy_int_col(s):
    return s

df = df.withColumn('int_col_copy', copy_int_col(df['int_col']))
df = df.filter(F.col('int_col_copy') >= 3)
df.explain(True)
{code}
{code:java}
== Parsed Logical Plan ==
'Filter ('int_col_copy >= 3)
+- Project [int_col#322, string_col#323, copy_int_col(int_col#322) AS int_col_copy#327]
   +- LogicalRDD [int_col#322, string_col#323], false

== Analyzed Logical Plan ==
int_col: int, string_col: string, int_col_copy: int
Filter (int_col_copy#327 >= 3)
+- Project [int_col#322, string_col#323, copy_int_col(int_col#322) AS int_col_copy#327]
   +- LogicalRDD [int_col#322, string_col#323], false

== Optimized Logical Plan ==
Project [int_col#322, string_col#323, pythonUDF0#332 AS int_col_copy#327]
+- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#332], 200
   +- Project [int_col#322, string_col#323]
      +- Filter (pythonUDF0#331 >= 3)
         +- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#331], 200
            +- LogicalRDD [int_col#322, string_col#323], false

== Physical Plan ==
*(3) Project [int_col#322, string_col#323, pythonUDF0#332 AS int_col_copy#327]
+- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#332], 200
   +- *(2) Project [int_col#322, string_col#323]
      +- *(2) Filter (pythonUDF0#331 >= 3)
         +- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#331], 200
            +- *(1) Scan ExistingRDD[int_col#322,string_col#323]
{code}
If the Pandas UDF is marked as non-deterministic (e.g. {{copy_int_col = copy_int_col.asNondeterministic()}}), then it is not executed twice.
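
The duplication can also be checked without reading the plan by eye. The following is a minimal sketch, not part of Spark: {{count_nodes_by_section}} and {{SAMPLE_PLAN}} are hypothetical names introduced here, and the sample text is copied from the Optimized Logical Plan and Physical Plan sections of the {{explain}} output above. It counts how many lines in each plan section mention a given node.

```python
import re

# Plan text copied from the explain(True) output in this report
# (Optimized Logical Plan and Physical Plan sections only).
SAMPLE_PLAN = """\
== Optimized Logical Plan ==
Project [int_col#322, string_col#323, pythonUDF0#332 AS int_col_copy#327]
+- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#332], 200
   +- Project [int_col#322, string_col#323]
      +- Filter (pythonUDF0#331 >= 3)
         +- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#331], 200
            +- LogicalRDD [int_col#322, string_col#323], false

== Physical Plan ==
*(3) Project [int_col#322, string_col#323, pythonUDF0#332 AS int_col_copy#327]
+- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#332], 200
   +- *(2) Project [int_col#322, string_col#323]
      +- *(2) Filter (pythonUDF0#331 >= 3)
         +- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#331], 200
            +- *(1) Scan ExistingRDD[int_col#322,string_col#323]
"""


def count_nodes_by_section(explain_text, node_name="ArrowEvalPython"):
    """Hypothetical helper: count plan lines mentioning node_name per section.

    Sections are detected by their '== Section Name ==' header lines.
    """
    node_pattern = re.compile(rf"\b{re.escape(node_name)}\b")
    counts = {}
    section = None
    for line in explain_text.splitlines():
        header = re.match(r"^== (.+) ==$", line.strip())
        if header:
            # Start a new section with a zero count.
            section = header.group(1)
            counts[section] = 0
        elif section is not None and node_pattern.search(line):
            counts[section] += 1
    return counts


print(count_nodes_by_section(SAMPLE_PLAN))
# {'Optimized Logical Plan': 2, 'Physical Plan': 2}
```

A count of 2 in either section corresponds to the double evaluation described in this report. With a live session, the plan text could probably be captured by wrapping {{df.explain(True)}} in {{contextlib.redirect_stdout}}, assuming {{explain}} prints through Python's standard output; that part is not verified here.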