Posted to user@beam.apache.org by Niels Basjes <Ni...@basjes.nl> on 2018/07/06 15:01:04 UTC

Routing events by key

Hi,

I have an unbounded stream of change events each of which has the id of the
entity that is changed.
To avoid the need for locking in the persistence layer that is needed in
part of my processing I want to route all events based on this entity id.
That way I know for sure that all events around a single entity go through
the same instance of my processing sequentially, hence no need for locking
or other synchronization regarding this persistence.

At this point my best guess is that I need to use the GroupByKey but that
seems to need a Window.
I think I don't want a window because the stream is unbounded and I want
the lowest possible latency (i.e. a Window of 1 second would be ok for this
usecase).
Also I want to be 100% sure that all events for a specific id go to only a
single instance because I do not want any race conditions.

My simple question is: What does that look like in the Beam Java API?

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Routing events by key

Posted by Robert Bradshaw <ro...@google.com>.
Perhaps what you're looking for is WithKeys [1]? Any PCollection<KV<.,.>>
is essentially a KeyedDataStream. GroupByKey colocates the values for a
given key (and window). What Apache Beam does not promise is that
successive (key, value) pairs with the same key are processed
sequentially on the same machine. In practice, all runners do so after a
GBK (GroupByKey). (Note that in the face of worker failure, no runner can
provide this guarantee 100%.)

[1]
https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/transforms/WithKeys.html


On Sun, Jul 8, 2018 at 9:08 AM Niels Basjes <Ni...@basjes.nl> wrote:

> Hi all,
>
> Thanks for the help.
>
> I did some additional reading and it looks like what I want to have is
> what in Apache Flink is called a KeyedDataStream that is created by
> applying the KeyBy operation on a DataStream (
> https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams
>  ).
> It seems that this is the feature I'm looking for in Beam.
>
> Just to check if I understand it correctly
> 1) the KeyedDataStream in Flink IS the same as what I am looking for in
> Beam?
> 2) in the current version of Beam such a concept does not yet exist out of
> the box. Yet this can be built using the construct Robert described?
>
> Just to get my goal clear: I have change events coming in from various
> sources, I want to avoid the need for database side locking and as such I
> want to route all events for a specific entity into a single threaded
> stream (i.e. a single mapper instance).
>
> So what I understand right now to make it work in Beam:
> - I should be able to use the Session window using my entity id as the
> session id.
> - and then do a window operation that "does the work"
>
> Because I'm interested in avoiding database locks I have to make sure I
> use the processing time to base these windows upon (not the event time).
>
> Do you guys think this would work for my use case?
>
>
> Towards the future;
> To me this "KeyBy" operation seems like a very fundamental operation on a
> stream that I would expect to find in any stream processing tool.
> Can this be added to Beam and then see if the runners can support it?
>
> Niels Basjes

Re: Routing events by key

Posted by Niels Basjes <Ni...@basjes.nl>.
Hi all,

Thanks for the help.

I did some additional reading and it looks like what I want to have is what
in Apache Flink is called a KeyedDataStream that is created by applying the
KeyBy operation on a DataStream (
https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams
 ).
It seems that this is the feature I'm looking for in Beam.

Just to check if I understand it correctly
1) the KeyedDataStream in Flink IS the same as what I am looking for in
Beam?
2) in the current version of Beam such a concept does not yet exist out of
the box. Yet this can be built using the construct Robert described?

Just to get my goal clear: I have change events coming in from various
sources, I want to avoid the need for database side locking and as such I
want to route all events for a specific entity into a single threaded
stream (i.e. a single mapper instance).

So what I understand right now to make it work in Beam:
- I should be able to use the Session window using my entity id as the
session id.
- and then do a window operation that "does the work"

Because I'm interested in avoiding database locks I have to make sure I use
the processing time to base these windows upon (not the event time).

Do you guys think this would work for my use case?


Towards the future;
To me this "KeyBy" operation seems like a very fundamental operation on a
stream that I would expect to find in any stream processing tool.
Can this be added to Beam and then see if the runners can support it?

Niels Basjes

On Sun, Jul 8, 2018 at 1:43 AM, Robert Bradshaw <ro...@google.com> wrote:

> Reshuffle for the purpose of ensuring stable inputs is deprecated, but
> this seems a valid(ish) usecase.
>
> Currently, all runners implement GroupByKey by sending everything with the
> same key to the same machine, however this is not guaranteed by the Beam
> model, and changing it has been tossed around (e.g. when using fixed
> windows, one could send different key-window pairs to different machines).
> Even when this is the case, there's no guarantee that there won't be backup
> workers processing the same key, or even if the runner doesn't do backups,
> zombie workers (e.g. where we thought the worker died, and allocated its
> work elsewhere, but it turns out the worker is still churning away possibly
> causing side effects). Such is the nature of a distributed system.
>
> If you go the GBK route, rather than windowing by 1 second, you could use
> an AfterPane.elementCountAtLeast(1) trigger [1] (even in the global
> window) for absolute minimal latency.
> https://beam.apache.org/documentation/programming-guide/#data-driven-triggers
> . This is essentially what Reshuffle does.
>
> - Robert


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Routing events by key

Posted by Robert Bradshaw <ro...@google.com>.
Reshuffle for the purpose of ensuring stable inputs is deprecated, but this
seems a valid(ish) usecase.

Currently, all runners implement GroupByKey by sending everything with the
same key to the same machine. However, this is not guaranteed by the Beam
model, and changing it has been tossed around (e.g. when using fixed
windows, one could send different key-window pairs to different machines).
Even when this is the case, there's no guarantee that there won't be backup
workers processing the same key or, even if the runner doesn't do backups,
zombie workers (e.g. where we thought the worker died and allocated its
work elsewhere, but it turns out the worker is still churning away, possibly
causing side effects). Such is the nature of a distributed system.

If you go the GBK route, rather than windowing by 1 second, you could use
an AfterPane.elementCountAtLeast(1) trigger [1] (even in the global window)
for absolute minimal latency. This is essentially what Reshuffle does.

[1]
https://beam.apache.org/documentation/programming-guide/#data-driven-triggers

- Robert


On Fri, Jul 6, 2018 at 11:11 PM Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:

> Hi Raghu,
>
> AFAIR, Reshuffle is considered as deprecated. Maybe it would be better
> to avoid to use it no ?
>
> Regards
> JB

Re: Routing events by key

Posted by Raghu Angadi <ra...@google.com>.
JB, the Reshuffle deprecation has been discussed multiple times; the most
recent discussion is from May:
https://lists.apache.org/thread.html/820064a81c86a6d44f21f0d6c68ea3f46cec151e5e1a0b52eeed3fbf@%3Cdev.beam.apache.org%3E
I filed https://issues.apache.org/jira/browse/BEAM-4372. Summary: It can be
used, and perhaps needs a rename. I'm just mentioning the thread here as a
reference; further discussion could be directed to the above dev thread or
the JIRA.

Thanks, Robert, for the more detailed description.

Raghu.

On Fri, Jul 6, 2018 at 11:11 PM Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:

> Hi Raghu,
>
> AFAIR, Reshuffle is considered as deprecated. Maybe it would be better
> to avoid to use it no ?
>
> Regards
> JB

Re: Routing events by key

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Raghu,

AFAIR, Reshuffle is considered deprecated. Maybe it would be better
to avoid using it, no?

Regards
JB

On 06/07/2018 18:28, Raghu Angadi wrote:
> I would use Reshuffle()[1] with entity id as the key. It internally does
> a GroupByKey and sets up windowing such that it does not buffer anything.
> 
> [1]
> : https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L51 

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Routing events by key

Posted by Raghu Angadi <ra...@google.com>.
I would use Reshuffle()[1] with entity id as the key. It internally does a
GroupByKey and sets up windowing such that it does not buffer anything.

[1] :
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L51


On Fri, Jul 6, 2018 at 8:01 AM Niels Basjes <Ni...@basjes.nl> wrote:

> Hi,
>
> I have an unbounded stream of change events each of which has the id of
> the entity that is changed.
> To avoid the need for locking in the persistence layer that is needed in
> part of my processing I want to route all events based on this entity id.
> That way I know for sure that all events around a single entity go through
> the same instance of my processing sequentially, hence no need for locking
> or other synchronization regarding this persistence.
>
> At this point my best guess is that I need to use the GroupByKey but that
> seems to need a Window.
> I think I don't want a window because the stream is unbounded and I want
> the lowest possible latency (i.e. a Window of 1 second would be ok for this
> usecase).
> Also I want to be 100% sure that all events for a specific id go to only a
> single instance because I do not want any race conditions.
>
> My simple question is: What does that look like in the Beam Java API?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>

Re: Routing events by key

Posted by Lukasz Cwik <lc...@google.com>.
Note that even if you use GroupByKey and a 1 second window, it could be
that key K at time T1 and T2 are scheduled to be processed in parallel
which means that you will still need locking.

Apache Beam has no transform that lets you partition the data the way you
want without using synchronization/locking/... unless your underlying
storage engine allows you to pass in user-specified version numbers. In
that case you could use the windowing information to produce larger and
larger version numbers, so the storage engine would know which write it
should keep and which write it should discard.
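
The version-number idea can be sketched in plain Java. This is only an
illustration under stated assumptions: VersionedStore is a hypothetical
in-memory stand-in for a storage engine (it is not a Beam or database API),
and the caller is assumed to supply a version that grows over time, for
example one derived from window or timestamp information. Writes carrying a
version that is not newer than the stored one are discarded, so two workers
racing on the same entity cannot overwrite newer data with older data.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for a storage engine that accepts caller-supplied versions. */
final class VersionedStore {

    /** A stored value together with the version that wrote it. */
    private static final class Versioned {
        final long version;
        final String value;

        Versioned(long version, String value) {
            this.version = version;
            this.value = value;
        }
    }

    private final Map<String, Versioned> rows = new ConcurrentHashMap<>();

    /**
     * Applies the write only if its version is strictly newer than the stored one.
     * Returns true if the write was kept, false if it was discarded as stale.
     */
    boolean write(String entityId, long version, String value) {
        Versioned candidate = new Versioned(version, value);
        // merge() is atomic per key, so concurrent writers cannot interleave here.
        Versioned winner = rows.merge(entityId, candidate,
                (current, incoming) -> incoming.version > current.version ? incoming : current);
        return winner == candidate;
    }

    /** Returns the current value for the entity, or null if nothing was written yet. */
    String read(String entityId) {
        Versioned stored = rows.get(entityId);
        return stored == null ? null : stored.value;
    }
}
```

With such a store, the order in which workers happen to deliver writes for
one entity no longer matters, as long as the version numbers reflect the
intended order.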

Alternatively, if you know which runner you want to use, you may be able to
get what you need from some execution properties intrinsic to that runner,
but you'll have a pipeline which isn't following strict Apache Beam
semantics, and if the runner were to change, it may break you.

Finally, if none of that works out, you'll want to use a stream processing
engine that allows you to specifically say that any key range should only
ever be processed on a single machine at a time. This can have lots of its
own problems if you hit a hot key since one machine will be swamped
processing while the others are relatively idle.
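
To make the "one key, one place at a time" idea and the hot-key problem
concrete, here is a minimal single-process sketch in plain Java. KeyRouter
and its method names are hypothetical and not part of Beam or any stream
processing engine; a real distributed engine also has to deal with worker
failure, which this sketch ignores. Each key is hashed to one
single-threaded lane, so work for a given key runs sequentially in
submission order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical router: every key is pinned to exactly one single-threaded lane. */
final class KeyRouter {
    private final List<ExecutorService> lanes = new ArrayList<>();

    KeyRouter(int laneCount) {
        for (int i = 0; i < laneCount; i++) {
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    /** The same key always maps to the same lane index. */
    int laneFor(String key) {
        return Math.floorMod(key.hashCode(), lanes.size());
    }

    /** Queues work on the key's lane; tasks on one lane run one at a time, in submission order. */
    void submit(String key, Runnable work) {
        lanes.get(laneFor(key)).execute(work);
    }

    /** Stops accepting work and waits for queued tasks; returns true if every lane finished in time. */
    boolean shutdownAndWait(long timeoutMillis) {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        boolean allDone = true;
        try {
            for (ExecutorService lane : lanes) {
                allDone &= lane.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return allDone;
    }
}
```

If most events share one entity id, all of them land on a single lane while
the other lanes sit idle, which is the hot-key situation described above.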


On Fri, Jul 6, 2018 at 8:13 AM Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:

> Hi Niels,
>
> as you have an Unbounded PCollection, you need a Window to GroupByKey
> and then "forward" the data.
>
> Another option would be to use a DoFn working element by element and
> eventually batching them. It's what most of the IOs are doing for the
> Write part.
>
> Regards
> JB

Re: Routing events by key

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Niels,

as you have an Unbounded PCollection, you need a Window to GroupByKey
and then "forward" the data.

Another option would be to use a DoFn working element by element and
eventually batching them. It's what most of the IOs are doing for the
Write part.
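
The batching part of that idea can be sketched in plain Java, without any
Beam dependency. BatchingWriter is a hypothetical helper, not a Beam class;
in a real DoFn one would presumably call add() from the @ProcessElement
method and flush() from the @FinishBundle method, but that wiring is an
assumption and is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical buffer that hands elements to a sink in batches of a fixed maximum size. */
final class BatchingWriter<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> sink;
    private final List<T> buffer = new ArrayList<>();

    BatchingWriter(int maxBatchSize, Consumer<List<T>> sink) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be at least 1");
        }
        this.maxBatchSize = maxBatchSize;
        this.sink = sink;
    }

    /** Buffers one element and writes the batch out once it is full. */
    void add(T element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Writes out whatever is buffered; does nothing if the buffer is empty. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Hand the sink its own copy so clearing the buffer cannot affect it.
        sink.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

Note that batching trades latency for throughput: an element waits in the
buffer until the batch fills up or flush() is called.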

Regards
JB

On 06/07/2018 17:01, Niels Basjes wrote:
> Hi,
> 
> I have an unbounded stream of change events each of which has the id of
> the entity that is changed.
> To avoid the need for locking in the persistence layer that is needed in
> part of my processing I want to route all events based on this entity id.
> That way I know for sure that all events around a single entity go
> through the same instance of my processing sequentially, hence no need
> for locking or other synchronization regarding this persistence.
> 
> At this point my best guess is that I need to use the GroupByKey but
> that seems to need a Window. 
> I think I don't want a window because the stream is unbounded and I want
> the lowest possible latency (i.e. a Window of 1 second would be ok for
> this usecase).
> Also I want to be 100% sure that all events for a specific id go to only
> a single instance because I do not want any race conditions.
> 
> My simple question is: What does that look like in the Beam Java API?
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com