You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Carlos Alonso <ca...@mrcalonso.com> on 2018/02/12 13:52:43 UTC

ParDo requires its input to use KvCoder in order to use state and timers

I was refactoring my solution a bit and tried to make my stateful transform
to work on simple case classes and I got this exception:
https://pastebin.com/x4xADmvL . I'd like to understand the rationale behind
this as I think carefully choosing the keys would be very important in
order for the work to be properly distributed.

Thanks!

Re: ParDo requires its input to use KvCoder in order to use state and timers

Posted by Kenneth Knowles <kl...@google.com>.
If I am connecting the threads properly, you are trying to simulate
triggering based on the size of buffered data, while you are using the type
of message to route messages to one GCS location or another. Yes?

On Tue, Feb 13, 2018 at 10:29 AM, Carlos Alonso <ca...@mrcalonso.com>
wrote:

> I have a question about this. My scenario is:
>
> * PubSub input with a timestampAttribute named doc_timestamp
> * Fixed windowing of one hour size.
> * Keys are an internal attribute of the messages (the type)
> * Messages of one particular type are way more frequent than the others,
> so it is likely a hot key
>
> Will it help if I add a string representation of the doc_timestamp (to the
> hour) to the key, in order to increase the range of keys and therefore make
> it more parallelisable?
>

It might help, might not. I think it depends on bursty-ness. Technically,
the runner could already parallelize over the windows whenever multiple are
active for a key. No runner actually does this - they all just parallelize
over the key. So embedding the window in the key does make that parallelism
explicit. But for non-overlapping windows if you are streaming data at near
real time and the processing overlaps a lot, you are probably not keeping
up.

I think your best choice is probably following the example of
Combine.withHotKeyFanout to add sub-sharding to your hot key(s) and then
working out a way to write them out to appropriate locations.

Kenn


> I wonder if it will help or not, as the final result will be the same
> (type + window), but not sure if it would help before the point where the
> windowing is applied.
>
> Thanks!
>
>
> On Mon, Feb 12, 2018 at 6:41 PM Carlos Alonso <ca...@mrcalonso.com>
> wrote:
>
>> Ok, that makes a lot of sense. Thanks Kenneth!
>>
>> On Mon, Feb 12, 2018 at 5:41 PM Kenneth Knowles <kl...@google.com> wrote:
>>
>>> Hi Carlos,
>>>
>>> You are quite correct that choosing the keys is important for work to be
>>> evenly distributed. The reason you need to have a KvCoder is that state is
>>> partitioned per key (to give natural & automatic parallelism) and window
>>> (to allow reclaiming expired state so you can process unbounded data with
>>> bounded storage, and also more parallelism). To a Beam runner, most data in
>>> the pipeline is "just bytes" that it cannot interpret. KvCoder is a special
>>> case where a runner knows the binary layout of encoded data so it can pull
>>> out the keys in order to shuffle data of the same key to the same place, so
>>> that is why it has to be a KvCoder.
>>>
>>> Kenn
>>>
>>> On Mon, Feb 12, 2018 at 5:52 AM, Carlos Alonso <ca...@mrcalonso.com>
>>> wrote:
>>>
>>>> I was refactoring my solution a bit and tried to make my stateful
>>>> transform to work on simple case classes and I got this exception:
>>>> https://pastebin.com/x4xADmvL . I'd like to understand the rationale
>>>> behind this as I think carefully choosing the keys would be very important
>>>> in order for the work to be properly distributed.
>>>>
>>>> Thanks!
>>>>
>>>
>>>

Re: ParDo requires its input to use KvCoder in order to use state and timers

Posted by Carlos Alonso <ca...@mrcalonso.com>.
I have a question about this. My scenario is:

* PubSub input with a timestampAttribute named doc_timestamp
* Fixed windowing of one hour size.
* Keys are an internal attribute of the messages (the type)
* Messages of one particular type are way more frequent than the others, so
it is likely a hot key

Will it help if I add a string representation of the doc_timestamp (to the
hour) to the key, in order to increase the range of keys and therefore make
it more parallelisable?

