Posted to user@beam.apache.org by Ryan Madsen <ry...@gmail.com> on 2016/05/24 01:47:55 UTC

Generating a historically-consistent join

Hi all,

I'm looking to perform a join on two streaming datasets, and I'm having a
hard time figuring out whether Beam provides a model that can help here. I'd
like to build a system that takes the outputs of two streaming data sources,
joins them, and writes the "joined" values to a new collection. This first
part looks quite easy to do with CoGroupByKey, but there's a twist: if
updates are received out of order, I'd like to emit the proper updates so
that the output of the join is consistent no matter which point in time I
look at. That is, the database doesn't just store the latest view of a
collection; it also stores what that collection looked like at every past
point in time.

This is probably unclear, so let me sketch it out. Imagine we have two
input collections, aToB and bToC. If aToB maps key aX -> bY @ event time 1,
and bToC maps bY -> cZ @ event time 2, my output collection should contain
aX -> cZ @ event time 2. It shows up at event time 2 because that's when all
the inputs align in time to produce an actual join. If we later (in
processing time) receive an update bY -> cW @ event time 0, then we should
update our output collection to contain both [aX -> cW @ event time 1,
aX -> cZ @ event time 2]. I've included a diagram that runs through a few
update notifications and the expected outputs for each.
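A plain-Python sketch of the semantics described above (not Beam code, and
not part of the original post). It assumes each input record is a
hypothetical (key, value, event_time) tuple, that a value stays valid from
its event time until a later event time for the same key replaces it, and
that as_of and joined_history are illustrative helper names. Recomputing the
join at every input event time reproduces the aX/bY example, including the
late bY -> cW @ event time 0 update.

```python
# Plain-Python model of the desired join semantics; not Beam code.
# Records are hypothetical (key, value, event_time) tuples.


def as_of(records, t):
    """Latest value per key among records with event_time <= t.

    A value stays valid until a later event time for the same key replaces it;
    ties on event time go to the record appended last.
    """
    best = {}
    for key, value, et in records:
        if et <= t and (key not in best or et >= best[key][1]):
            best[key] = (value, et)
    return {k: v for k, (v, _) in best.items()}


def joined_history(a_to_b, b_to_c):
    """Map each input event time to the aToC join that holds from that time on."""
    times = sorted({et for _, _, et in a_to_b + b_to_c})
    history = {}
    for t in times:
        ab = as_of(a_to_b, t)
        bc = as_of(b_to_c, t)
        history[t] = {a: bc[b] for a, b in ab.items() if b in bc}
    return history


a_to_b = [("aX", "bY", 1)]
b_to_c = [("bY", "cZ", 2)]
print(joined_history(a_to_b, b_to_c))  # {1: {}, 2: {'aX': 'cZ'}}

# Late update: received later in processing time, but stamped event time 0.
b_to_c.append(("bY", "cW", 0))
print(joined_history(a_to_b, b_to_c))  # {0: {}, 1: {'aX': 'cW'}, 2: {'aX': 'cZ'}}
```

Recomputing from scratch like this is only a reference model for checking
expected outputs; an incremental streaming version would need per-key state
indexed by event time.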

Is this a problem that falls within Beam's problem domain? Are there any
examples or previous instances of people performing these operations?

Thanks in advance,
Ryan


The expected outputs at each processing time appear in the "Outputs" (aToC)
rows, which sat under a pink "Outputs" bar in the original diagram. Read the
diagram column by column, starting at the left (processing time 0).

Processing time (pt) pt0          pt1          pt2          pt3          pt4          pt5
Notification         aToB/a1->b1  bToC/b1->c1  bToC/b1->c0  bToC/b2->c3  bToC/b3->c4  aToB/a1->b3
  (@eventTime)       @et1         @et2         @et0         @et0         @et2         @et1

@et0:
  aToB input map     -            -            -            -            -            -
  bToC input map     -            -            b1->c0       b1->c0       b1->c0       b1->c0
                                                            b2->c3       b2->c3       b2->c3
  Outputs (aToC)     -            -            -            -            -            -

@et1:
  aToB input map     a1->b1       a1->b1       a1->b1       a1->b1       a1->b1       a1->b3
  bToC input map     -            -            b1->c0       b1->c0       b1->c0       b1->c0
                                                            b2->c3       b2->c3       b2->c3
  Outputs (aToC)     -            -            a1->c0       a1->c0       a1->c0       -

@et2:
  aToB input map     a1->b1       a1->b1       a1->b1       a1->b1       a1->b1       a1->b3
  bToC input map     -            b1->c1       b1->c1       b1->c1       b1->c1       b1->c1
                                                            b2->c3       b2->c3       b2->c3
                                                                         b3->c4       b3->c4
  Outputs (aToC)     -            a1->c1       a1->c1       a1->c1       a1->c1       a1->c4
Key:
  new input state | new input state @ later time | new derived state | retraction
  pt = processing time, et = event time
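The diagram can be checked with a small replay model, again plain Python
rather than Beam, using the same hypothetical (key, value, event_time) record
shape. It assumes that when two updates for the same key share an event time
(a1 -> b1 and a1 -> b3, both @et1), the one received later in processing
time wins, which is what the pt5 column implies. After each notification it
recomputes the aToC join at et0..et2 and diffs it against the previous
result, so additions correspond to new derived state and removals to
retractions. The names as_of, join_at and replay are illustrative only.

```python
# Plain-Python replay of the diagram; a reference model, not Beam code.
# Notifications in processing-time order (pt0..pt5):
# (input collection, key, value, event time).
NOTIFICATIONS = [
    ("aToB", "a1", "b1", 1),
    ("bToC", "b1", "c1", 2),
    ("bToC", "b1", "c0", 0),
    ("bToC", "b2", "c3", 0),
    ("bToC", "b3", "c4", 2),
    ("aToB", "a1", "b3", 1),
]
EVENT_TIMES = [0, 1, 2]  # event times shown in the diagram


def as_of(records, t):
    """Latest value per key among (key, value, et) records with et <= t.

    Ties on event time go to the record appended last (later processing time).
    """
    best = {}
    for key, value, et in records:
        if et <= t and (key not in best or et >= best[key][1]):
            best[key] = (value, et)
    return {k: v for k, (v, _) in best.items()}


def join_at(inputs, t):
    """Set of (a, c) pairs in aToC as of event time t."""
    ab = as_of(inputs["aToB"], t)
    bc = as_of(inputs["bToC"], t)
    return {(a, bc[b]) for a, b in ab.items() if b in bc}


def replay(notifications):
    """Yield (pt, additions, retractions) after each notification.

    Additions and retractions are sets of (event_time, a, c) triples.
    """
    inputs = {"aToB": [], "bToC": []}
    previous = set()
    for pt, (collection, key, value, et) in enumerate(notifications):
        inputs[collection].append((key, value, et))
        current = {(t, a, c) for t in EVENT_TIMES for a, c in join_at(inputs, t)}
        yield pt, current - previous, previous - current
        previous = current


for pt, added, retracted in replay(NOTIFICATIONS):
    print(f"pt{pt}: +{sorted(added)} -{sorted(retracted)}")
# pt0: +[] -[]
# pt1: +[(2, 'a1', 'c1')] -[]
# pt2: +[(1, 'a1', 'c0')] -[]
# pt3: +[] -[]
# pt4: +[] -[]
# pt5: +[(2, 'a1', 'c4')] -[(1, 'a1', 'c0'), (2, 'a1', 'c1')]
```

pt3 and pt4 only add bToC entries (b2, b3) that no a-key points to yet, so
they produce no output changes; pt5 retracts a1 -> c0 @et1 and replaces
a1 -> c1 @et2 with a1 -> c4, matching the last column of the diagram.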

Re: Generating a historically-consistent join

Posted by Mark Shields <ma...@google.com>.
Hi Ryan, perhaps this is https://issues.apache.org/jira/browse/BEAM-197 ?



On Mon, May 23, 2016 at 6:47 PM, Ryan Madsen <ry...@gmail.com> wrote:

> Hi all,
>
> I'm looking to solve a problem related to performing a join on two
> streaming datasets, and am having a hard time figuring out if Beam provides
> a model that can help here. I'm curious if I can create a system that can
> take the outputs of two streaming data sources, and run a join on them,
> outputting the "joined" values to a new collection. This first part looks
> quite easy to do with CoGroupByKey, but there's a twist: if updates are
> received out-of-order, I'd like to emit proper updates so the output of the
> join will be consistent no matter where I look at in time. That is, the
> database doesn't just store the latest view of a collection, but also
> stores what that collection looked like for all past values of time.
>
> This is probably unclear, so let me draw out a diagram. Imagine we have 2
> input collections: aToB and bToC. If aToB has key aX -> bY @ event time 1,
> and bToC has bY -> cZ @ event time 2, my output collection should contain
> aX -> cZ @ event time 2. It shows up at eventTime 2 because that's when all
> the inputs aligned in time to create an actual join. If we later (in
> processing time) receive an update that bY -> cW @ event time 0, then we
> should update our output collection to contain both [aX -> cW @ event time
> 1, and aX -> cZ @ event time 2].  I've included a diagram that runs through
> a few update notifications and what the expected outputs should be.
>
> Is this a problem that falls under Beam's problem-domain? Are there any
> examples or previous instances of people performing these operations?
>
> Thanks in advance,
> Ryan
>
>
> The expected outputs at each processing time can be seen underneath the
> pink "Outputs" bar. Traverse the diagram column-by-column, starting at the
> left (processing time 0).
>
> Processing time (pt) pt0 pt1 pt2 pt3 pt4 pt5
> Notification (@eventTime) aToB/a1 -> b1 @et1 bToC/b1-> c1 @et2 bToC/b1->
> c0 @et0 bToC/b2 -> c3 @et0 bToC/b3 -> c4 @et2 aToB/a1 -> b3 @et1
> Event Time @et0: @et0: @et0: @et0: @et0: @et0:
> aToB Input Map values aToB aToB aToB aToB aToB aToB
> bToC Input Map values bToC bToC bToC bToC bToC bToC
> b1 -> c0 b1->c0 b1->c0 b1->c0
> b2->c3 b2->c3 b2->c3
> Outputs aToC aToC aToC aToC aToC aToC
> @et1: @et1: @et1: @et1: @et1: @et1:
> aToB Input Map values aToB aToB aToB aToB aToB aToB
> a1->b1 a1->b1 a1->b1 a1->b1 a1->b1 a1->b3
> bToC Input Map values bToC bToC bToC bToC bToC bToC
> b1->c0 b1->c0 b1->c0 b1->c0
> b2->c3 b2->c3 b2->c3
> Outputs aToC aToC aToC aToC aToC aToC
> a1->c0 a1->c0 a1->c0
> @et2: @et2: @et2: @et2: @et2: @et2:
> aToB Input Map values aToB aToB aToB aToB aToB aToB
> a1->b1 a1->b1 a1->b1 a1->b1 a1->b1 a1->b3
> bToC Input Map values bToC bToC bToC bToC bToC bToC
> b1->c1 b1->c1 b1->c1 b1->c1 b1->c1
> b2->c3 b2->c3 b2->c3
> b3->c4 b3->c4
> Outputs aToC aToC aToC aToC aToC aToC
> a1->c1 a1->c1 a1->c1 a1->c1 a1->c4
> Key:
> New input state New input state@later time New derived state Retraction
> pt=Processing Time et=EventTime
>
>
>
>
>