Posted to issues@spark.apache.org by "Enrico Minack (Jira)" <ji...@apache.org> on 2022/08/01 09:30:00 UTC

[jira] [Updated] (SPARK-39931) Improve performance of applyInPandas for very small groups

     [ https://issues.apache.org/jira/browse/SPARK-39931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Enrico Minack updated SPARK-39931:
----------------------------------
    Description: 
Calling {{DataFrame.groupby(...).applyInPandas(...)}} for very small groups in PySpark is very slow. The reason is that PySpark creates a separate Pandas DataFrame for each group and calls into the Python code once per group. For very small groups this per-group overhead dominates the runtime; for large groups it is comparatively small.
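
For illustration, a minimal PySpark sketch of the pattern being measured (the column name, group size and noop sink are assumptions, not the exact benchmark code):
{code:python}
# Sketch of the slow pattern: group size 1, so every row becomes its own group
# and therefore its own Pandas DataFrame and Python call.
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.range(10 * 1000 * 1000).withColumnRenamed("id", "group")

def identity(pdf: pd.DataFrame) -> pd.DataFrame:
    # trivial per-group work; runtime is dominated by the per-group call overhead
    return pdf

df.groupBy("group").applyInPandas(identity, schema="group long") \
  .write.format("noop").mode("overwrite").save()
{code}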

Here is a benchmark (seconds to run {{groupBy(...).applyInPandas(...)}} over 10m rows):
||groupSize||Scala||pyspark.sql||pyspark.pandas||
|1024|8.9|20.9|7.8|
|512|9.4|31.8|9.8|
|256|9.3|47.0|20.2|
|128|9.5|83.3|48.8|
|64|9.5|137.8|91.9|
|32|9.6|263.6|207.3|
|16|9.6|525.9|261.5|
|8|9.5|1,043|663.0|
|4|9.8|2,073|1,168|
|2|10.4|4,132|2,456|
|1|11.3|8,162|4,642|

*Idea to overcome this* is to call into the Python side with a Pandas DataFrame that potentially contains multiple groups, then perform a Pandas {{DataFrame.groupby(...).apply(...)}} there. With large groups, that Pandas DataFrame holds all rows of a single group; with small groups it contains many groups, so the per-call overhead is amortized over many groups. This should improve efficiency.
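
A rough sketch of what the Python side could then do (illustrative only; the column names and helper are assumptions, not the proposed implementation):
{code:python}
# One Pandas DataFrame arriving from the JVM spans several complete groups, so a
# single groupby(...).apply(...) replaces one Python invocation per group.
import pandas as pd

def process_batch(batch: pd.DataFrame, udf) -> pd.DataFrame:
    # "group" stands in for the actual grouping key column(s)
    return batch.groupby("group", group_keys=False).apply(udf)

# Example: one batch holding four groups of size 2
batch = pd.DataFrame({"group": [1, 1, 2, 2, 3, 3, 4, 4], "value": range(8)})
out = process_batch(batch, lambda pdf: pdf.assign(total=pdf["value"].sum()))
{code}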

I have prepared a PoC to benchmark that idea but am struggling to massage the internal Rows before sending them to Python. The idea is to turn the {{Dataset[T]}} into a {{Dataset[(K, T)]}}, create a bespoke {{GroupedIterator}} that splits the Dataset into chunks containing only complete groups, and send those chunks to Python, which then groups them by key and applies the udf to the values. The first bit seems to be the hard bit:

{{sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala}}:
{code:scala}
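    // Wrap the grouping columns and the deduplicated data columns into two named
    // structs ("key" and "val") so each row carries its group key next to its
    // values when a chunk spanning multiple groups is sent to Python.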
    val inputRDD = ProjectExec(Seq(
        Alias(CreateNamedStruct(groupingAttributes.flatMap(a => Literal(a.name) :: a :: Nil)), "key")(),
        Alias(CreateNamedStruct(dedupAttributes.flatMap(a => Literal(a.name) :: a :: Nil)), "val")()
      ), child).execute()
{code}
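
For completeness, a hypothetical sketch of the Python-side counterpart (not existing Spark code; the flattened {{key.*}}/{{val.*}} column layout is an assumption):
{code:python}
# Hypothetical worker-side handling of one chunk that carries "key" and "val"
# struct columns for many groups: group on the key fields, apply the udf to the
# value fields only.
import pandas as pd

def run_udf_on_chunk(pdf: pd.DataFrame, udf) -> pd.DataFrame:
    key_cols = [c for c in pdf.columns if c.startswith("key.")]
    val_cols = [c for c in pdf.columns if c.startswith("val.")]
    return pdf.groupby(key_cols, group_keys=False)[val_cols].apply(udf)
{code}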

> Improve performance of applyInPandas for very small groups
> ----------------------------------------------------------
>
>                 Key: SPARK-39931
>                 URL: https://issues.apache.org/jira/browse/SPARK-39931
>             Project: Spark
>          Issue Type: New Feature
>          Components: PySpark
>    Affects Versions: 3.4.0
>            Reporter: Enrico Minack
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org