Posted to dev@beam.apache.org by Kenneth Knowles <ke...@apache.org> on 2020/05/01 15:30:11 UTC

Re: Non-trivial joins examples

+dev@beam and some people I talk about joins with

Interesting! It is a lot to take in and fully grok the code, so calling in
reinforcements...

Generally, I think there's agreement that for a lot of real use cases, you
have to roll your own join using the lower level Beam primitives. So I
think it would be great to get some of these other approaches to joins into
Beam, perhaps as an extension of the Java SDK or even in the core (since
schema joins are in the core). In particular:

 - "join in fixed window with repeater" sounds similar (but not identical)
to work by Mikhail
 - "join in global window with cache" sounds similar (but not identical) to
work and discussions w/ Reza and Tyson

I want to be clear that I am *not* saying there's any duplication. I'm
guessing these all fit into a collection of different ways to accomplish
joins, and if everything comes to fruition we will have a great
opportunity to document how a user should choose between them.

Kenn

On Fri, May 1, 2020 at 7:56 AM Marcin Kuthan <ma...@gmail.com>
wrote:

> Hi,
>
> it's my first post here, but I've been reading the group for a while, so
> thank you for sharing the knowledge!
>
> I've been using Beam/Scio on Dataflow for about a year, mostly for stream
> processing from unbounded sources like PubSub. During my daily work I found
> that built-in windowing is very generic and provides rich watermark/late
> event semantics, but there are a few very annoying limitations, e.g.:
> - both sides of the join must be defined within compatible windows
> - for fixed windows, elements close to a window boundary (but in different
> windows) won't be joined
> - for sliding windows there is a huge overhead if the window duration is
> much longer than the slide period
>
> I would like to ask you to review a few "join/windowing patterns"
> implemented with custom stateful ParDos. They are not as generic as the
> Beam built-ins, but perhaps better crafted for more specific needs. I have
> published the code with tests; feel free to comment via GitHub issues or
> on the mailing list. Event-time processing with watermarks is so demanding
> that I'm almost sure I overlooked many important corner cases.
> https://github.com/mkuthan/beam-examples
>
> If you think the examples are useful, I'll be glad to write a blog post
> with more details :)
>
> Marcin
>
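The two window-boundary limitations listed above can be reproduced in a few lines of plain Python. This is a hypothetical sketch, not Beam code. It mimics fixed- and sliding-window assignment with a zero offset and millisecond timestamps. The function names and example timestamps are made up for illustration.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # half-open interval [start, end) in milliseconds


def fixed_window(ts: int, size: int) -> Window:
    """The fixed window (zero offset) containing ts."""
    start = ts - ts % size  # round ts down to a multiple of size
    return (start, start + size)


def sliding_windows(ts: int, size: int, period: int) -> List[Window]:
    """Every sliding window (zero offset) containing ts; window starts are multiples of period."""
    windows = []
    start = ts - ts % period  # latest window start at or before ts
    while start > ts - size:  # [start, start + size) still contains ts
        windows.append((start, start + size))
        start -= period
    return windows


if __name__ == "__main__":
    minute, hour = 60_000, 3_600_000
    # Two hypothetical events 2 s apart that straddle a one-minute boundary:
    print(fixed_window(59_000, minute), fixed_window(61_000, minute))  # (0, 60000) (60000, 120000)
    # A one-hour window sliding every minute holds a copy of each element 60 times:
    print(len(sliding_windows(59_000, hour, minute)))  # 60
```

In a sliding window of size S with period P, each element lands in S/P windows when S is a multiple of P. That multiplication is where the overhead comes from. Beam's window functions can also take a non-zero offset, which this sketch leaves out.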

Re: Non-trivial joins examples

Posted by Reza Ardeshir Rokni <ra...@gmail.com>.
A couple of things are really nice here:

1- Domain specific (CTR in your example). We may eventually find that it's
not possible or practical to build generic joins for all situations. But
with the primitives available in Beam and good 'patterns', domain-specific
joins could be added for different industries.

2- Pros / Cons section. This is very nice, and as Kenn mentioned, it would be
great to have a collection of joins that users can choose from based on the
pros / cons.

For example, I got pulled onto other work before I could complete this PR
(https://github.com/apache/beam/pull/9032), but I hope to go back to it.
It's specific to a time series use case from a specific industry, with pros
and cons based on throughput, etc.

Maybe we should consider adding something, with links etc., to the Beam
patterns:

https://beam.apache.org/documentation/patterns/overview/

Perhaps a Joins section, where we do something that has not been done before
and add an industry / domain flavour.

Cheers

Reza

On Sat, 2 May 2020 at 14:45, Marcin Kuthan <ma...@gmail.com> wrote:

> @Kenneth - thanks for your response. I was certainly inspired a lot by
> earlier discussions on the group and by the latest documentation updates
> about timers:
> https://beam.apache.org/documentation/programming-guide/#timers
>
> Among the limitations, I forgot to mention side inputs. They work quite
> well for scenarios where one side of the join is updated slowly, very
> slowly. But for scenarios where the main stream gets 50k+ events per
> second and the joined stream ~100 events per second, they simply do not
> work, especially as there is no support for updates in a Map side input
> and the side input has to be updated/broadcast as a whole.
>
> @Jan - very interesting. As I understand it, the joins themselves are
> already implemented (plenty of them in Scio: classic ones, sparse versions,
> etc.); the problem is the limited window semantics, the triggering policy,
> and the timestamps of emitted events.
>
> Please look at LookupCacheDoFn: it looks like a left outer join, but it
> isn't. Only the latest lookup value (the right side of the join) is cached,
> and the left side of the join is cached only until the first matching
> lookup is observed. Not so generic, but quite efficient.
>
> https://github.com/mkuthan/beam-examples/blob/master/src/main/scala/org/mkuthan/beam/examples/LookupCacheDoFn.scala
>
> Marcin

Re: Non-trivial joins examples

Posted by Marcin Kuthan <ma...@gmail.com>.
@Kenneth - thanks for your response. I was certainly inspired a lot by
earlier discussions on the group and by the latest documentation updates
about timers:
https://beam.apache.org/documentation/programming-guide/#timers

Among the limitations, I forgot to mention side inputs. They work quite
well for scenarios where one side of the join is updated slowly, very
slowly. But for scenarios where the main stream gets 50k+ events per
second and the joined stream ~100 events per second, they simply do not
work, especially as there is no support for updates in a Map side input
and the side input has to be updated/broadcast as a whole.
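Here is a back-of-the-envelope sketch of why re-broadcasting a whole Map side input hurts at these rates. Only the ~100 updates per second comes from the message. The map size, the worker count, and the worst-case assumption that every lookup update re-materializes the full map on every worker are hypothetical.

```python
def side_input_entries_per_s(updates_per_s: int, map_size: int, workers: int) -> int:
    # Assumed worst case: every update re-broadcasts the whole map to every worker.
    return updates_per_s * map_size * workers


def keyed_state_entries_per_s(updates_per_s: int) -> int:
    # Assumed keyed-state alternative: each update touches only its own key, on one worker.
    return updates_per_s


if __name__ == "__main__":
    updates = 100         # joined-stream rate from the message
    map_size = 1_000_000  # hypothetical number of lookup keys
    workers = 10          # hypothetical number of workers
    print(side_input_entries_per_s(updates, map_size, workers))  # 1000000000
    print(keyed_state_entries_per_s(updates))                    # 100
```

Under these assumptions, the side-input cost grows with map size times worker count for every update. The keyed-state cost stays proportional to the update rate alone.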

@Jan - very interesting. As I understand it, the joins themselves are
already implemented (plenty of them in Scio: classic ones, sparse versions,
etc.); the problem is the limited window semantics, the triggering policy,
and the timestamps of emitted events.

Please look at LookupCacheDoFn: it looks like a left outer join, but it
isn't. Only the latest lookup value (the right side of the join) is cached,
and the left side of the join is cached only until the first matching
lookup is observed. Not so generic, but quite efficient.

https://github.com/mkuthan/beam-examples/blob/master/src/main/scala/org/mkuthan/beam/examples/LookupCacheDoFn.scala
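The LookupCacheDoFn behaviour described above can be made concrete with a minimal in-memory sketch in plain Python. It is not the linked Scala code and not Beam's state API. Per-key state is a dict. Timers, windows, watermarks, and any expiry of buffered left-side events are left out. The class and method names are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class KeyState:
    lookup: Optional[str] = None                       # only the latest lookup (right side) value
    pending: List[str] = field(default_factory=list)  # left events still waiting for a first lookup


class LookupCacheModel:
    """Hypothetical in-memory model of the described behaviour; no timers, windows or watermarks."""

    def __init__(self) -> None:
        self.state: Dict[str, KeyState] = {}

    def on_left(self, key: str, event: str) -> List[Tuple[str, str]]:
        st = self.state.setdefault(key, KeyState())
        if st.lookup is None:
            st.pending.append(event)  # no lookup seen yet for this key: buffer the event
            return []
        return [(event, st.lookup)]   # join immediately with the cached lookup value

    def on_lookup(self, key: str, value: str) -> List[Tuple[str, str]]:
        st = self.state.setdefault(key, KeyState())
        st.lookup = value             # overwrite: older lookup values are forgotten
        out = [(event, value) for event in st.pending]
        st.pending.clear()            # left events are cached only until the first match
        return out


if __name__ == "__main__":
    m = LookupCacheModel()
    print(m.on_left("k", "a"))     # []
    print(m.on_lookup("k", "v1"))  # [('a', 'v1')]
    print(m.on_left("k", "b"))     # [('b', 'v1')]
    print(m.on_lookup("k", "v2"))  # []
    print(m.on_left("k", "c"))     # [('c', 'v2')]
```

In this sketch, an unmatched left event is never emitted with an empty right side, as a left outer join would do. Earlier matches are also not revised when a newer lookup value arrives: `b` stays joined with `v1` only.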

Marcin

On Fri, 1 May 2020 at 22:22, Jan Lukavský <je...@seznam.cz> wrote:

> Interestingly, I'm currently also working on a proposal for generic join
> semantics. I plan to send a proposal for review, but unfortunately, there
> are still other things keeping me busy. I'd like to take this opportunity
> to share some high-level thoughts; maybe someone can give some pointers.
>
> The general idea is to define a join that can incorporate all other types
> as special cases, where the generic implementation can be simplified or
> optimized, but the semantics remain the same. As I plan to put this into a
> full design document, I will only very roughly outline the ideas:
>
>  a) the generic semantics should be equivalent to running a relational
> join against a set of tables _after each individual modification of a
> relation_ and producing results with the timestamp of the last modification
>
>  b) windows "scope" the state of each "table", i.e. when time reaches
> window.maxTimestamp(), the corresponding "table" is cleared
>
>  c) it should be possible to derive other types of joins from this
> definition by certain manipulations (e.g. buffering multiple updates in a
> single window and assigning all elements the timestamp of
> window.maxTimestamp() will yield the classical "windowed join", with the
> requirement to have the same windows on both (all) sides, as otherwise the
> result will be empty) - the goal of these modifications is typically to
> enable some optimization (e.g. the fully generic implementation must
> include time sorting, either implicitly or explicitly; optimized variants
> can drop this requirement).
>
> It would be great if someone has any comments on this bottom-up approach.
>
> Jan

Re: Non-trivial joins examples

Posted by Jan Lukavský <je...@seznam.cz>.
Interestingly, I'm currently also working on a proposal for generic join
semantics. I plan to send a proposal for review, but unfortunately,
there are still other things keeping me busy. I'd like to take this
opportunity to share some high-level thoughts; maybe someone can give
some pointers.

The general idea is to define a join that can incorporate all other
types as special cases, where the generic implementation can be
simplified or optimized, but the semantics remain the same. As I plan to
put this into a full design document, I will only very roughly outline
the ideas:

  a) the generic semantics should be equivalent to running a relational
join against a set of tables _after each individual modification of a
relation_ and producing results with the timestamp of the last modification

  b) windows "scope" the state of each "table", i.e. when time reaches
window.maxTimestamp(), the corresponding "table" is cleared

  c) it should be possible to derive other types of joins from this
definition by certain manipulations (e.g. buffering multiple updates in a
single window and assigning all elements the timestamp of
window.maxTimestamp() will yield the classical "windowed join", with the
requirement to have the same windows on both (all) sides, as otherwise the
result will be empty) - the goal of these modifications is typically to
enable some optimization (e.g. the fully generic implementation must
include time sorting, either implicitly or explicitly; optimized
variants can drop this requirement).
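Points a) to c) can be illustrated with a toy, single-process model in plain Python. This is not Beam code and not the proposed design. It rests on these assumptions:

* only inserts (no retractions)
* an inner equi-join on a key
* fixed windows as the scoping windows
* the updates' own event timestamps standing in for the watermark

All names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Update = Tuple[int, str, str, str]          # (timestamp ms, side "L" or "R", key, value)
Row = Tuple[str, str, str]                  # (key, left value, right value)
Tables = Dict[str, List[Tuple[str, str]]]   # side -> rows of that "table"


def inner_join(left: List[Tuple[str, str]], right: List[Tuple[str, str]]) -> List[Row]:
    """Plain relational inner equi-join on the key (sorted for deterministic output)."""
    return sorted((k, lv, rv) for k, lv in left for k2, rv in right if k == k2)


def generic_join(updates: List[Update], size: int) -> List[Tuple[int, List[Row]]]:
    """a) re-run the join after each modification, stamped with that modification's time.
    b) the 'tables' are scoped by fixed windows of `size` ms."""
    tables: Dict[int, Tables] = {}
    out: List[Tuple[int, List[Row]]] = []
    for ts, side, key, value in sorted(updates):  # the generic variant needs time sorting
        for start in [s for s in tables if s + size - 1 < ts]:
            del tables[start]  # time has passed this window's maxTimestamp: clear its tables
        t = tables.setdefault(ts - ts % size, {"L": [], "R": []})
        t[side].append((key, value))
        out.append((ts, inner_join(t["L"], t["R"])))
    return out


def windowed_join(updates: List[Update], size: int) -> List[Tuple[int, List[Row]]]:
    """c) buffer each window, join once, stamp results with window.maxTimestamp().
    Input order is irrelevant here, so no time sorting is needed."""
    buffers: Dict[int, Tables] = defaultdict(lambda: {"L": [], "R": []})
    for ts, side, key, value in updates:
        buffers[ts - ts % size][side].append((key, value))
    return [(start + size - 1, inner_join(buffers[start]["L"], buffers[start]["R"]))
            for start in sorted(buffers)]


if __name__ == "__main__":
    ups = [(3_000, "L", "k", "b"), (1_000, "L", "k", "a"),
           (61_000, "R", "k", "y"), (2_000, "R", "k", "x")]
    for snapshot in generic_join(ups, 60_000):
        print(snapshot)
    print(windowed_join(ups, 60_000))
```

With one-minute windows, the generic variant emits a fresh snapshot at 1000, 2000, 3000 and 61000 ms. The last one is empty because the first window's tables were cleared. The windowed variant emits one result per window, at 59999 and 119999 ms. For the first window, that result equals the generic variant's last snapshot inside the window.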

It would be great if someone has any comments on this bottom-up approach.

Jan

On 5/1/20 5:30 PM, Kenneth Knowles wrote:
> +dev@beam and some people I talk about joins with
>
> Interesting! It is a lot to take in and fully grok the code, so 
> calling in reinforcements...
>
> Generally, I think there's agreement that for a lot of real use cases, 
> you have to roll your own join using the lower level Beam primitives. 
> So I think it would be great to get some of these other approaches to 
> joins into Beam, perhaps as an extension of the Java SDK or even in 
> the core (since schema joins are in the core). In particular:
>
>  - "join in fixed window with repeater" sounds similar (but not 
> identical) to work by Mikhail
>  - "join in global window with cache" sounds similar (but not 
> identical) to work and discussions w/ Reza and Tyson
>
> I want to be clear that I am *not* saying there's any duplication. I'm 
> guessing these all fit into a collection of different ways to 
> accomplish joins, and if everything comes to fruition we will have a
> great opportunity to document how a user should choose between them.
>
> Kenn
