Posted to issues@spark.apache.org by "Enrico Minack (Jira)" <ji...@apache.org> on 2022/08/01 09:25:00 UTC

[jira] [Created] (SPARK-39931) Improve performance of applyInPandas for very small groups

Enrico Minack created SPARK-39931:
-------------------------------------

             Summary: Improve performance of applyInPandas for very small groups
                 Key: SPARK-39931
                 URL: https://issues.apache.org/jira/browse/SPARK-39931
             Project: Spark
          Issue Type: New Feature
          Components: PySpark
    Affects Versions: 3.4.0
            Reporter: Enrico Minack


Calling `DataFrame.groupby(...).applyInPandas(...)` in PySpark is very slow when groups are very small. The reason is that PySpark creates one Pandas DataFrame per group and calls into the Python code once per group. For very small groups this per-group overhead dominates; for large groups it is amortized over many rows.

Here is a benchmark (seconds to groupBy(...).applyInPandas(...) over 10m rows, for varying rows per group):
||groupSize||Scala||pyspark.sql||pyspark.pandas||
|1024|8.9|20.9|7.8|
|512|9.4|31.8|9.8|
|256|9.3|47.0|20.2|
|128|9.5|83.3|48.8|
|64|9.5|137.8|91.9|
|32|9.6|263.6|207.3|
|16|9.6|525.9|261.5|
|8|9.5|1,043|663.0|
|4|9.8|2,073|1,168|
|2|10.4|4,132|2,456|
|1|11.3|8,162|4,642|
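
For reference, a minimal sketch of the kind of job behind these numbers (the exact benchmark code is not shown here; the identity UDF and the noop sink are illustrative assumptions):

{code:python}
from pyspark.sql import SparkSession, functions as F
import pandas as pd

spark = SparkSession.builder.getOrCreate()

group_size = 1024  # vary this to reproduce the rows of the table above
df = (spark.range(10_000_000)
      .withColumn("group", (F.col("id") / group_size).cast("long")))

def noop(pdf: pd.DataFrame) -> pd.DataFrame:
    # identity UDF: the measured time is dominated by per-group overhead
    return pdf

(df.groupby("group")
   .applyInPandas(noop, schema=df.schema)
   .write.format("noop").mode("overwrite").save())
{code}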

*The idea to overcome this* is to call into the Python side with a Pandas DataFrame that potentially contains multiple groups, then perform a Pandas DataFrame.groupby(...).apply(...). With large groups, that Pandas DataFrame holds all rows of a single group; with small groups, it contains many groups. This should improve efficiency.
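
On the Python side this amounts to a single groupby-apply over a batch that spans several groups. A minimal sketch, assuming the batch arrives as one Pandas DataFrame with a key column (all names here are illustrative):

{code:python}
import pandas as pd

def apply_per_group(batch: pd.DataFrame, udf) -> pd.DataFrame:
    # batch may span many small groups; a single groupby/apply call
    # amortizes the per-batch overhead across all of them
    return batch.groupby("key", group_keys=False).apply(udf)

# illustrative use: one batch holding two tiny groups
batch = pd.DataFrame({"key": [1, 1, 2, 2], "val": [1.0, 2.0, 3.0, 4.0]})
result = apply_per_group(batch, lambda g: g.assign(val=g["val"] - g["val"].mean()))
{code}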

I have prepared a PoC to benchmark that idea but am struggling to massage the internal Rows before sending them to Python. The idea is to turn the {{Dataset[T]}} into a {{Dataset[(K, T)]}}, create a bespoke {{GroupedIterator}} that splits the Dataset into chunks that contain complete groups, and send them to Python, which then groups by key and applies the udf to the values. The first step seems to be the hard part:

{code:scala}
    // Project each row into a pair of named structs: "key" carries the
    // grouping attributes, "val" the deduplicated value attributes.
    val inputRDD = ProjectExec(Seq(
        Alias(CreateNamedStruct(groupingAttributes.flatMap(a => Literal(a.name) :: a :: Nil)), "key")(),
        Alias(CreateNamedStruct(dedupAttributes.flatMap(a => Literal(a.name) :: a :: Nil)), "val")()
      ), child).execute()
{code}
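
For illustration, the chunking behaviour the bespoke {{GroupedIterator}} would need can be sketched in Python, assuming rows arrive as (key, value) pairs already clustered by key (this sketches the semantics only; it is not Spark code):

{code:python}
from itertools import groupby

def chunks_of_complete_groups(rows, target_size):
    # rows: (key, value) pairs already clustered by key.
    # Yield chunks of roughly target_size rows that never split a group,
    # so the Python side can safely group each chunk by key and apply the udf.
    chunk = []
    for _, group in groupby(rows, key=lambda kv: kv[0]):
        chunk.extend(group)
        if len(chunk) >= target_size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk
{code}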



