Posted to issues@spark.apache.org by "Timothy Dijamco (Jira)" <ji...@apache.org> on 2022/06/15 16:14:00 UTC
[jira] [Updated] (SPARK-39481) Pandas UDF executed twice if used in projection followed by filter
[ https://issues.apache.org/jira/browse/SPARK-39481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timothy Dijamco updated SPARK-39481:
------------------------------------
Description:
A Pandas UDF is executed twice when it is used in the following sequence:
# A projection that applies the Pandas UDF
# A filter on the UDF's output column
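In the {{explain}} output of the example below, both the Optimized Logical Plan and the Physical Plan contain two {{ArrowEvalPython}} nodes for {{copy_int_col}}. One sits below the {{Filter}} and produces {{pythonUDF0#331}} for the predicate. The other sits above it and produces {{pythonUDF0#332}} for the {{int_col_copy}} column.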
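{code:python}
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# Single-threaded local session, as in the original report.
spark = SparkSession.builder.master('local[1]').getOrCreate()

df = spark.createDataFrame(
    [
        [1, 'one'],
        [2, 'two'],
        [3, 'three'],
    ],
    'int_col int, string_col string',
)

# Scalar Pandas UDF that returns its input column unchanged.
@F.pandas_udf('int')
def copy_int_col(s: pd.Series) -> pd.Series:
    return s

# 1. Projection that applies the Pandas UDF.
df = df.withColumn('int_col_copy', copy_int_col(df['int_col']))
# 2. Filter on the UDF's output column.
df = df.filter(F.col('int_col_copy') >= 3)

df.explain(True)
{code}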
{code:java}
== Parsed Logical Plan ==
'Filter ('int_col_copy >= 3)
+- Project [int_col#322, string_col#323, copy_int_col(int_col#322) AS int_col_copy#327]
   +- LogicalRDD [int_col#322, string_col#323], false

== Analyzed Logical Plan ==
int_col: int, string_col: string, int_col_copy: int
Filter (int_col_copy#327 >= 3)
+- Project [int_col#322, string_col#323, copy_int_col(int_col#322) AS int_col_copy#327]
   +- LogicalRDD [int_col#322, string_col#323], false

== Optimized Logical Plan ==
Project [int_col#322, string_col#323, pythonUDF0#332 AS int_col_copy#327]
+- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#332], 200
   +- Project [int_col#322, string_col#323]
      +- Filter (pythonUDF0#331 >= 3)
         +- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#331], 200
            +- LogicalRDD [int_col#322, string_col#323], false

== Physical Plan ==
*(3) Project [int_col#322, string_col#323, pythonUDF0#332 AS int_col_copy#327]
+- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#332], 200
   +- *(2) Project [int_col#322, string_col#323]
      +- *(2) Filter (pythonUDF0#331 >= 3)
         +- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#331], 200
            +- *(1) Scan ExistingRDD[int_col#322,string_col#323]
{code}
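To check a query for this pattern without reading the plan by eye, the {{explain(True)}} text can be scanned for {{ArrowEvalPython}} nodes that evaluate the same UDF. The following is a minimal Python sketch that assumes the plan layout shown above (section headers of the form {{== ... ==}}, nodes printed as {{ArrowEvalPython [udf_name(...)], ...}}). {{split_explain_sections}}, {{count_arrow_evals}} and {{duplicated_udfs}} are hypothetical helpers, not PySpark APIs, and only the first UDF listed in each node is counted.

{code:python}
from __future__ import annotations

import re
from collections import Counter

# Section headers in explain(True) output, e.g. "== Physical Plan ==".
SECTION_HEADER = re.compile(r"== (.+) ==")
# Captures the first UDF name inside an ArrowEvalPython node, e.g.
# "ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#332], 200".
ARROW_EVAL_UDF = re.compile(r"ArrowEvalPython \[(\w+)\(")


def split_explain_sections(explain_text: str) -> dict[str, str]:
    """Hypothetical helper: split explain text into {section title: body}."""
    sections: dict[str, str] = {}
    title = None
    body: list[str] = []
    for line in explain_text.splitlines():
        header = SECTION_HEADER.fullmatch(line.strip())
        if header:
            if title is not None:
                sections[title] = "\n".join(body)
            title, body = header.group(1), []
        elif title is not None:
            body.append(line)
    if title is not None:
        sections[title] = "\n".join(body)
    return sections


def count_arrow_evals(explain_text: str) -> dict[str, Counter]:
    """Count ArrowEvalPython nodes per (first) UDF name in each section."""
    return {
        title: Counter(ARROW_EVAL_UDF.findall(body))
        for title, body in split_explain_sections(explain_text).items()
    }


def duplicated_udfs(explain_text: str,
                    section: str = "Optimized Logical Plan") -> list[str]:
    """UDF names evaluated by more than one ArrowEvalPython node in `section`."""
    counts = count_arrow_evals(explain_text).get(section, Counter())
    return sorted(name for name, n in counts.items() if n > 1)
{code}

For a live DataFrame, the text could be captured by wrapping {{df.explain(True)}} in {{contextlib.redirect_stdout(io.StringIO())}}, assuming {{explain}} prints through Python's {{print}} (as the PySpark 3.x implementation does). For the plan in this report, {{duplicated_udfs}} returns {{['copy_int_col']}}.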
If the Pandas UDF is marked as non-deterministic (e.g. {{copy_int_col = copy_int_col.asNondeterministic()}}), it is executed only once.
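A toy model in plain Python helps show what the duplicated node costs. It is not Spark code. The function names are hypothetical, rows are tuples, and evaluations are counted per row, whereas a real Pandas UDF is called once per Arrow batch. The model assumes, as the optimized plan suggests, that the optimizer pushed the {{Filter}} below the {{Project}} by substituting the UDF call into the predicate. The call is then evaluated once for the {{Filter}} and again for {{int_col_copy}}. Presumably the non-deterministic UDF runs only once because the optimizer does not push a filter through a projection that contains non-deterministic expressions.

{code:python}
from typing import Callable, List, Tuple

# Toy model with hypothetical helpers; rows mirror 'int_col int, string_col string'.
Row = Tuple[int, str]
Udf = Callable[[int], int]
Predicate = Callable[[int], bool]


def filter_pushed_below_project(rows: List[Row], udf: Udf,
                                predicate: Predicate) -> Tuple[List[tuple], int]:
    """Shape of the reported optimized plan: evaluate the UDF for the Filter,
    drop that result in the Project, then evaluate it again for the output."""
    calls = 0

    def counted(x: int) -> int:
        nonlocal calls
        calls += 1
        return udf(x)

    # Lower ArrowEvalPython + Filter: every input row is evaluated.
    kept = [row for row in rows if predicate(counted(row[0]))]
    # Upper ArrowEvalPython + Project: surviving rows are evaluated again.
    return [row + (counted(row[0]),) for row in kept], calls


def evaluate_once_then_filter(rows: List[Row], udf: Udf,
                              predicate: Predicate) -> Tuple[List[tuple], int]:
    """Evaluate the UDF once per row, keep the value, then filter on it."""
    projected = [row + (udf(row[0]),) for row in rows]
    return [row for row in projected if predicate(row[2])], len(rows)


def identity_udf(x: int) -> int:
    """Stands in for copy_int_col."""
    return x


def at_least_3(value: int) -> bool:
    """Stands in for int_col_copy >= 3."""
    return value >= 3


rows = [(1, "one"), (2, "two"), (3, "three")]
print(filter_pushed_below_project(rows, identity_udf, at_least_3))
# ([(3, 'three', 3)], 4)
print(evaluate_once_then_filter(rows, identity_udf, at_least_3))
# ([(3, 'three', 3)], 3)
{code}

Under this model the extra work equals the number of rows that pass the filter, so the overhead grows with the fraction of rows the predicate keeps.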
> Pandas UDF executed twice if used in projection followed by filter
> ------------------------------------------------------------------
>
> Key: SPARK-39481
> URL: https://issues.apache.org/jira/browse/SPARK-39481
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 3.2.1
> Reporter: Timothy Dijamco
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)