Posted to issues@spark.apache.org by "Timothy Dijamco (Jira)" <ji...@apache.org> on 2022/06/15 16:13:00 UTC

[jira] [Created] (SPARK-39481) Pandas UDF executed twice if used in projection followed by filter

Timothy Dijamco created SPARK-39481:
---------------------------------------

             Summary: Pandas UDF executed twice if used in projection followed by filter
                 Key: SPARK-39481
                 URL: https://issues.apache.org/jira/browse/SPARK-39481
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.2.1
            Reporter: Timothy Dijamco


A Pandas UDF is executed twice when a query performs these two steps in order:
 # A projection that applies the Pandas UDF
 # A filter on the column produced by that projection

In the {{explain}} output of the example below, the Optimized Logical Plan and Physical Plan contain {{ArrowEvalPython}} twice:
{code:python}
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master('local[1]').getOrCreate()

df = spark.createDataFrame(
    [
        [1, 'one'],
        [2, 'two'],
        [3, 'three'],
    ],
    'int_col int, string_col string',
)

@F.pandas_udf('int')
def copy_int_col(s):
    return s

df = df.withColumn('int_col_copy', copy_int_col(df['int_col']))
df = df.filter(F.col('int_col_copy') >= 3)

df.explain(True)
{code}
{code:java}
== Parsed Logical Plan ==
'Filter ('int_col_copy >= 3)
+- Project [int_col#322, string_col#323, copy_int_col(int_col#322) AS int_col_copy#327]
   +- LogicalRDD [int_col#322, string_col#323], false

== Analyzed Logical Plan ==
int_col: int, string_col: string, int_col_copy: int
Filter (int_col_copy#327 >= 3)
+- Project [int_col#322, string_col#323, copy_int_col(int_col#322) AS int_col_copy#327]
   +- LogicalRDD [int_col#322, string_col#323], false

== Optimized Logical Plan ==
Project [int_col#322, string_col#323, pythonUDF0#332 AS int_col_copy#327]
+- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#332], 200
   +- Project [int_col#322, string_col#323]
      +- Filter (pythonUDF0#331 >= 3)
         +- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#331], 200
            +- LogicalRDD [int_col#322, string_col#323], false

== Physical Plan ==
*(3) Project [int_col#322, string_col#323, pythonUDF0#332 AS int_col_copy#327]
+- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#332], 200
   +- *(2) Project [int_col#322, string_col#323]
      +- *(2) Filter (pythonUDF0#331 >= 3)
         +- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#331], 200
            +- *(1) Scan ExistingRDD[int_col#322,string_col#323]
{code}
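The duplication can also be checked programmatically instead of by eye. The following is a minimal sketch, assuming the {{explain(True)}} text has been captured as a string; {{count_plan_nodes}} and {{PLAN_TEXT}} are hypothetical names used for illustration only, and the plan text is copied from the output above.

```python
import re

# Plan text copied from the explain(True) output in this report.
PLAN_TEXT = """\
== Optimized Logical Plan ==
Project [int_col#322, string_col#323, pythonUDF0#332 AS int_col_copy#327]
+- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#332], 200
   +- Project [int_col#322, string_col#323]
      +- Filter (pythonUDF0#331 >= 3)
         +- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#331], 200
            +- LogicalRDD [int_col#322, string_col#323], false

== Physical Plan ==
*(3) Project [int_col#322, string_col#323, pythonUDF0#332 AS int_col_copy#327]
+- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#332], 200
   +- *(2) Project [int_col#322, string_col#323]
      +- *(2) Filter (pythonUDF0#331 >= 3)
         +- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#331], 200
            +- *(1) Scan ExistingRDD[int_col#322,string_col#323]
"""


def count_plan_nodes(explain_text, node_name, section="Physical Plan"):
    """Count plan lines in one section whose operator is `node_name`."""
    # Splitting on "== Title ==" headers with one capture group yields
    # [preamble, title1, body1, title2, body2, ...].
    parts = re.split(r"^== (.+?) ==[ \t]*$", explain_text, flags=re.MULTILINE)
    bodies = dict(zip(parts[1::2], parts[2::2]))
    body = bodies.get(section, "")
    # An operator line may start with tree drawing ("+- ", indentation)
    # and an optional whole-stage codegen marker such as "*(2) ".
    pattern = re.compile(
        r"^[ \t:|+\-]*(?:\*\(\d+\)[ \t]*)?" + re.escape(node_name) + r"\b",
        re.MULTILINE,
    )
    return len(pattern.findall(body))


print(count_plan_nodes(PLAN_TEXT, "ArrowEvalPython"))
```

A count greater than one for {{ArrowEvalPython}} when the query applies the UDF only once would indicate the duplication described here.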
If the Pandas UDF is marked as non-deterministic (e.g. {{copy_int_col = copy_int_col.asNondeterministic()}}), then it is not executed twice.


