Posted to dev@beam.apache.org by Xinyu Liu <xi...@gmail.com> on 2018/04/26 00:45:02 UTC

Support non-keyed stateful ParDo

Hi,

I am working on adding the stateful ParDo to the upcoming Beam Samza
runner, and realized that the state for each ParDo processElement() is
associated not only with the window of the element, but also with the key
of the element. I chatted with Kenneth over email about this design
decision, which has the following benefits for keyed state:

1) No synchronization
2) Simple programming model
3) No communication between workers

The current design doesn't support accessing the state across different
keys, which seems to be a more general use case. This use case is also very
common inside LinkedIn, where users have access to the entire state of
an operator/task and perform lookups and computations on top of it.
It's quite hard to make every user here aware that the state is also
tightly associated with the key of the element. From the stateful ParDo API,
the state looks pretty general too. I am wondering whether it is possible to
extend the current API to support both keyed and non-keyed state, even if
internally Beam assigns a dummy key to associate the state with all the
elements. It would be very beneficial to existing Samza users and help them
adopt Beam.

Thanks,
Xinyu
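The scoping described in this message can be pictured with a toy Python model. It is not the Beam API; `KeyedStateStore`, `process_element`, and the fixed 60-second windows are hypothetical choices for illustration. Every state cell is addressed by the pair (key, window), so an element can only read what earlier elements with the same key and window wrote, which is where the "no synchronization" and "no communication between workers" properties come from.

```python
from collections import defaultdict


class KeyedStateStore:
    """Toy state backend: one independent cell per (key, window)."""

    def __init__(self):
        self._cells = defaultdict(list)

    def read(self, key, window):
        # Return a copy so callers cannot mutate the stored cell directly.
        return list(self._cells[(key, window)])

    def append(self, key, window, value):
        self._cells[(key, window)].append(value)


def fixed_window(timestamp, size=60):
    """Start of the fixed window of `size` seconds that contains `timestamp`."""
    return timestamp - timestamp % size


def process_element(store, key, timestamp, value):
    """Hypothetical processElement: return what this element can observe,
    then record its own value in its (key, window) cell."""
    window = fixed_window(timestamp)
    visible = store.read(key, window)
    store.append(key, window, value)
    return visible


store = KeyedStateStore()
print(process_element(store, "alice", 10, "a1"))  # []
print(process_element(store, "bob", 20, "b1"))    # [] : other key, same window
print(process_element(store, "alice", 30, "a2"))  # ['a1']
print(process_element(store, "alice", 70, "a3"))  # [] : same key, next window
```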

Re: Support non-keyed stateful ParDo

Posted by Reuven Lax <re...@google.com>.
Do you want execution of a single operator to be distributed across workers,
as is the case for Beam? Or do you imagine a single operator existing on a
single worker?

On Wed, Apr 25, 2018 at 6:28 PM Xinyu Liu <xi...@gmail.com> wrote:

> @Robert: for your questions:
>
> 1) Side inputs won't work for us since they return the whole collection. We
> use RocksDB, and usually the state is too big to fit in memory.
>
> 2) One way to achieve our use cases is to assign a single key to all the
> elements so they will be associated with the same keyed state. The state
> will belong to the element window as it is. Kenneth mentioned this solution
> too. It does meet our use case, but it's not very convenient for our users.
>
> 3) Sorry if I wasn't clear about the use case. For our usage, it's pretty
> common to store the elements in the state, look them up later, and do
> some computation. The elements will be in the same window, but don't need
> to have the same key.
>
> Thanks,
> Xinyu

Re: Support non-keyed stateful ParDo

Posted by Robert Bradshaw <ro...@google.com>.
Ideally, the in-memory requirement for side inputs for streaming pipelines
would be lifted (as it is for batch). But in the meantime, a wrapper as
suggested seems like it'd be easy to write for your users.

On Wed, Apr 25, 2018 at 7:07 PM Reuven Lax <re...@google.com> wrote:

> In that case I agree with Ken. It would be trivial to write a wrapper
> that did this.

Re: Support non-keyed stateful ParDo

Posted by Reuven Lax <re...@google.com>.
In that case I agree with Ken. It would be trivial to write a wrapper that
did this.

On Wed, Apr 25, 2018 at 7:01 PM Xinyu Liu <xi...@gmail.com> wrote:

> @Reuven: if the state is non-keyed (or assigned to a single key), then I
> would expect it to be executed in a single worker; otherwise there can be
> state corruption, as you mentioned. Our use case is to store elements in
> the state regardless of their keys, and then do computations on top of them.
> An example is data lookup: we can store user data elements in the
> state, and look up all the relevant user data needed for an incoming
> event. This seems to be a quite general use case to me.
>
> @Kenneth: it will be great to support it as a convenience composite!
>
> Thanks,
> Xinyu

Re: Support non-keyed stateful ParDo

Posted by Xinyu Liu <xi...@gmail.com>.
@Reuven: if the state is non-keyed (or assigned to a single key), then I
would expect it to be executed in a single worker; otherwise there can be
state corruption, as you mentioned. Our use case is to store elements in
the state regardless of their keys, and then do computations on top of them.
An example is data lookup: we can store user data elements in the
state, and look up all the relevant user data needed for an incoming
event. This seems to be a quite general use case to me.
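A minimal sketch of the lookup use case, assuming all elements have already been routed to one key so that a single map-like state cell can hold every user's data. It is plain Python rather than Beam code; `enrich` and the element shapes are hypothetical. One property worth noticing is that the result depends on arrival order: an event that arrives before the user data it needs will not find it.

```python
def enrich(elements):
    """Toy single-key stateful step for a data-lookup use case.

    Each element is either ("user", (user_id, data)), which is stored in state,
    or ("event", (event_id, user_ids)), which looks up every referenced user.
    Returns one (event_id, {user_id: data}) pair per event.
    """
    user_state = {}  # stands in for one map-valued state cell
    results = []
    for kind, payload in elements:
        if kind == "user":
            user_id, data = payload
            user_state[user_id] = data  # later writes overwrite earlier ones
        elif kind == "event":
            event_id, user_ids = payload
            # Users that have not been stored yet are simply absent.
            found = {u: user_state[u] for u in user_ids if u in user_state}
            results.append((event_id, found))
        else:
            raise ValueError(f"unknown element kind: {kind!r}")
    return results


stream = [
    ("user", ("u1", {"country": "US"})),
    ("event", ("e1", ["u1", "u2"])),  # u2 has not arrived yet
    ("user", ("u2", {"country": "NL"})),
    ("event", ("e2", ["u1", "u2"])),
]
for event_id, found in enrich(stream):
    print(event_id, found)
```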

@Kenneth: it will be great to support it as a convenience composite!

Thanks,
Xinyu

On Wed, Apr 25, 2018 at 6:31 PM, Kenneth Knowles <kl...@google.com> wrote:

> #2 could be accomplished with a convenience composite, yes?

Re: Support non-keyed stateful ParDo

Posted by Kenneth Knowles <kl...@google.com>.
#2 could be accomplished with a convenience composite, yes?

On Wed, Apr 25, 2018, 18:28 Xinyu Liu <xi...@gmail.com> wrote:

> @Robert: for your questions:
>
> 1) Side inputs won't work for us since they return the whole collection. We
> use RocksDB, and usually the state is too big to fit in memory.
>
> 2) One way to achieve our use cases is to assign a single key to all the
> elements so they will be associated with the same keyed state. The state
> will belong to the element window as it is. Kenneth mentioned this solution
> too. It does meet our use case, but it's not very convenient for our users.
>
> 3) Sorry if I wasn't clear about the use case. For our usage, it's pretty
> common to store the elements in the state, look them up later, and do
> some computation. The elements will be in the same window, but don't need
> to have the same key.
>
> Thanks,
> Xinyu

Re: Support non-keyed stateful ParDo

Posted by Xinyu Liu <xi...@gmail.com>.
@Robert: for your questions:

1) Side inputs won't work for us since they return the whole collection. We
use RocksDB, and usually the state is too big to fit in memory.

2) One way to achieve our use cases is to assign a single key to all the
elements so they will be associated with the same keyed state. The state
will belong to the element window as it is. Kenneth mentioned this solution
too. It does meet our use case, but it's not very convenient for our users.

3) Sorry if I wasn't clear about the use case. For our usage, it's pretty
common to store the elements in the state, look them up later, and do
some computation. The elements will be in the same window, but don't need
to have the same key.

Thanks,
Xinyu
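Point 2 (re-keying every element to one fixed key) can be sketched with the toy model below. It is plain Python, not the Beam API; `DUMMY_KEY`, `stateful_step`, and the record layout are hypothetical. With the original keys, an element only sees earlier elements that share its key and window; after re-keying, it sees every earlier element in its window, at the cost of funnelling all of them through a single key.

```python
from collections import defaultdict

DUMMY_KEY = "__all__"  # hypothetical fixed key shared by every element


def fixed_window(ts, size=60):
    """Start of the fixed window of `size` seconds containing `ts`."""
    return ts - ts % size


def stateful_step(records, key_fn):
    """Toy stateful ParDo with state scoped to (key_fn(record), window).

    records: (key, timestamp, value) tuples in arrival order.
    Returns, for each record, the values visible in its state cell.
    """
    state = defaultdict(list)
    visible = []
    for record in records:
        _, ts, value = record
        cell = state[(key_fn(record), fixed_window(ts))]
        visible.append(list(cell))
        cell.append(value)
    return visible


records = [("alice", 5, "a1"), ("bob", 10, "b1"), ("alice", 20, "a2")]

# Keyed by each element's own key: bob's element cannot see alice's.
print(stateful_step(records, key_fn=lambda r: r[0]))       # [[], [], ['a1']]
# Re-keyed to one dummy key: each element sees everything earlier in its window.
print(stateful_step(records, key_fn=lambda r: DUMMY_KEY))  # [[], ['a1'], ['a1', 'b1']]
```

Because the original key stays inside each record, the re-keying itself loses no information; the trade-off is parallelism.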

On Wed, Apr 25, 2018 at 6:02 PM, Robert Bradshaw <ro...@google.com>
wrote:

> On Wed, Apr 25, 2018 at 5:45 PM Xinyu Liu <xi...@gmail.com> wrote:
>
> > Hi,
>
> > I am working on adding the stateful ParDo to the upcoming Beam Samza
> > runner, and realized that the state for each ParDo processElement() is
> > associated not only with the window of the element, but also with the
> > key of the element. I chatted with Kenneth over email about this design
> > decision, which has the following benefits for keyed state:
>
> > 1) No synchronization
> > 2) Simple programming model
> > 3) No communication between workers
>
> > The current design doesn't support accessing the state across different
> > keys, which seems to be a more general use case. This use case is also
> > very common inside LinkedIn, where users have access to the entire state
> > of an operator/task and perform lookups and computations on top of it.
> > It's quite hard to make every user here aware that the state is also
> > tightly associated with the key of the element.
>
> Would side inputs be applicable here? (They're read-only, but other than
> that seem to fit the need.)
>
> > From the stateful ParDo API, the state looks pretty general too. I am
> > wondering whether it is possible to extend the current API to support
> > both keyed and non-keyed state, even if internally Beam assigns a dummy
> > key to associate the state with all the elements. It would be very
> > beneficial to existing Samza users and help them adopt Beam.
>
> Could you clarify how you would use this dummy key? You could manually add
> a random key, but in that case it's unlikely that any state stored would
> get observed again. Across what scope were you thinking state would be
> stored? The lifetime of the bundle? The worker? The job? How are
> conflicting writes resolved?
>
> Perhaps rather than describing the mechanism (state) that you're trying to
> use, it'd be helpful to describe the kinds of computations you're trying to
> perform, to figure out how the model should be adapted/extended if it
> doesn't meet those needs.
>

Re: Support non-keyed stateful ParDo

Posted by Kenneth Knowles <kl...@google.com>.
For non-merging windows, you can set all data to a single key and it still
allows parallelism over the windows.

Is this how you hope to gain parallelism? If event time roughly follows
real time, most windowing won't be very parallel. It makes the most sense
in out-of-order batch backfill.

The other thing that another thread toyed with was "runner-determined
disjoint state". The problem is that it is antagonistic toward dynamic
rebalancing (but so is state in general).

Kenn
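The point about parallelism can be illustrated with a toy count in plain Python; the helper name and the one-hour window size are assumptions. With every element on one dummy key and non-merging fixed windows, each (key, window) cell is independent, so the number of distinct windows bounds how many cells could be processed in parallel. Near-real-time data lands in one window, while a backfill spread over a day touches many.

```python
def independent_cells(timestamps, window_size=3600):
    """Count distinct (DUMMY_KEY, window) state cells for fixed windows.

    With a single key, this is an upper bound on how many state cells
    could be processed in parallel in this toy model.
    """
    return len({ts - ts % window_size for ts in timestamps})


# Event time tracking real time: 50 events within the same hour.
live = [7200 + i for i in range(50)]
# Out-of-order batch backfill: one event in each hour of a day.
backfill = [hour * 3600 + 5 for hour in range(24)]

print(independent_cells(live))      # 1
print(independent_cells(backfill))  # 24
```

Merging windows such as sessions are left out on purpose, since cells can merge and are no longer independent.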


On Wed, Apr 25, 2018, 18:02 Robert Bradshaw <ro...@google.com> wrote:

> On Wed, Apr 25, 2018 at 5:45 PM Xinyu Liu <xi...@gmail.com> wrote:
>
> > Hi,
>
> > I am working on adding the stateful ParDo to the upcoming Beam Samza
> > runner, and realized that the state for each ParDo processElement() is
> > associated not only with the window of the element, but also with the
> > key of the element. I chatted with Kenneth over email about this design
> > decision, which has the following benefits for keyed state:
>
> > 1) No synchronization
> > 2) Simple programming model
> > 3) No communication between workers
>
> > The current design doesn't support accessing the state across different
> > keys, which seems to be a more general use case. This use case is also
> > very common inside LinkedIn, where users have access to the entire state
> > of an operator/task and perform lookups and computations on top of it.
> > It's quite hard to make every user here aware that the state is also
> > tightly associated with the key of the element.
>
> Would side inputs be applicable here? (They're read-only, but other than
> that seem to fit the need.)
>
> > From the stateful ParDo API, the state looks pretty general too. I am
> > wondering whether it is possible to extend the current API to support
> > both keyed and non-keyed state, even if internally Beam assigns a dummy
> > key to associate the state with all the elements. It would be very
> > beneficial to existing Samza users and help them adopt Beam.
>
> Could you clarify how you would use this dummy key? You could manually add
> a random key, but in that case it's unlikely that any state stored would
> get observed again. Across what scope were you thinking state would be
> stored? The lifetime of the bundle? The worker? The job? How are
> conflicting writes resolved?
>
> Perhaps rather than describing the mechanism (state) that you're trying to
> use, it'd be helpful to describe the kinds of computations you're trying to
> perform, to figure out how the model should be adapted/extended if it
> doesn't meet those needs.
>

Re: Support non-keyed stateful ParDo

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, Apr 25, 2018 at 5:45 PM Xinyu Liu <xi...@gmail.com> wrote:

> Hi,

> I am working on adding the stateful ParDo to the upcoming Beam Samza
> runner, and realized that the state for each ParDo processElement() is
> associated not only with the window of the element, but also with the key
> of the element. I chatted with Kenneth over email about this design
> decision, which has the following benefits for keyed state:

> 1) No synchronization
> 2) Simple programming model
> 3) No communication between workers

> The current design doesn't support accessing the state across different
> keys, which seems to be a more general use case. This use case is also very
> common inside LinkedIn, where users have access to the entire state of
> an operator/task and perform lookups and computations on top of it.
> It's quite hard to make every user here aware that the state is also
> tightly associated with the key of the element.

Would side inputs be applicable here? (They're read-only, but other than
that seem to fit the need.)

> From the stateful ParDo API, the state looks pretty general too. I am
> wondering whether it is possible to extend the current API to support both
> keyed and non-keyed state, even if internally Beam assigns a dummy key to
> associate the state with all the elements. It would be very beneficial to
> existing Samza users and help them adopt Beam.

Could you clarify how you would use this dummy key? You could manually add
a random key, but in that case it's unlikely that any state stored would
get observed again. Across what scope were you thinking state would be
stored? The lifetime of the bundle? The worker? The job? How are
conflicting writes resolved?
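The point about a random key can be checked with a small simulation in plain Python; the helper names and key-space sizes are hypothetical. If each element gets a fresh random 64-bit key, essentially no state cell is ever touched twice, so nothing written is read back. With a small, fixed number of shard keys the cells are reused, but each element only ever sees its own shard's fraction of the state.

```python
import random
from collections import Counter


def assign_keys(n, key_space, seed=0):
    """Attach a random key drawn from range(key_space) to each of n elements."""
    rng = random.Random(seed)
    return [rng.randrange(key_space) for _ in range(n)]


def reused_cells(keys):
    """Number of state cells (keys) touched by more than one element."""
    return sum(1 for count in Counter(keys).values() if count > 1)


fresh = assign_keys(1000, key_space=2**64)  # effectively unique per element
sharded = assign_keys(1000, key_space=4)    # four shard keys

print(reused_cells(fresh))    # almost certainly 0
print(reused_cells(sharded))  # 4: every shard cell is reused
```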

Perhaps rather than describing the mechanism (state) that you're trying to
use, it'd be helpful to describe the kinds of computations you're trying to
perform, to figure out how the model should be adapted/extended if it
doesn't meet those needs.

Re: Support non-keyed stateful ParDo

Posted by Kenneth Knowles <kl...@google.com>.
I think if the concept makes sense we can add it and runners can reject
unsupported pipelines. Starting from the use case is a good way to go.

Kenn

On Wed, Apr 25, 2018, 18:23 Reuven Lax <re...@google.com> wrote:

> A few questions:
>
> Do you want the state to still be associated with the window? In Beam the
> window is per key only. While some window types (e.g. fixed windows) assign
> the same windows to all keys, giving the illusion that there is a single window
> across keys, in actuality each key has its own separate set of windows.
>
> Given that execution of a ParDo is spread across many workers and every
> worker can read and write state, how would you prevent state from being
> corrupted? Beam runners today ensure that a single key is processed on a
> single worker, so at any point in time there is only one writer.
>
> How do you imagine this to be implemented? The current Beam runners (Spark,
> Flink, Dataflow) all support per-key state natively. Flink has extremely
> limited support for operator state, but it's only useful in certain cases.
> I'm not sure any of the current runners can easily model this.
>
> What exactly are the use cases people are trying to code up? Simply
> porting another system's programming model onto Beam probably won't work
> very well. It would be better to understand what problems people are trying
> to solve, and to understand how to solve those inside the Beam model.
>
> Reuven

Re: Support non-keyed stateful ParDo

Posted by Reuven Lax <re...@google.com>.
A few questions:

Do you want the state to still be associated with the window? In Beam the
window is per key only. While some window types (e.g. fixed windows) assign
the same windows to all keys, giving the illusion that there is a single window
across keys, in actuality each key has its own separate set of windows.

Given that execution of a ParDo is spread across many workers and every
worker can read and write state, how would you prevent state from being
corrupted? Beam runners today ensure that a single key is processed on a
single worker, so at any point in time there is only one writer.
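The single-writer guarantee can be sketched as deterministic key-to-worker routing. This is a toy model, not how any particular runner is implemented; `worker_for` and `placement` are hypothetical, and `zlib.crc32` is used instead of Python's built-in `hash()` because string hashing is randomized per process. Every key maps to exactly one worker, so only that worker writes its state; the flip side is that a single dummy key sends all work to one worker.

```python
import zlib


def worker_for(key, num_workers):
    """Deterministically route a key to one of num_workers workers."""
    return zlib.crc32(key.encode("utf-8")) % num_workers


def placement(keys, num_workers):
    """Map each key to the set of workers that would process (and write) it."""
    workers = {}
    for key in keys:
        workers.setdefault(key, set()).add(worker_for(key, num_workers))
    return workers


# 100 distinct user keys, each arriving three times.
user_keys = [f"user-{i}" for i in range(100)] * 3
per_key = placement(user_keys, num_workers=8)
used = {w for ws in per_key.values() for w in ws}

print(all(len(ws) == 1 for ws in per_key.values()))  # True: one writer per key
print(len(used))  # how many workers share the load

# A single dummy key always lands on exactly one worker.
print(len(placement(["__all__"] * 300, num_workers=8)["__all__"]))  # 1
```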

How do you imagine this to be implemented? The current Beam runners (Spark,
Flink, Dataflow) all support per-key state natively. Flink has extremely
limited support for operator state, but it's only useful in certain cases.
I'm not sure any of the current runners can easily model this.

What exactly are the use cases people are trying to code up? Simply porting
another system's programming model onto Beam probably won't work very well.
It would be better to understand what problems people are trying to solve,
and to understand how to solve those inside the Beam model.

Reuven

On Wed, Apr 25, 2018 at 5:45 PM Xinyu Liu <xi...@gmail.com> wrote:

> Hi,
>
> I am working on adding the stateful ParDo to the upcoming Beam Samza
> runner, and realized that the state for each ParDo processElement() is
> associated not only with the window of the element, but also with the key
> of the element. I chatted with Kenneth over email about this design
> decision, which has the following benefits for keyed state:
>
> 1) No synchronization
> 2) Simple programming model
> 3) No communication between workers
>
> The current design doesn't support accessing the state across different
> keys, which seems to be a more general use case. This use case is also very
> common inside LinkedIn, where users have access to the entire state of
> an operator/task and perform lookups and computations on top of it.
> It's quite hard to make every user here aware that the state is also
> tightly associated with the key of the element. From the stateful ParDo
> API, the state looks pretty general too. I am wondering whether it is
> possible to extend the current API to support both keyed and non-keyed
> state, even if internally Beam assigns a dummy key to associate the state
> with all the elements. It would be very beneficial to existing Samza users
> and help them adopt Beam.
>
> Thanks,
> Xinyu
>