Posted to issues@spark.apache.org by "Santosh Pingale (Jira)" <ji...@apache.org> on 2023/02/05 18:34:00 UTC

[jira] [Commented] (SPARK-42349) Support pandas cogroup with multiple df

    [ https://issues.apache.org/jira/browse/SPARK-42349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17684337#comment-17684337 ] 

Santosh Pingale commented on SPARK-42349:
-----------------------------------------

From https://lists.apache.org/thread/jlpr3jm0slk33qhj0r9jvwmz8sq50vd8

RDD currently supports cogroup with up to 4 dataframes (ZippedPartitionsRDD4), whereas cogroup with pandas can handle only 2 dataframes (with ZippedPartitionsRDD2). In our use case, we do not have much control over how many dataframes we may need in the cogroup.applyInPandas function.

To achieve this, we can:
(a) Implement ZippedPartitionsRDD5, ZippedPartitionsRDD..ZippedPartitionsRDD30..ZippedPartitionsRDD50 with respective iterators, serializers and so on. This keeps type safety intact, but requires a lot more boilerplate code.
(b) Do not use cogroup.applyInPandas; instead use RDD.keyBy.cogroup and then getItem in a nested fashion, and convert the data to pandas dfs inside the Python function. This looks like a good workaround, but it is very easy to make mistakes. It also gives the user no type safety.
(c) Implement ZippedPartitionsRDDN and NaryLike with the childrenNodes type set to Seq[T], which allows an arbitrary number of children to be set. Here we have very little boilerplate, but we sacrifice type safety.
(d) ... some new suggestions... ?
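
To make the variable-arity requirement concrete, here is a minimal plain-Python sketch of what a cogroup over N inputs is expected to produce. It is not Spark code and not the implementation from the ticket: `cogroup_n`, the `Row` alias and the sample data are hypothetical names used only for illustration, and plain lists of dicts stand in for dataframes.

```python
from typing import Any, Dict, Hashable, Iterator, List, Sequence, Tuple

Row = Dict[str, Any]  # hypothetical stand-in for one record of a dataframe


def cogroup_n(
    datasets: Sequence[List[Row]], key: str
) -> Iterator[Tuple[Hashable, List[List[Row]]]]:
    """Yield (key, [rows_from_input_0, ..., rows_from_input_N-1]) per key.

    Every key seen in any input gets exactly N slots; an input that has
    no rows for that key contributes an empty list.
    """
    n = len(datasets)
    groups: Dict[Hashable, List[List[Row]]] = {}
    for i, rows in enumerate(datasets):
        for row in rows:
            # Create the N empty slots the first time a key is seen.
            slots = groups.setdefault(row[key], [[] for _ in range(n)])
            slots[i].append(row)
    # Keys come out in first-seen order (dicts preserve insertion order).
    for k, slots in groups.items():
        yield k, slots


# Three inputs instead of two; the function does not care how many there are.
sales = [{"id": 1, "amount": 10}, {"id": 2, "amount": 5}]
returns = [{"id": 1, "amount": 3}]
visits = [{"id": 2, "count": 7}, {"id": 3, "count": 1}]

grouped = dict(cogroup_n([sales, returns, visits], "id"))
```

Because the inputs travel as one sequence rather than as separate typed parameters, the number of inputs is unconstrained, which mirrors the trade-off described for option (c): little boilerplate, but no per-input type checking.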


I have an implementation of option (c) ready. Right now there are two code paths: one with the previous implementation and one with the new implementation. The previous implementation supported either 2 or 3 args for the UDF. If 3 args were passed, it automatically assumed that the first arg is the key. This assumption limits flexibility. So I propose that we remove the previous signature in favour of the new one; as the API is marked experimental, we shouldn't have issues making this change. In the new alternative, we explicitly pass an additional arg to `applyInPandas` called `pass_key`. If it is set to true, then we pass the key as the first arg, followed by the dfs.
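
The difference between the two calling conventions can be sketched in plain Python. This is only an illustration under assumptions, not the code from the proposed patch: `legacy_call` and `call_cogroup_udf` are hypothetical helper names, and lists stand in for pandas dataframes. Only the `pass_key` flag itself comes from the proposal above.

```python
import inspect
from typing import Any, Callable, Sequence


def legacy_call(func: Callable[..., Any], key: Any, left: Any, right: Any) -> Any:
    """Rough model of the previous behaviour: infer intent from arity.

    A 3-parameter function is assumed to want the key first. This only
    works because the number of dataframes is fixed at two.
    """
    n_params = len(inspect.signature(func).parameters)
    if n_params == 3:
        return func(key, left, right)
    return func(left, right)


def call_cogroup_udf(
    func: Callable[..., Any], key: Any, dfs: Sequence[Any], pass_key: bool = False
) -> Any:
    """Rough model of the proposed behaviour: the caller states intent.

    With a variable number of dataframes, the parameter count no longer
    says whether the first argument is the key, so an explicit flag is used.
    """
    if pass_key:
        return func(key, *dfs)
    return func(*dfs)


def count_rows(*dfs):
    # Hypothetical UDF that ignores the key.
    return sum(len(df) for df in dfs)


def count_rows_with_key(key, *dfs):
    # Hypothetical UDF that wants the key as its first argument.
    return key, sum(len(df) for df in dfs)


dfs = [[1, 2], [3], []]  # three "dataframes" for one key
without_key = call_cogroup_udf(count_rows, ("a",), dfs)
with_key = call_cogroup_udf(count_rows_with_key, ("a",), dfs, pass_key=True)
```

Both sample UDFs accept `*dfs`, so counting their parameters cannot reveal whether a key is expected; that is the situation in which an explicit flag such as `pass_key` removes the ambiguity.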

> Support pandas cogroup with multiple df
> ---------------------------------------
>
>                 Key: SPARK-42349
>                 URL: https://issues.apache.org/jira/browse/SPARK-42349
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark
>    Affects Versions: 3.3.1
>            Reporter: Santosh Pingale
>            Priority: Trivial
>
> Currently PySpark supports `cogroup.applyInPandas` with only 2 dataframes. The improvement request is to support multiple dataframes with variable arity.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
