Posted to dev@spark.apache.org by Santosh Pingale <sa...@adyen.com.INVALID> on 2023/01/24 14:04:54 UTC

Pandas UDF cogroup.applyInPandas with multiple dataframes

Hey all

I have an interesting problem at hand. We have cases where we want to pass multiple (20 to 30) data frames to the cogroup.applyInPandas function.

RDD currently supports cogroup with up to 4 dataframes (ZippedPartitionsRDD4), whereas cogroup with pandas can handle only 2 dataframes (with ZippedPartitionsRDD2). In our use case, we do not have much control over how many data frames we may need in the cogroup.applyInPandas function.

To achieve this, we can:
(a) Implement ZippedPartitionsRDD5, ..., ZippedPartitionsRDD30, ..., ZippedPartitionsRDD50 with their respective iterators, serializers and so on. This keeps type safety intact, but it requires a lot more boilerplate code.
(b) Do not use cogroup.applyInPandas; instead use RDD.keyBy.cogroup and then getItem in a nested fashion, and convert the data to a pandas df in the Python function. This looks like a good workaround, but it is very easy to make mistakes. It also offers no type safety from the user's point of view.
(c) Implement ZippedPartitionsRDDN and NaryLike with the childrenNodes type set to Seq[T], which allows an arbitrary number of children to be set. Here we have very little boilerplate, but we sacrifice type safety.
(d) ... some new suggestions... ?
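
To make the idea behind option (c) concrete, here is a minimal sketch in plain Python (no Spark involved). It only illustrates the semantics of cogrouping an arbitrary number of keyed inputs passed as one sequence; the names cogroup_n, apply_per_key and total are hypothetical and are not part of Spark or of the PRs discussed in this thread.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Hashable, Iterable, List, Sequence, Tuple

# Hypothetical stand-in for a keyed row: (key, value).
Row = Tuple[Hashable, Any]


def cogroup_n(inputs: Sequence[Iterable[Row]]) -> Dict[Hashable, List[List[Any]]]:
    """Group any number of keyed inputs by key.

    For every key the result holds one list per input, in input order.
    Inputs that do not contain the key contribute an empty list.
    """
    grouped: Dict[Hashable, List[List[Any]]] = defaultdict(
        lambda: [[] for _ in inputs]
    )
    for position, rows in enumerate(inputs):
        for key, value in rows:
            grouped[key][position].append(value)
    return dict(grouped)


def apply_per_key(
    inputs: Sequence[Iterable[Row]], func: Callable[..., Any]
) -> Dict[Hashable, Any]:
    """Call func once per key with one positional argument per input."""
    return {key: func(*groups) for key, groups in cogroup_n(inputs).items()}


def total(*groups: List[int]) -> int:
    # The function sees N untyped groups: nothing checks that the caller
    # passed the number or kind of inputs the function expects.
    return sum(sum(group) for group in groups)


orders = [("a", 1), ("b", 2)]
refunds = [("a", 10)]
fees = [("b", 100), ("c", 1000)]

result = apply_per_key([orders, refunds, fees], total)
print(result)
```

Because the inputs arrive as a single sequence, adding a fourth or a thirtieth input needs no new code, which is the appeal of (c). The cost is the one raised above: the per-position types are no longer visible in the signature.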

I have done preliminary work on option (c). It works like a charm, but before I proceed: is my concern about sacrificed type safety overblown, and do we have an approach (d)?
(a) is too much of an investment to be worthwhile. (b) is an okay workaround, but it is not very efficient.
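
As an illustration of why the nested access in (b) is error-prone, the following plain-Python sketch emulates chaining a two-input cogroup to combine three inputs. It is not Spark code; cogroup_pair and flatten are hypothetical helpers that only mimic the shape of the result, in which every extra input adds one more level of nesting to unpack.

```python
from collections import defaultdict


def cogroup_pair(left, right):
    """Two-input cogroup: returns sorted (key, (left_values, right_values)) pairs."""
    grouped = defaultdict(lambda: ([], []))
    for key, value in left:
        grouped[key][0].append(value)
    for key, value in right:
        grouped[key][1].append(value)
    return sorted(grouped.items())


def flatten(nested):
    """Unpack the result of cogroup_pair(cogroup_pair(a, b), c) for one key."""
    inner, c_values = nested
    # inner is a list holding at most one (a_values, b_values) tuple;
    # it is empty when the key appears only in the third input.
    a_values, b_values = inner[0] if inner else ([], [])
    return a_values, b_values, c_values


a = [("k", 1)]
b = [("k", 2)]
c = [("k", 3), ("z", 9)]

nested = cogroup_pair(cogroup_pair(a, b), c)
flat = {key: flatten(groups) for key, groups in nested}
print(nested)
print(flat)
```

With 20 to 30 inputs the unpacking logic has to mirror the exact order in which the pairs were combined, and a missing key at any level has to be handled separately.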


Re: Pandas UDF cogroup.applyInPandas with multiple dataframes

Posted by Santosh Pingale <sa...@adyen.com.INVALID>.
I have opened two PRs:
One that tries to maintain backwards compatibility: https://github.com/apache/spark/pull/39902
One that breaks the API to make it cleaner: https://github.com/apache/spark/pull/40122

Note that this API is marked experimental, so a breaking change is possible at the moment. Whether we actually make one in practice is something we need to decide.

> On 7 Feb 2023, at 22:52, Li Jin <ic...@gmail.com> wrote:
> 
> I am not a Spark committer and haven't been working on Spark for a while. However, I was heavily involved in the original cogroup work and we are using cogroup functionality pretty heavily and I want to give my two cents here.
> 
> I think this is a nice improvement and I hope someone from the PySpark side can take a look at this.


Re: Pandas UDF cogroup.applyInPandas with multiple dataframes

Posted by Li Jin <ic...@gmail.com>.
I am not a Spark committer and haven't been working on Spark for a while.
However, I was heavily involved in the original cogroup work, and we use
the cogroup functionality pretty heavily, so I want to give my two cents
here.

I think this is a nice improvement and I hope someone from the PySpark side
can take a look at this.

On Mon, Feb 6, 2023 at 5:29 AM Santosh Pingale
<sa...@adyen.com.invalid> wrote:

> Created a PR: https://github.com/apache/spark/pull/39902

Re: Pandas UDF cogroup.applyInPandas with multiple dataframes

Posted by Santosh Pingale <sa...@adyen.com.INVALID>.
Created a PR: https://github.com/apache/spark/pull/39902

