Posted to user@spark.apache.org by Nitin Siwach <ni...@gmail.com> on 2022/01/04 06:54:11 UTC

understanding iterator of series to iterator of series pandasUDF

I understand pandasUDFs as follows:

1. There are multiple partitions per worker
2. Each partition is converted into multiple Arrow batches
3. These are sent to the Python process
4. In the case of Series to Series, is the pandasUDF applied to each Arrow
batch one after the other? **(That is, is it (a) - the vectorisation is at
the Arrow batch level, but each batch in turn is processed sequentially by
the worker; or is it (b) - the Arrow batches are combined after all have
arrived, and the pandasUDF is then applied to the whole?)** I think it is
(b), i.e. the Arrow batches are combined. I have given my reasoning below.
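
To make this concrete, here is a minimal Series to Series pandasUDF sketch
(the function plus_one and the column x are made up purely for
illustration):

    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf

    spark = SparkSession.builder.getOrCreate()

    @pandas_udf("long")
    def plus_one(x: pd.Series) -> pd.Series:
        # Receives one pandas Series per Arrow batch; the addition is
        # vectorised over that whole batch.
        return x + 1

    df = spark.range(10).toDF("x")
    df.select(plus_one("x")).show()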

Given this understanding and blackbishop's answer, I have the following
further questions:

*How exactly do the Iterator versions of pandasUDFs work?*

1. If there is some expensive initialization, why can we not do that in the
case of a Series to Series pandasUDF as well? In the case of Iterator of
Series to Iterator of Series, the initialization is done once, shared
across all the workers, and used for all the Arrow batches. Why can the
same process not be followed for a Series to Series pandasUDF? Initialize
--> share to workers --> once all the Arrow batches are combined on a
worker, apply? (See the sketch after this list.)
2. I can see that we might want to separate the execution of I/O and of
Python code on Arrow batches, so that while one batch is being read in, the
pandasUDF is being run on the previous batch. Why is this not done in the
case of Series to Series? **This is why I think all the Arrow batches are
combined before running them through the pandasUDF: otherwise, the same
I/O parallelization benefits would be available for a Series to Series
pandasUDF as well.**
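
For reference, the Iterator of Series pattern I am asking about looks like
this; a minimal sketch, where expensive_init is a hypothetical stand-in for
costly setup such as loading a model:

    from typing import Iterator

    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    @pandas_udf("double")
    def scaled(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
        factor = 2.0  # stand-in for a hypothetical expensive_init()
        for batch in batches:
            # Each element is one whole Arrow batch as a pandas Series,
            # so the multiplication is still vectorised batch by batch.
            yield batch * factor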

One more question:

1. Since the output is an Iterator of Series, where is the vectorisation
then? Is it that the pandasUDF is run on an entire Arrow batch and the
result is then emitted row by row? Or is the pandasUDF processing the
Arrow batches row by row and then emitting the result? (The latter loses
vectorisation as I see it.)
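
To show what I picture, here is a plain-pandas mock of such a generator
body (no Spark needed; all names are made up):

    from typing import Iterator

    import pandas as pd

    def udf_body(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
        for b in batches:
            yield b * 10  # one whole result Series per input batch

    # Two fake "Arrow batches" of different sizes:
    fake = iter([pd.Series([1, 2]), pd.Series([3, 4, 5])])
    for out in udf_body(fake):
        print(type(out).__name__, len(out))  # Series 2, then Series 3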

Re: understanding iterator of series to iterator of series pandasUDF

Posted by Sean Owen <sr...@gmail.com>.
That's about right, but the iterator UDF is executed once per partition,
not once per worker.

Series to series is just simpler for cases where init does not matter.
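
As a side note, the Arrow batch size behind the num_rows/arrow_batch_size
count in the quoted message below is controlled by a standard Spark config
(10000 rows per batch by default):

    # Assuming an existing SparkSession named `spark`:
    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")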

On Tue, Jan 4, 2022, 12:25 PM Nitin Siwach <ni...@gmail.com> wrote:

> I think I have an understanding now.
> 1. In Iterator to Iterator, the pandasUDF is called num_workers times:
> the entire iterator is passed to the pandasUDF once, and the iterator is
> then consumed Arrow batch by Arrow batch.
> 2. In Series to Series, the pandasUDF is called num_rows/arrow_batch_size
> times; it is called anew for each Arrow batch.
>
> I have one final doubt, though. *I now do not see the need for the Series
> to Series pandasUDF to exist. IMO every use case can be solved more
> efficiently with an Iterator of Series to Iterator of Series pandasUDF.
> Under what circumstances would a Series to Series pandasUDF be
> recommended over an Iterator of Series to Iterator of Series pandasUDF?*
>
> On Tue, Jan 4, 2022 at 7:40 PM Sean Owen <sr...@gmail.com> wrote:
>
>> Yes, the UDF gets an iterator of pandas DataFrames, so your UDF will
>> process them one at a time.
>> The idea is to perform any expensive init once per partition, once before
>> many DataFrames are processed, rather than before each DataFrame.
>> The Arrow conversion is the same in either case. The benefit comes from
>> processing batches as DataFrames in both cases.

Re: understanding iterator of series to iterator of series pandasUDF

Posted by Sean Owen <sr...@gmail.com>.
Yes, the UDF gets an iterator of pandas DataFrames, so your UDF will
process them one at a time.
The idea is to perform any expensive init once per partition, once before
many DataFrames are processed, rather than before each DataFrame.
The Arrow conversion is the same in either case. The benefit comes from
processing batches as DataFrames in both cases.
