Posted to issues@spark.apache.org by "Timothy Dijamco (Jira)" <ji...@apache.org> on 2022/06/15 16:14:00 UTC

[jira] [Updated] (SPARK-39481) Pandas UDF executed twice if used in projection followed by filter

     [ https://issues.apache.org/jira/browse/SPARK-39481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timothy Dijamco updated SPARK-39481:
------------------------------------
    Description: 
In the following scenario, a Pandas UDF is executed twice:
 # A projection that applies a Pandas UDF
 # A filter on the column produced by that UDF

In the {{explain}} output of the example below, both the Optimized Logical Plan and the Physical Plan contain two {{ArrowEvalPython}} nodes:
{code:python}
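from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd

# A single local worker thread is enough to reproduce the issue
spark = SparkSession.builder.master('local[1]').getOrCreate()

df = spark.createDataFrame(
    [
        [1, 'one'],
        [2, 'two'],
        [3, 'three'],
    ],
    'int_col int, string_col string',
)

# Scalar Pandas UDF that returns its input Series unchanged (deterministic by default)
@F.pandas_udf('int')
def copy_int_col(s: pd.Series) -> pd.Series:
    return s

# Projection that applies the UDF, followed by a filter on the UDF's output column
df = df.withColumn('int_col_copy', copy_int_col(df['int_col']))
df = df.filter(F.col('int_col_copy') >= 3)

# Print the parsed, analyzed, optimized and physical plans
df.explain(True)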
{code}
{code:java}
== Parsed Logical Plan ==
'Filter ('int_col_copy >= 3)
+- Project [int_col#322, string_col#323, copy_int_col(int_col#322) AS int_col_copy#327]
   +- LogicalRDD [int_col#322, string_col#323], false

== Analyzed Logical Plan ==
int_col: int, string_col: string, int_col_copy: int
Filter (int_col_copy#327 >= 3)
+- Project [int_col#322, string_col#323, copy_int_col(int_col#322) AS int_col_copy#327]
   +- LogicalRDD [int_col#322, string_col#323], false

== Optimized Logical Plan ==
Project [int_col#322, string_col#323, pythonUDF0#332 AS int_col_copy#327]
+- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#332], 200
   +- Project [int_col#322, string_col#323]
      +- Filter (pythonUDF0#331 >= 3)
         +- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#331], 200
            +- LogicalRDD [int_col#322, string_col#323], false

== Physical Plan ==
*(3) Project [int_col#322, string_col#323, pythonUDF0#332 AS int_col_copy#327]
+- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#332], 200
   +- *(2) Project [int_col#322, string_col#323]
      +- *(2) Filter (pythonUDF0#331 >= 3)
         +- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#331], 200
            +- *(1) Scan ExistingRDD[int_col#322,string_col#323]
{code}
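For a regression test it may help to check the plan text programmatically rather than by eye. The sketch below assumes the {{explain(True)}} output keeps the section headers and tree layout shown above; {{explain_text}} and {{count_nodes}} are hypothetical helpers, not part of PySpark. {{explain_text}} captures what {{DataFrame.explain}} prints to standard output, and {{count_nodes}} counts {{ArrowEvalPython}} operators per plan section.
{code:python}
import contextlib
import io
import re
from typing import Dict

# Section headers printed by explain(True), e.g. "== Physical Plan =="
_SECTION_RE = re.compile(r'^== (.+) ==$')


def explain_text(df) -> str:
    """Capture what df.explain(True) prints to stdout (any object with explain() works)."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain(True)
    return buf.getvalue()


def count_nodes(plan_text: str, node: str = 'ArrowEvalPython') -> Dict[str, int]:
    """Count plan lines whose operator is `node`, grouped by explain section."""
    counts: Dict[str, int] = {}
    section = None
    for line in plan_text.splitlines():
        header = _SECTION_RE.match(line.strip())
        if header:
            section = header.group(1)
            counts[section] = 0
            continue
        if section is None:
            continue  # ignore anything printed before the first section header
        # Strip the tree-drawing prefix ("+- ", ":  ") and a codegen marker such as "*(2) "
        op = re.sub(r'^[\s:|+\-]*(\*\(\d+\)\s*)?', '', line)
        # Match only the operator name, not references such as pythonUDF0#331
        if op.startswith(node):
            counts[section] += 1
    return counts
{code}
Applied to the plan output above, {{count_nodes}} reports 0 for the parsed and analyzed plans and 2 for both the Optimized Logical Plan and the Physical Plan; a build without this issue would presumably report 1.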
If the Pandas UDF is marked as non-deterministic (for example, {{copy_int_col = copy_int_col.asNondeterministic()}}), it is not executed twice.
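The extra work can be illustrated without Spark. The following plain-Python toy model is not Spark code and ignores Arrow batching; {{CountingUDF}}, {{run_reported_plan}} and {{run_single_eval_plan}} are hypothetical names. {{run_reported_plan}} replays the Optimized Logical Plan above: the UDF runs over every input row to evaluate the filter, then runs again over the surviving rows to produce {{int_col_copy}}. {{run_single_eval_plan}} evaluates the UDF once and then filters, which is assumed to correspond to the single execution reported for the non-deterministic UDF.
{code:python}
from typing import List, Tuple

Row = Tuple[int, str]

# Same three rows as the reproduction
ROWS: List[Row] = [(1, 'one'), (2, 'two'), (3, 'three')]


class CountingUDF:
    """Identity function standing in for copy_int_col; records batches and values seen."""

    def __init__(self) -> None:
        self.batches = 0
        self.values = 0

    def __call__(self, batch: List[int]) -> List[int]:
        self.batches += 1
        self.values += len(batch)
        return list(batch)


def run_reported_plan(udf: CountingUDF, rows: List[Row]) -> List[Tuple[int, str, int]]:
    # Lower ArrowEvalPython: evaluate the UDF on every input row (pythonUDF0#331)
    first = udf([i for i, _ in rows])
    # Filter (pythonUDF0#331 >= 3); the Project above it drops that column
    kept = [row for row, v in zip(rows, first) if v >= 3]
    # Upper ArrowEvalPython: evaluate the UDF again on the surviving rows (pythonUDF0#332)
    second = udf([i for i, _ in kept])
    return [(i, s, v) for (i, s), v in zip(kept, second)]


def run_single_eval_plan(udf: CountingUDF, rows: List[Row]) -> List[Tuple[int, str, int]]:
    # Evaluate the UDF once, then filter on the computed column
    values = udf([i for i, _ in rows])
    return [(i, s, v) for (i, s), v in zip(rows, values) if v >= 3]


twice, once = CountingUDF(), CountingUDF()
print(run_reported_plan(twice, ROWS), twice.batches, twice.values)  # → [(3, 'three', 3)] 2 4
print(run_single_eval_plan(once, ROWS), once.batches, once.values)  # → [(3, 'three', 3)] 1 3
{code}
Both variants return the same row, but the replayed plan calls the UDF twice and processes four values instead of three. With a more expensive UDF or a less selective filter, the duplicated work grows accordingly.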


> Pandas UDF executed twice if used in projection followed by filter
> ------------------------------------------------------------------
>
>                 Key: SPARK-39481
>                 URL: https://issues.apache.org/jira/browse/SPARK-39481
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.2.1
>            Reporter: Timothy Dijamco
>            Priority: Minor



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
