Posted to dev@beam.apache.org by rahul patwari <ra...@gmail.com> on 2019/07/25 16:05:48 UTC

Stateful ParDo on Non-Keyed PCollection

Hi,

https://beam.apache.org/blog/2017/02/13/stateful-processing.html  gives an
example of assigning an arbitrary-but-consistent index to each element on a
per key-and-window basis.

If a Stateful ParDo is applied to a non-keyed PCollection, say a
PCollection<Row> with fixed windows, is the state maintained per window, so
that every element in the window is assigned a consistent index?
Would that mean every element in the window is processed by a single DoFn
instance, limiting performance, when the work could otherwise have been
spread across multiple parallel instances?
Similarly, how does a Stateful ParDo behave on a bounded, non-keyed PCollection?

Thanks,
Rahul

Re: Stateful ParDo on Non-Keyed PCollection

Posted by rahul patwari <ra...@gmail.com>.
Yes, but GroupIntoBatches works on KV<K,V>, and we work with
PCollection<Row> throughout our pipeline.
We can convert Row to KV, but we have only a few keys and a bounded
PCollection. With global windows and only a few keys, Stateful ParDo's
per-key, per-window processing limits parallelism to the number of keys.

On Thu, Jul 25, 2019 at 10:08 PM Reuven Lax <re...@google.com> wrote:

> Have you looked at the GroupIntoBatches transform?
>
> On Thu, Jul 25, 2019 at 9:34 AM rahul patwari <ra...@gmail.com>
> wrote:
>
>> So, If an RPC call has to be performed for a batch of
>> Rows(PCollection<Row>), instead of each Row, the recommended way is to
>> batch the Rows in startBundle() of DoFn(
>> https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?
>> I thought Stateful and Timely Processing could be helpful here.
>>
>> On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> Though it's not obvious in the name, Stateful ParDos can only be
>>> applied to keyed PCollections, similar to GroupByKey. (You could,
>>> however, assign every element to the same key and then apply a
>>> Stateful DoFn, though in that case all elements would get processed on
>>> the same worker.)
>>>
>>> On Thu, Jul 25, 2019 at 6:06 PM rahul patwari
>>> <ra...@gmail.com> wrote:
>>> >
>>> > Hi,
>>> >
>>> > https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>> gives an example of assigning an arbitrary-but-consistent index to each
>>> element on a per key-and-window basis.
>>> >
>>> > If the Stateful ParDo is applied on a Non-Keyed PCollection, say,
>>> PCollection<Row> with Fixed Windows, the state is maintained per window and
>>> every element in the window will be assigned a consistent index?
>>> > Does this mean every element belonging to the window will be processed
>>> in a single DoFn Instance, which otherwise could have been done in multiple
>>> parallel instances, limiting performance?
>>> > Similarly, How does Stateful ParDo behave on Bounded Non-Keyed
>>> PCollection?
>>> >
>>> > Thanks,
>>> > Rahul
>>>
>>

Re: Stateful ParDo on Non-Keyed PCollection

Posted by Reuven Lax <re...@google.com>.
Have you looked at the GroupIntoBatches transform?
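
For readers unfamiliar with the transform, the following is a minimal plain-Java model of the behaviour being suggested: values are grouped per key into batches of at most a fixed size. It is only a sketch and does not use Beam classes; `KeyedBatches` and `batchPerKey` are hypothetical names, and windowing is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of per-key batching; hypothetical, does not use Beam classes. */
class KeyedBatches {
    /** Groups values by key into batches holding at most batchSize values each. */
    static <K, V> Map<K, List<List<V>>> batchPerKey(List<Map.Entry<K, V>> input, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        Map<K, List<List<V>>> batches = new LinkedHashMap<>();
        for (Map.Entry<K, V> kv : input) {
            List<List<V>> forKey = batches.computeIfAbsent(kv.getKey(), k -> new ArrayList<>());
            // Open a new batch when the key has none yet or its last batch is full.
            if (forKey.isEmpty() || forKey.get(forKey.size() - 1).size() == batchSize) {
                forKey.add(new ArrayList<>());
            }
            forKey.get(forKey.size() - 1).add(kv.getValue());
        }
        return batches;
    }
}
```

Because batches never mix keys, the number of distinct keys bounds how many batches can be built in parallel.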

On Thu, Jul 25, 2019 at 9:34 AM rahul patwari <ra...@gmail.com>
wrote:

> So, If an RPC call has to be performed for a batch of
> Rows(PCollection<Row>), instead of each Row, the recommended way is to
> batch the Rows in startBundle() of DoFn(
> https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?
> I thought Stateful and Timely Processing could be helpful here.
>
> On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> Though it's not obvious in the name, Stateful ParDos can only be
>> applied to keyed PCollections, similar to GroupByKey. (You could,
>> however, assign every element to the same key and then apply a
>> Stateful DoFn, though in that case all elements would get processed on
>> the same worker.)
>>
>> On Thu, Jul 25, 2019 at 6:06 PM rahul patwari
>> <ra...@gmail.com> wrote:
>> >
>> > Hi,
>> >
>> > https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>> gives an example of assigning an arbitrary-but-consistent index to each
>> element on a per key-and-window basis.
>> >
>> > If the Stateful ParDo is applied on a Non-Keyed PCollection, say,
>> PCollection<Row> with Fixed Windows, the state is maintained per window and
>> every element in the window will be assigned a consistent index?
>> > Does this mean every element belonging to the window will be processed
>> in a single DoFn Instance, which otherwise could have been done in multiple
>> parallel instances, limiting performance?
>> > Similarly, How does Stateful ParDo behave on Bounded Non-Keyed
>> PCollection?
>> >
>> > Thanks,
>> > Rahul
>>
>

Re: Stateful ParDo on Non-Keyed PCollection

Posted by Robert Bradshaw <ro...@google.com>.
On Thu, Jul 25, 2019 at 6:34 PM rahul patwari
<ra...@gmail.com> wrote:
>
> So, If an RPC call has to be performed for a batch of Rows(PCollection<Row>), instead of each Row, the recommended way is to batch the Rows in startBundle() of DoFn(https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?

Yes.
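
As an illustration of per-bundle batching, here is a minimal plain-Java sketch that mirrors the DoFn lifecycle without using Beam classes. `BundleBatcher`, its method names, and the `rpc` callback are hypothetical stand-ins. Note that elements are buffered as they are processed and the remainder is flushed when the bundle finishes; the start-of-bundle hook only resets the buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of per-bundle batching; hypothetical, does not use Beam classes. */
class BundleBatcher<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> rpc; // stand-in for the batched RPC call
    private List<T> buffer = new ArrayList<>();

    BundleBatcher(int maxBatchSize, Consumer<List<T>> rpc) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be >= 1");
        }
        this.maxBatchSize = maxBatchSize;
        this.rpc = rpc;
    }

    /** Mirrors the start-of-bundle hook: begin with an empty buffer. */
    void startBundle() {
        buffer = new ArrayList<>();
    }

    /** Mirrors per-element processing: buffer, and flush once the batch is full. */
    void processElement(T element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Mirrors the end-of-bundle hook: send whatever is left over. */
    void finishBundle() {
        flush();
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            rpc.accept(new ArrayList<>(buffer)); // copy so the callee owns its batch
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        BundleBatcher<Integer> batcher =
            new BundleBatcher<>(2, batch -> System.out.println("RPC with " + batch));
        batcher.startBundle();
        for (int i = 1; i <= 5; i++) {
            batcher.processElement(i);
        }
        batcher.finishBundle(); // the three calls receive [1, 2], [3, 4] and [5]
    }
}
```

Nothing is carried over between bundles in this model, which is why small bundles lead to small batches.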

> I thought Stateful and Timely Processing could be helpful here.

The upside is that you can persist state across bundles (which is
especially helpful when bundles are small, e.g. for streaming
pipelines). The downside is that you can't persist state across keys
(and it also enforces a shuffle to colocate the data by key).

If you get to choose your keys, you would want to have about as many
keys as you have concurrent bundles (or some small multiple, to ensure
they're not lumpily distributed). Keying by something like
System.identityHashCode(this) in the body of a DoFn might be
sufficient.
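
The two key choices mentioned above could be sketched roughly as follows. This is plain Java, not Beam code; `SyntheticKeys` and `numShards` are hypothetical names, and the right number of shards is an assumption to be tuned against the number of concurrent bundles.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper for choosing synthetic keys; does not use Beam classes. */
class SyntheticKeys {
    private final int numShards;

    SyntheticKeys(int numShards) {
        if (numShards < 1) {
            throw new IllegalArgumentException("numShards must be >= 1");
        }
        this.numShards = numShards;
    }

    /** Uniform random shard in [0, numShards): at most numShards distinct keys exist. */
    int randomShard() {
        return ThreadLocalRandom.current().nextInt(numShards);
    }

    /** Key tied to this object: every element keyed by the same instance shares it. */
    int instanceKey() {
        return System.identityHashCode(this);
    }
}
```

With `randomShard()` the key count is fixed up front. With `instanceKey()` it follows however many instances happen to exist, and identity hash codes are not guaranteed to be unique across instances.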

> On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw <ro...@google.com> wrote:
>>
>> Though it's not obvious in the name, Stateful ParDos can only be
>> applied to keyed PCollections, similar to GroupByKey. (You could,
>> however, assign every element to the same key and then apply a
>> Stateful DoFn, though in that case all elements would get processed on
>> the same worker.)
>>
>> On Thu, Jul 25, 2019 at 6:06 PM rahul patwari
>> <ra...@gmail.com> wrote:
>> >
>> > Hi,
>> >
>> > https://beam.apache.org/blog/2017/02/13/stateful-processing.html  gives an example of assigning an arbitrary-but-consistent index to each element on a per key-and-window basis.
>> >
>> > If the Stateful ParDo is applied on a Non-Keyed PCollection, say, PCollection<Row> with Fixed Windows, the state is maintained per window and every element in the window will be assigned a consistent index?
>> > Does this mean every element belonging to the window will be processed in a single DoFn Instance, which otherwise could have been done in multiple parallel instances, limiting performance?
>> > Similarly, How does Stateful ParDo behave on Bounded Non-Keyed PCollection?
>> >
>> > Thanks,
>> > Rahul

Re: Stateful ParDo on Non-Keyed PCollection

Posted by rahul patwari <ra...@gmail.com>.
So, if an RPC call has to be performed for a batch of Rows
(PCollection<Row>) instead of for each Row, is the recommended way to
batch the Rows within a bundle, using the DoFn's startBundle() and
finishBundle() hooks (
https://stackoverflow.com/questions/49094781/yield-results-in-finish-bundle-from-a-custom-dofn/49101711#49101711)?
I thought Stateful and Timely Processing could be helpful here.

On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw <ro...@google.com> wrote:

> Though it's not obvious in the name, Stateful ParDos can only be
> applied to keyed PCollections, similar to GroupByKey. (You could,
> however, assign every element to the same key and then apply a
> Stateful DoFn, though in that case all elements would get processed on
> the same worker.)
>
> On Thu, Jul 25, 2019 at 6:06 PM rahul patwari
> <ra...@gmail.com> wrote:
> >
> > Hi,
> >
> > https://beam.apache.org/blog/2017/02/13/stateful-processing.html  gives
> an example of assigning an arbitrary-but-consistent index to each element
> on a per key-and-window basis.
> >
> > If the Stateful ParDo is applied on a Non-Keyed PCollection, say,
> PCollection<Row> with Fixed Windows, the state is maintained per window and
> every element in the window will be assigned a consistent index?
> > Does this mean every element belonging to the window will be processed
> in a single DoFn Instance, which otherwise could have been done in multiple
> parallel instances, limiting performance?
> > Similarly, How does Stateful ParDo behave on Bounded Non-Keyed
> PCollection?
> >
> > Thanks,
> > Rahul
>

Re: Stateful ParDo on Non-Keyed PCollection

Posted by Robert Bradshaw <ro...@google.com>.
Though it's not obvious in the name, Stateful ParDos can only be
applied to keyed PCollections, similar to GroupByKey. (You could,
however, assign every element to the same key and then apply a
Stateful DoFn, though in that case all elements would get processed on
the same worker.)
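
To make the keyed-state point concrete, here is a minimal plain-Java model of state scoped per key (windows omitted). It is a sketch, not Beam code; `PerKeyIndexer` and `assignIndex` are hypothetical names. With a single constant key every element shares one counter, so the indexing is inherently sequential; with several keys the counters are independent.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of per-key state; hypothetical, does not use Beam classes. */
class PerKeyIndexer {
    // One state cell per key, analogous to a value state scoped to a key.
    private final Map<String, Integer> nextIndexByKey = new HashMap<>();

    /** Returns the next index for the given key and advances that key's counter. */
    int assignIndex(String key) {
        int index = nextIndexByKey.getOrDefault(key, 0);
        nextIndexByKey.put(key, index + 1);
        return index;
    }

    public static void main(String[] args) {
        PerKeyIndexer indexer = new PerKeyIndexer();
        System.out.println(indexer.assignIndex("all"));   // prints 0
        System.out.println(indexer.assignIndex("all"));   // prints 1
        System.out.println(indexer.assignIndex("other")); // prints 0: separate state cell
    }
}
```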

On Thu, Jul 25, 2019 at 6:06 PM rahul patwari
<ra...@gmail.com> wrote:
>
> Hi,
>
> https://beam.apache.org/blog/2017/02/13/stateful-processing.html  gives an example of assigning an arbitrary-but-consistent index to each element on a per key-and-window basis.
>
> If the Stateful ParDo is applied on a Non-Keyed PCollection, say, PCollection<Row> with Fixed Windows, the state is maintained per window and every element in the window will be assigned a consistent index?
> Does this mean every element belonging to the window will be processed in a single DoFn Instance, which otherwise could have been done in multiple parallel instances, limiting performance?
> Similarly, How does Stateful ParDo behave on Bounded Non-Keyed PCollection?
>
> Thanks,
> Rahul