I wonder if it will help or not, as the final result will be the same (type
+ window), but not sure if it would help before the point where the
windowing is applied.

Thanks!


On Mon, Feb 12, 2018 at 6:41 PM Carlos Alonso <ca...@mrcalonso.com> wrote:

> Ok, that makes a lot of sense. Thanks Kenneth!
>
> On Mon, Feb 12, 2018 at 5:41 PM Kenneth Knowles <kl...@google.com> wrote:
>
>> Hi Carlos,
>>
>> You are quite correct that choosing the keys is important for work to be
>> evenly distributed. The reason you need to have a KvCoder is that state is
>> partitioned per key (to give natural & automatic parallelism) and window
>> (to allow reclaiming expired state so you can process unbounded data with
>> bounded storage, and also more parallelism). To a Beam runner, most data in
>> the pipeline is "just bytes" that it cannot interpret. KvCoder is a special
>> case where a runner knows the binary layout of encoded data so it can pull
>> out the keys in order to shuffle data of the same key to the same place, so
>> that is why it has to be a KvCoder.
>>
>> Kenn
>>
>> On Mon, Feb 12, 2018 at 5:52 AM, Carlos Alonso <ca...@mrcalonso.com>
>> wrote:
>>
>>> I was refactoring my solution a bit and tried to make my stateful
>>> transform to work on simple case classes and I got this exception:
>>> https://pastebin.com/x4xADmvL . I'd like to understand the rationale
>>> behind this as I think carefully choosing the keys would be very important
>>> in order for the work to be properly distributed.
>>>
>>> Thanks!
>>>
>>
>>

Re: ParDo requires its input to use KvCoder in order to use state and timers

Posted by Carlos Alonso <ca...@mrcalonso.com>.
Ok, that makes a lot of sense. Thanks Kenneth!

On Mon, Feb 12, 2018 at 5:41 PM Kenneth Knowles <kl...@google.com> wrote:

> Hi Carlos,
>
> You are quite correct that choosing the keys is important for work to be
> evenly distributed. The reason you need to have a KvCoder is that state is
> partitioned per key (to give natural & automatic parallelism) and window
> (to allow reclaiming expired state so you can process unbounded data with
> bounded storage, and also more parallelism). To a Beam runner, most data in
> the pipeline is "just bytes" that it cannot interpret. KvCoder is a special
> case where a runner knows the binary layout of encoded data so it can pull
> out the keys in order to shuffle data of the same key to the same place, so
> that is why it has to be a KvCoder.
>
> Kenn
>
> On Mon, Feb 12, 2018 at 5:52 AM, Carlos Alonso <ca...@mrcalonso.com>
> wrote:
>
>> I was refactoring my solution a bit and tried to make my stateful
>> transform to work on simple case classes and I got this exception:
>> https://pastebin.com/x4xADmvL . I'd like to understand the rationale
>> behind this as I think carefully choosing the keys would be very important
>> in order for the work to be properly distributed.
>>
>> Thanks!
>>
>
>

Re: ParDo requires its input to use KvCoder in order to use state and timers

Posted by Kenneth Knowles <kl...@google.com>.
Hi Carlos,

You are quite correct that choosing the keys is important for work to be
evenly distributed. The reason you need to have a KvCoder is that state is
partitioned per key (to give natural & automatic parallelism) and window
(to allow reclaiming expired state so you can process unbounded data with
bounded storage, and also more parallelism). To a Beam runner, most data in
the pipeline is "just bytes" that it cannot interpret. KvCoder is a special
case where a runner knows the binary layout of encoded data so it can pull
out the keys in order to shuffle data of the same key to the same place, so
that is why it has to be a KvCoder.

Kenn

On Mon, Feb 12, 2018 at 5:52 AM, Carlos Alonso <ca...@mrcalonso.com> wrote:

> I was refactoring my solution a bit and tried to make my stateful
> transform to work on simple case classes and I got this exception:
> https://pastebin.com/x4xADmvL . I'd like to understand the rationale
> behind this as I think carefully choosing the keys would be very important
> in order for the work to be properly distributed.
>
> Thanks!
>