You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by rahul patwari <ra...@gmail.com> on 2019/07/22 17:39:05 UTC

Enhancement for Joining Unbounded PCollections of different WindowFns

Hi,

Beam currently doesn't support Join of Unbounded PCollections of different
WindowFns (
https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
).

BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection], by
performing 'SideInputJoin' with Bounded PCollection as a SideInput.

Can we support [Unbounded PCollection] JOIN [Unbounded PCollection], when
one of the Unbounded PCollection has [GlobalWindows Applied with
Non-Default Trigger(probably a slow-changing lookup cache
https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
by performing 'SideInputJoin'?

Regards,
Rahul

Re: Enhancement for Joining Unbounded PCollections of different WindowFns

Posted by Rui Wang <ru...@google.com>.
Thanks Rahul.

I assigned both BEAM-6114 <https://jira.apache.org/jira/browse/BEAM-6114>(split
join multiple rel for different join implementation) and BEAM-7835
<https://jira.apache.org/jira/browse/BEAM-7835> (add the join you want to
support) to you.


-Rui

On Fri, Jul 26, 2019 at 6:04 PM rahul patwari <ra...@gmail.com>
wrote:

> Thanks for your detailed explanation Rui.
>
> Like you said, the triggers for the PCollections should be compatible with
> "Slowly Changing Lookup Cache" pattern.
>
> Rui, If this feature makes sense, can you please create a JIRA for it.
>
> I will start working on splitting BeamJoinRel.java to specific
> implementations with SQL planner rules. I will also implement the "Slowly
> Changing Lookup Cache" pattern with SQL planner rules.
>
> Thanks,
> Rahul
>
> On Sat 27 Jul, 2019, 1:58 AM Rui Wang, <ru...@google.com> wrote:
>
>>
>>
>>> PCollection<Row> mainStream = ...
>>> *PCollectionView<Map<K, Iterable<V>>>* lookupStream = ...      // Note:
>>> PCollectionView not PCollection. I have referred to PCollection before. And *PCollectionView
>>> should be of type Multimap*, to perform SideinputJoin.
>>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
>>> mainStream)).and(new TupleTag("LookupTable"), lookupStream);
>>> //PCollectionTuple has to be enhanced to take PCollectionView also as an
>>> argument.
>>> tuple.apply(SqlTransform.of("MainTable JOIN LookupTable"));
>>>
>> and in BeamJoinRel.java, when Join has to be performed on a *PCollection*
>>> and a *PCollectionView*(instanceof check), SideInputJoin will be
>>> applied.
>>>
>>
>> Yes, I am thinking something similar to it.
>>
>>
>>
>>> I think that performing SideInputJoin on two unbounded PCollections with
>>> different WindowFn(and satisfying the criteria for "Slowly Changing Lookup
>>> Cache Pattern") is relatively straight forward if we take *PCollection*
>>> itself as an argument for LookupTable in PCollectionTuple.
>>>
>> I think it's a hack in BeamJoinRel to check WindowFn and perform
>> SideInput when one side is unbounded non-global windowing, another side is
>> unbounded global windowing (and likely you need also check triggers?). For
>> SQL, if you really want to do it, you should do it by planner rules to
>> match exactly the case you want to support and decouple this join
>> implementation from BeamJoinRel.
>>
>> Even current BeamJoinRel is too large and we should split it to different
>> JoinRel to match different plans.
>>
>>
>>
>>> The conversion of PCollection to PCollectionView is hidden for the user
>>> in this case(Which will be performed internally by SideInputJoin).
>>> Moreover, if the user wants to perform some SQL Aggregations on
>>> "lookupStream" before performing Join with "mainStream"(Multiple SQL
>>> Queries separated by ";"), it is possible in this case, as the
>>> "lookupStream" is a PCollection. But, it is not possible if the
>>> "lookupStream" is a PCollectionView.
>>>
>> It's true that PCollectionView will limit further SQL operations. The
>> workaround is do those operations by java before using SqlTransform, and
>> within SqlTransfrom, start with the Join.
>>
>>
>> So if your use case is support a general SQL operations on two unbounded
>> PCollections but with a special need that to perform a SideInput join for
>> these two unbounded PColleciton with a special WindowFn setting (maybe even
>> trigger) checking, the best way then is to define SQL plan rules and have a
>> separate Rel implementation.
>>
>>
>>
>> -Rui
>>
>>
>>
>>
>>> Regards,
>>> Rahul
>>>
>>> On Fri, Jul 26, 2019 at 9:19 AM Rui Wang <ru...@google.com> wrote:
>>>
>>>> I see.
>>>>
>>>> Actually I was still referring to make "LookupStream" as
>>>> PCollectionView to perform sideinput join, which then doesn't have mismatch
>>>> WindowFn problem. Otherwise, we shouldn't check special case of WindowFn to
>>>> decide if perform a sideinput join for two unbounded PCollection when their
>>>> WindowFn does not match.
>>>>
>>>> And "data completeness" really means is sideinput is triggered so it
>>>> could change, and then the question is when sideinput is changed, should we
>>>> refine previous data? It becomes harder to reason at this moment.
>>>>
>>>>
>>>> Rui
>>>>
>>>> On Thu, Jul 25, 2019 at 6:17 PM rahul patwari <
>>>> rahulpatwari8383@gmail.com> wrote:
>>>>
>>>>> "*In terms of Join schematic, I think it's hard to reason data
>>>>> completeness since one side of the join is changing*"
>>>>> - As it is possible to apply [Global Windows with Non-Default Trigger]
>>>>> to Unbounded Data Source, say, Kafka, to distinguish this Kafka PCollection
>>>>> from "Slowly Changing lookup cache" Unbounded PCollection,  If we can check
>>>>> the condition that one of the PCollection being Joined have WindowFn as
>>>>> [Global Windows with Trigger Repeatedly.forever(AfterProcessingTime.
>>>>> pastFirstElementInPane())] is it sufficient to perform the Join of
>>>>> "MainStream" and this "LookupStream"?
>>>>>
>>>>> In other words, I mean to say that instead of directly throwing
>>>>> Exception
>>>>> <https://github.com/apache/beam/blob/f03b6ba12e7c0a1005504612cc6067eebec9ffe8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L359> when
>>>>> Joining two Unbounded PCollections with different WindowFns, If we can
>>>>> ensure that
>>>>> MainStream: one side of the join is Unbounded with WindowFn as
>>>>> [Non-Global Windows with DefaultTrigger] and
>>>>> LookupStream: the other side of the Join is a "Slowly Changing Lookup
>>>>> Cache"[Global Windows with Repeatedly.forever(AfterProcessingTime.
>>>>> pastFirstElementInPane()) Trigger],
>>>>> we can directly perform a SideInputJoin.
>>>>>
>>>>> Will we have "data completeness" problem even in "Slowly Changing
>>>>> lookup Cache Pattern"?
>>>>>
>>>>> On Fri, Jul 26, 2019 at 2:51 AM Rui Wang <ru...@google.com> wrote:
>>>>>
>>>>>> To be more clear, I think it's useful if we can achieve the following
>>>>>> that you wrote
>>>>>>
>>>>>> PCollection mainStream = ...;
>>>>>> PCollection lookupStream = ...;
>>>>>> PCollectionTuple tuple = PCollectionTuple.of(new
>>>>>> TupleTag("MainTable"), new TupleTag("LookupTable"));
>>>>>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>>>>>
>>>>>> -Rui
>>>>>>
>>>>>> On Thu, Jul 25, 2019 at 1:56 PM Rui Wang <ru...@google.com> wrote:
>>>>>>
>>>>>>> Hi Rahul, thanks for your detailed writeup. It pretty much
>>>>>>> summarizes the slow changing table join problem.
>>>>>>>
>>>>>>> To your question: "Can we implement SideInputJoin for this case",
>>>>>>> there are two perspectives.
>>>>>>>
>>>>>>> In terms of implementing the slowing changing lookup cache pattern
>>>>>>> <https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs> in
>>>>>>> BeamSQL, such sidinput join can be done that way. At least it worth
>>>>>>> exploring it until we identify blockers. I also think this pattern is
>>>>>>> already useful to users.
>>>>>>>
>>>>>>> In terms of Join schematic, I think it's hard to reason data
>>>>>>> completeness since one side of join is changing.
>>>>>>>
>>>>>>> -Rui
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 25, 2019 at 12:55 PM rahul patwari <
>>>>>>> rahulpatwari8383@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Kenn,
>>>>>>>>
>>>>>>>> If we consider the following two *Unbounded* PCollections:
>>>>>>>> - PCollection1 => [*Non-Global* Window with Default Trigger]
>>>>>>>> - PCollection2 => [Global Window with *Non-Default* Trigger] :)
>>>>>>>> coincidentally turned out to be the opposite
>>>>>>>>
>>>>>>>> Joining these two PCollections in BeamSql currently is not possible
>>>>>>>> because of https://jira.apache.org/jira/browse/BEAM-3345(WindowFn
>>>>>>>> Mismatch)
>>>>>>>> But in this case, PCollection1 can be joined with PCollection2
>>>>>>>> using SideInputJoin (
>>>>>>>> https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
>>>>>>>> which is being done for Joining an Unbounded PCollection with Bounded
>>>>>>>> PCollection. I am thinking that Beam can guarantee it joins all
>>>>>>>> input elements once per window for this case.
>>>>>>>> The result of the join might be fuzzy for the window when the
>>>>>>>> Trigger for PCollection2 fires and sideinput gets loaded into Memory.
>>>>>>>>
>>>>>>>> PCollection2 can be considered as Slowly Changing Lookup Cache and
>>>>>>>> BeamSql can support Pattern:
>>>>>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
>>>>>>>> which is currently not possible.
>>>>>>>> I am working on https://jira.apache.org/jira/browse/BEAM-7758 for
>>>>>>>> BeamSql to natively support PCollectionView so that BeamSql supports
>>>>>>>> "Slowly Updating Global Window Sideinput Pattern" using SqlTransform's
>>>>>>>> TableProvider.
>>>>>>>>
>>>>>>>> If we can support this, User will be able to do:
>>>>>>>> PCollection mainStream = ...;
>>>>>>>> PCollection lookupStream = ...;
>>>>>>>> PCollectionTuple tuple = PCollectionTuple.of(new
>>>>>>>> TupleTag("MainTable"), new TupleTag("LookupTable"));
>>>>>>>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>>>>>>>
>>>>>>>> Can we implement SideInputJoin for this case?
>>>>>>>> I might be wrong in my understanding. Please let me know your
>>>>>>>> thoughts.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Rahul
>>>>>>>>
>>>>>>>> On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles <ke...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I think the best way to approach this is probably to have an
>>>>>>>>> example SQL statement and to discuss what the relational semantics should
>>>>>>>>> be.
>>>>>>>>>
>>>>>>>>> Windowing is not really part of SQL (yet) and in a way it just
>>>>>>>>> needs very minimal extensions. See
>>>>>>>>> https://arxiv.org/abs/1905.12133. In this proposal for SQL,
>>>>>>>>> windowed aggregation is explicitly be part of the GROUP BY operation, where
>>>>>>>>> you GROUP BY window columns that were added. So it is more explicit than in
>>>>>>>>> Beam. Relations do not have a WindowFn so there is no problem of them being
>>>>>>>>> incompatible.
>>>>>>>>>
>>>>>>>>> With Beam SQL there are basically two ways of windowing that work
>>>>>>>>> totally differently:
>>>>>>>>>
>>>>>>>>> 1. SQL style windowing where you GROUP BY windows. This does not
>>>>>>>>> use the input PCollection windowfn
>>>>>>>>> 2. PCollection windowing where the SQL does not do any windowing -
>>>>>>>>> this should apply the SQL expression to each window independently
>>>>>>>>>
>>>>>>>>> In order to support a hybrid of these, it might be:
>>>>>>>>>
>>>>>>>>> 3. SQL style windowing, where when a PCollection has window
>>>>>>>>> assigned, the window columns are added before the SQL is applied. It is a
>>>>>>>>> bit strange but might enable your use.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari <
>>>>>>>>> rahulpatwari8383@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Beam currently doesn't support Join of Unbounded PCollections of
>>>>>>>>>> different WindowFns (
>>>>>>>>>> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
>>>>>>>>>> ).
>>>>>>>>>>
>>>>>>>>>> BeamSql performs [Unbounded PCollection] JOIN [Bounded
>>>>>>>>>> PCollection], by performing 'SideInputJoin' with Bounded PCollection as a
>>>>>>>>>> SideInput.
>>>>>>>>>>
>>>>>>>>>> Can we support [Unbounded PCollection] JOIN [Unbounded
>>>>>>>>>> PCollection], when one of the Unbounded PCollection has [GlobalWindows
>>>>>>>>>> Applied with Non-Default Trigger(probably a slow-changing lookup cache
>>>>>>>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
>>>>>>>>>> by performing 'SideInputJoin'?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Rahul
>>>>>>>>>>
>>>>>>>>>

Re: Enhancement for Joining Unbounded PCollections of different WindowFns

Posted by rahul patwari <ra...@gmail.com>.
Thanks for your detailed explanation Rui.

Like you said, the triggers for the PCollections should be compatible with
"Slowly Changing Lookup Cache" pattern.

Rui, If this feature makes sense, can you please create a JIRA for it.

I will start working on splitting BeamJoinRel.java to specific
implementations with SQL planner rules. I will also implement the "Slowly
Changing Lookup Cache" pattern with SQL planner rules.

Thanks,
Rahul

On Sat 27 Jul, 2019, 1:58 AM Rui Wang, <ru...@google.com> wrote:

>
>
>> PCollection<Row> mainStream = ...
>> *PCollectionView<Map<K, Iterable<V>>>* lookupStream = ...      // Note:
>> PCollectionView not PCollection. I have referred to PCollection before. And *PCollectionView
>> should be of type Multimap*, to perform SideinputJoin.
>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
>> mainStream)).and(new TupleTag("LookupTable"), lookupStream);
>> //PCollectionTuple has to be enhanced to take PCollectionView also as an
>> argument.
>> tuple.apply(SqlTransform.of("MainTable JOIN LookupTable"));
>>
> and in BeamJoinRel.java, when Join has to be performed on a *PCollection*
>> and a *PCollectionView*(instanceof check), SideInputJoin will be applied.
>>
>
> Yes, I am thinking something similar to it.
>
>
>
>> I think that performing SideInputJoin on two unbounded PCollections with
>> different WindowFn(and satisfying the criteria for "Slowly Changing Lookup
>> Cache Pattern") is relatively straight forward if we take *PCollection*
>> itself as an argument for LookupTable in PCollectionTuple.
>>
> I think it's a hack in BeamJoinRel to check WindowFn and perform
> SideInput when one side is unbounded non-global windowing, another side is
> unbounded global windowing (and likely you need also check triggers?). For
> SQL, if you really want to do it, you should do it by planner rules to
> match exactly the case you want to support and decouple this join
> implementation from BeamJoinRel.
>
> Even current BeamJoinRel is too large and we should split it to different
> JoinRel to match different plans.
>
>
>
>> The conversion of PCollection to PCollectionView is hidden for the user
>> in this case(Which will be performed internally by SideInputJoin).
>> Moreover, if the user wants to perform some SQL Aggregations on
>> "lookupStream" before performing Join with "mainStream"(Multiple SQL
>> Queries separated by ";"), it is possible in this case, as the
>> "lookupStream" is a PCollection. But, it is not possible if the
>> "lookupStream" is a PCollectionView.
>>
> It's true that PCollectionView will limit further SQL operations. The
> workaround is do those operations by java before using SqlTransform, and
> within SqlTransfrom, start with the Join.
>
>
> So if your use case is support a general SQL operations on two unbounded
> PCollections but with a special need that to perform a SideInput join for
> these two unbounded PColleciton with a special WindowFn setting (maybe even
> trigger) checking, the best way then is to define SQL plan rules and have a
> separate Rel implementation.
>
>
>
> -Rui
>
>
>
>
>> Regards,
>> Rahul
>>
>> On Fri, Jul 26, 2019 at 9:19 AM Rui Wang <ru...@google.com> wrote:
>>
>>> I see.
>>>
>>> Actually I was still referring to make "LookupStream" as
>>> PCollectionView to perform sideinput join, which then doesn't have mismatch
>>> WindowFn problem. Otherwise, we shouldn't check special case of WindowFn to
>>> decide if perform a sideinput join for two unbounded PCollection when their
>>> WindowFn does not match.
>>>
>>> And "data completeness" really means is sideinput is triggered so it
>>> could change, and then the question is when sideinput is changed, should we
>>> refine previous data? It becomes harder to reason at this moment.
>>>
>>>
>>> Rui
>>>
>>> On Thu, Jul 25, 2019 at 6:17 PM rahul patwari <
>>> rahulpatwari8383@gmail.com> wrote:
>>>
>>>> "*In terms of Join schematic, I think it's hard to reason data
>>>> completeness since one side of the join is changing*"
>>>> - As it is possible to apply [Global Windows with Non-Default Trigger]
>>>> to Unbounded Data Source, say, Kafka, to distinguish this Kafka PCollection
>>>> from "Slowly Changing lookup cache" Unbounded PCollection,  If we can check
>>>> the condition that one of the PCollection being Joined have WindowFn as
>>>> [Global Windows with Trigger Repeatedly.forever(AfterProcessingTime.
>>>> pastFirstElementInPane())] is it sufficient to perform the Join of
>>>> "MainStream" and this "LookupStream"?
>>>>
>>>> In other words, I mean to say that instead of directly throwing
>>>> Exception
>>>> <https://github.com/apache/beam/blob/f03b6ba12e7c0a1005504612cc6067eebec9ffe8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L359> when
>>>> Joining two Unbounded PCollections with different WindowFns, If we can
>>>> ensure that
>>>> MainStream: one side of the join is Unbounded with WindowFn as
>>>> [Non-Global Windows with DefaultTrigger] and
>>>> LookupStream: the other side of the Join is a "Slowly Changing Lookup
>>>> Cache"[Global Windows with Repeatedly.forever(AfterProcessingTime.
>>>> pastFirstElementInPane()) Trigger],
>>>> we can directly perform a SideInputJoin.
>>>>
>>>> Will we have "data completeness" problem even in "Slowly Changing
>>>> lookup Cache Pattern"?
>>>>
>>>> On Fri, Jul 26, 2019 at 2:51 AM Rui Wang <ru...@google.com> wrote:
>>>>
>>>>> To be more clear, I think it's useful if we can achieve the following
>>>>> that you wrote
>>>>>
>>>>> PCollection mainStream = ...;
>>>>> PCollection lookupStream = ...;
>>>>> PCollectionTuple tuple = PCollectionTuple.of(new
>>>>> TupleTag("MainTable"), new TupleTag("LookupTable"));
>>>>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>>>>
>>>>> -Rui
>>>>>
>>>>> On Thu, Jul 25, 2019 at 1:56 PM Rui Wang <ru...@google.com> wrote:
>>>>>
>>>>>> Hi Rahul, thanks for your detailed writeup. It pretty much summarizes
>>>>>> the slow changing table join problem.
>>>>>>
>>>>>> To your question: "Can we implement SideInputJoin for this case",
>>>>>> there are two perspectives.
>>>>>>
>>>>>> In terms of implementing the slowing changing lookup cache pattern
>>>>>> <https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs> in
>>>>>> BeamSQL, such sidinput join can be done that way. At least it worth
>>>>>> exploring it until we identify blockers. I also think this pattern is
>>>>>> already useful to users.
>>>>>>
>>>>>> In terms of Join schematic, I think it's hard to reason data
>>>>>> completeness since one side of join is changing.
>>>>>>
>>>>>> -Rui
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 25, 2019 at 12:55 PM rahul patwari <
>>>>>> rahulpatwari8383@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Kenn,
>>>>>>>
>>>>>>> If we consider the following two *Unbounded* PCollections:
>>>>>>> - PCollection1 => [*Non-Global* Window with Default Trigger]
>>>>>>> - PCollection2 => [Global Window with *Non-Default* Trigger] :)
>>>>>>> coincidentally turned out to be the opposite
>>>>>>>
>>>>>>> Joining these two PCollections in BeamSql currently is not possible
>>>>>>> because of https://jira.apache.org/jira/browse/BEAM-3345(WindowFn
>>>>>>> Mismatch)
>>>>>>> But in this case, PCollection1 can be joined with PCollection2 using
>>>>>>> SideInputJoin (
>>>>>>> https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
>>>>>>> which is being done for Joining an Unbounded PCollection with Bounded
>>>>>>> PCollection. I am thinking that Beam can guarantee it joins all
>>>>>>> input elements once per window for this case.
>>>>>>> The result of the join might be fuzzy for the window when the
>>>>>>> Trigger for PCollection2 fires and sideinput gets loaded into Memory.
>>>>>>>
>>>>>>> PCollection2 can be considered as Slowly Changing Lookup Cache and
>>>>>>> BeamSql can support Pattern:
>>>>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
>>>>>>> which is currently not possible.
>>>>>>> I am working on https://jira.apache.org/jira/browse/BEAM-7758 for
>>>>>>> BeamSql to natively support PCollectionView so that BeamSql supports
>>>>>>> "Slowly Updating Global Window Sideinput Pattern" using SqlTransform's
>>>>>>> TableProvider.
>>>>>>>
>>>>>>> If we can support this, User will be able to do:
>>>>>>> PCollection mainStream = ...;
>>>>>>> PCollection lookupStream = ...;
>>>>>>> PCollectionTuple tuple = PCollectionTuple.of(new
>>>>>>> TupleTag("MainTable"), new TupleTag("LookupTable"));
>>>>>>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>>>>>>
>>>>>>> Can we implement SideInputJoin for this case?
>>>>>>> I might be wrong in my understanding. Please let me know your
>>>>>>> thoughts.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Rahul
>>>>>>>
>>>>>>> On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles <ke...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I think the best way to approach this is probably to have an
>>>>>>>> example SQL statement and to discuss what the relational semantics should
>>>>>>>> be.
>>>>>>>>
>>>>>>>> Windowing is not really part of SQL (yet) and in a way it just
>>>>>>>> needs very minimal extensions. See https://arxiv.org/abs/1905.12133.
>>>>>>>> In this proposal for SQL, windowed aggregation is explicitly be part of the
>>>>>>>> GROUP BY operation, where you GROUP BY window columns that were added. So
>>>>>>>> it is more explicit than in Beam. Relations do not have a WindowFn so there
>>>>>>>> is no problem of them being incompatible.
>>>>>>>>
>>>>>>>> With Beam SQL there are basically two ways of windowing that work
>>>>>>>> totally differently:
>>>>>>>>
>>>>>>>> 1. SQL style windowing where you GROUP BY windows. This does not
>>>>>>>> use the input PCollection windowfn
>>>>>>>> 2. PCollection windowing where the SQL does not do any windowing -
>>>>>>>> this should apply the SQL expression to each window independently
>>>>>>>>
>>>>>>>> In order to support a hybrid of these, it might be:
>>>>>>>>
>>>>>>>> 3. SQL style windowing, where when a PCollection has window
>>>>>>>> assigned, the window columns are added before the SQL is applied. It is a
>>>>>>>> bit strange but might enable your use.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari <
>>>>>>>> rahulpatwari8383@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Beam currently doesn't support Join of Unbounded PCollections of
>>>>>>>>> different WindowFns (
>>>>>>>>> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
>>>>>>>>> ).
>>>>>>>>>
>>>>>>>>> BeamSql performs [Unbounded PCollection] JOIN [Bounded
>>>>>>>>> PCollection], by performing 'SideInputJoin' with Bounded PCollection as a
>>>>>>>>> SideInput.
>>>>>>>>>
>>>>>>>>> Can we support [Unbounded PCollection] JOIN [Unbounded
>>>>>>>>> PCollection], when one of the Unbounded PCollection has [GlobalWindows
>>>>>>>>> Applied with Non-Default Trigger(probably a slow-changing lookup cache
>>>>>>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
>>>>>>>>> by performing 'SideInputJoin'?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Rahul
>>>>>>>>>
>>>>>>>>

Re: Enhancement for Joining Unbounded PCollections of different WindowFns

Posted by Rui Wang <ru...@google.com>.
>
> PCollection<Row> mainStream = ...
> *PCollectionView<Map<K, Iterable<V>>>* lookupStream = ...      // Note:
> PCollectionView not PCollection. I have referred to PCollection before. And *PCollectionView
> should be of type Multimap*, to perform SideinputJoin.
> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
> mainStream)).and(new TupleTag("LookupTable"), lookupStream);
> //PCollectionTuple has to be enhanced to take PCollectionView also as an
> argument.
> tuple.apply(SqlTransform.of("MainTable JOIN LookupTable"));
>
and in BeamJoinRel.java, when Join has to be performed on a *PCollection*
> and a *PCollectionView*(instanceof check), SideInputJoin will be applied.
>

Yes, I am thinking something similar to it.



> I think that performing SideInputJoin on two unbounded PCollections with
> different WindowFn(and satisfying the criteria for "Slowly Changing Lookup
> Cache Pattern") is relatively straight forward if we take *PCollection*
> itself as an argument for LookupTable in PCollectionTuple.
>
I think it's a hack in BeamJoinRel to check WindowFn and perform
SideInput when one side is unbounded non-global windowing, another side is
unbounded global windowing (and likely you need also check triggers?). For
SQL, if you really want to do it, you should do it by planner rules to
match exactly the case you want to support and decouple this join
implementation from BeamJoinRel.

Even current BeamJoinRel is too large and we should split it to different
JoinRel to match different plans.



> The conversion of PCollection to PCollectionView is hidden for the user in
> this case(Which will be performed internally by SideInputJoin). Moreover,
> if the user wants to perform some SQL Aggregations on "lookupStream" before
> performing Join with "mainStream"(Multiple SQL Queries separated by ";"),
> it is possible in this case, as the "lookupStream" is a PCollection. But,
> it is not possible if the "lookupStream" is a PCollectionView.
>
It's true that PCollectionView will limit further SQL operations. The
workaround is do those operations by java before using SqlTransform, and
within SqlTransfrom, start with the Join.


So if your use case is support a general SQL operations on two unbounded
PCollections but with a special need that to perform a SideInput join for
these two unbounded PColleciton with a special WindowFn setting (maybe even
trigger) checking, the best way then is to define SQL plan rules and have a
separate Rel implementation.



-Rui




> Regards,
> Rahul
>
> On Fri, Jul 26, 2019 at 9:19 AM Rui Wang <ru...@google.com> wrote:
>
>> I see.
>>
>> Actually I was still referring to make "LookupStream" as PCollectionView
>> to perform sideinput join, which then doesn't have mismatch WindowFn
>> problem. Otherwise, we shouldn't check special case of WindowFn to decide
>> if perform a sideinput join for two unbounded PCollection when their
>> WindowFn does not match.
>>
>> And "data completeness" really means is sideinput is triggered so it
>> could change, and then the question is when sideinput is changed, should we
>> refine previous data? It becomes harder to reason at this moment.
>>
>>
>> Rui
>>
>> On Thu, Jul 25, 2019 at 6:17 PM rahul patwari <ra...@gmail.com>
>> wrote:
>>
>>> "*In terms of Join schematic, I think it's hard to reason data
>>> completeness since one side of the join is changing*"
>>> - As it is possible to apply [Global Windows with Non-Default Trigger]
>>> to Unbounded Data Source, say, Kafka, to distinguish this Kafka PCollection
>>> from "Slowly Changing lookup cache" Unbounded PCollection,  If we can check
>>> the condition that one of the PCollection being Joined have WindowFn as
>>> [Global Windows with Trigger Repeatedly.forever(AfterProcessingTime.
>>> pastFirstElementInPane())] is it sufficient to perform the Join of
>>> "MainStream" and this "LookupStream"?
>>>
>>> In other words, I mean to say that instead of directly throwing
>>> Exception
>>> <https://github.com/apache/beam/blob/f03b6ba12e7c0a1005504612cc6067eebec9ffe8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L359> when
>>> Joining two Unbounded PCollections with different WindowFns, If we can
>>> ensure that
>>> MainStream: one side of the join is Unbounded with WindowFn as
>>> [Non-Global Windows with DefaultTrigger] and
>>> LookupStream: the other side of the Join is a "Slowly Changing Lookup
>>> Cache"[Global Windows with Repeatedly.forever(AfterProcessingTime.
>>> pastFirstElementInPane()) Trigger],
>>> we can directly perform a SideInputJoin.
>>>
>>> Will we have "data completeness" problem even in "Slowly Changing lookup
>>> Cache Pattern"?
>>>
>>> On Fri, Jul 26, 2019 at 2:51 AM Rui Wang <ru...@google.com> wrote:
>>>
>>>> To be more clear, I think it's useful if we can achieve the following
>>>> that you wrote
>>>>
>>>> PCollection mainStream = ...;
>>>> PCollection lookupStream = ...;
>>>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
>>>> new TupleTag("LookupTable"));
>>>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>>>
>>>> -Rui
>>>>
>>>> On Thu, Jul 25, 2019 at 1:56 PM Rui Wang <ru...@google.com> wrote:
>>>>
>>>>> Hi Rahul, thanks for your detailed writeup. It pretty much summarizes
>>>>> the slow changing table join problem.
>>>>>
>>>>> To your question: "Can we implement SideInputJoin for this case",
>>>>> there are two perspectives.
>>>>>
>>>>> In terms of implementing the slowing changing lookup cache pattern
>>>>> <https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs> in
>>>>> BeamSQL, such sidinput join can be done that way. At least it worth
>>>>> exploring it until we identify blockers. I also think this pattern is
>>>>> already useful to users.
>>>>>
>>>>> In terms of Join schematic, I think it's hard to reason data
>>>>> completeness since one side of join is changing.
>>>>>
>>>>> -Rui
>>>>>
>>>>>
>>>>> On Thu, Jul 25, 2019 at 12:55 PM rahul patwari <
>>>>> rahulpatwari8383@gmail.com> wrote:
>>>>>
>>>>>> Hi Kenn,
>>>>>>
>>>>>> If we consider the following two *Unbounded* PCollections:
>>>>>> - PCollection1 => [*Non-Global* Window with Default Trigger]
>>>>>> - PCollection2 => [Global Window with *Non-Default* Trigger] :)
>>>>>> coincidentally turned out to be the opposite
>>>>>>
>>>>>> Joining these two PCollections in BeamSql currently is not possible
>>>>>> because of https://jira.apache.org/jira/browse/BEAM-3345(WindowFn
>>>>>> Mismatch)
>>>>>> But in this case, PCollection1 can be joined with PCollection2 using
>>>>>> SideInputJoin (
>>>>>> https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
>>>>>> which is being done for Joining an Unbounded PCollection with Bounded
>>>>>> PCollection. I am thinking that Beam can guarantee it joins all
>>>>>> input elements once per window for this case.
>>>>>> The result of the join might be fuzzy for the window when the Trigger
>>>>>> for PCollection2 fires and sideinput gets loaded into Memory.
>>>>>>
>>>>>> PCollection2 can be considered as Slowly Changing Lookup Cache and
>>>>>> BeamSql can support Pattern:
>>>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
>>>>>> which is currently not possible.
>>>>>> I am working on https://jira.apache.org/jira/browse/BEAM-7758 for
>>>>>> BeamSql to natively support PCollectionView so that BeamSql supports
>>>>>> "Slowly Updating Global Window Sideinput Pattern" using SqlTransform's
>>>>>> TableProvider.
>>>>>>
>>>>>> If we can support this, User will be able to do:
>>>>>> PCollection mainStream = ...;
>>>>>> PCollection lookupStream = ...;
>>>>>> PCollectionTuple tuple = PCollectionTuple.of(new
>>>>>> TupleTag("MainTable"), new TupleTag("LookupTable"));
>>>>>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>>>>>
>>>>>> Can we implement SideInputJoin for this case?
>>>>>> I might be wrong in my understanding. Please let me know your
>>>>>> thoughts.
>>>>>>
>>>>>> Thanks,
>>>>>> Rahul
>>>>>>
>>>>>> On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles <ke...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> I think the best way to approach this is probably to have an example
>>>>>>> SQL statement and to discuss what the relational semantics should be.
>>>>>>>
>>>>>>> Windowing is not really part of SQL (yet) and in a way it just needs
>>>>>>> very minimal extensions. See https://arxiv.org/abs/1905.12133. In
>>>>>>> this proposal for SQL, windowed aggregation is explicitly be part of the
>>>>>>> GROUP BY operation, where you GROUP BY window columns that were added. So
>>>>>>> it is more explicit than in Beam. Relations do not have a WindowFn so there
>>>>>>> is no problem of them being incompatible.
>>>>>>>
>>>>>>> With Beam SQL there are basically two ways of windowing that work
>>>>>>> totally differently:
>>>>>>>
>>>>>>> 1. SQL style windowing where you GROUP BY windows. This does not use
>>>>>>> the input PCollection windowfn
>>>>>>> 2. PCollection windowing where the SQL does not do any windowing -
>>>>>>> this should apply the SQL expression to each window independently
>>>>>>>
>>>>>>> In order to support a hybrid of these, it might be:
>>>>>>>
>>>>>>> 3. SQL style windowing, where when a PCollection has window
>>>>>>> assigned, the window columns are added before the SQL is applied. It is a
>>>>>>> bit strange but might enable your use.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari <
>>>>>>> rahulpatwari8383@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Beam currently doesn't support Join of Unbounded PCollections of
>>>>>>>> different WindowFns (
>>>>>>>> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
>>>>>>>> ).
>>>>>>>>
>>>>>>>> BeamSql performs [Unbounded PCollection] JOIN [Bounded
>>>>>>>> PCollection], by performing 'SideInputJoin' with Bounded PCollection as a
>>>>>>>> SideInput.
>>>>>>>>
>>>>>>>> Can we support [Unbounded PCollection] JOIN [Unbounded
>>>>>>>> PCollection], when one of the Unbounded PCollection has [GlobalWindows
>>>>>>>> Applied with Non-Default Trigger(probably a slow-changing lookup cache
>>>>>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
>>>>>>>> by performing 'SideInputJoin'?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Rahul
>>>>>>>>
>>>>>>>

Re: Enhancement for Joining Unbounded PCollections of different WindowFns

Posted by rahul patwari <ra...@gmail.com>.
Is this the flow that you are referring to:

PCollection<Row> mainStream = ...
*PCollectionView<Map<K, Iterable<V>>>* lookupStream = ...      // Note:
PCollectionView not PCollection. I have referred to PCollection
before. And *PCollectionView
should be of type Multimap*, to perform SideinputJoin.
PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
mainStream)).and(new TupleTag("LookupTable"), lookupStream);
//PCollectionTuple has to be enhanced to take PCollectionView also as an
argument.
tuple.apply(SqlTransform.of("MainTable JOIN LookupTable"));

and in BeamJoinRel.java, when Join has to be performed on a *PCollection*
and a *PCollectionView*(instanceof check), SideInputJoin will be applied.

I think that performing SideInputJoin on two unbounded PCollections with
different WindowFn(and satisfying the criteria for "Slowly Changing Lookup
Cache Pattern") is relatively straight forward if we take *PCollection*
itself as an argument for LookupTable in PCollectionTuple. The conversion
of PCollection to PCollectionView is hidden for the user in this case(Which
will be performed internally by SideInputJoin). Moreover, if the user wants
to perform some SQL Aggregations on "lookupStream" before performing Join
with "mainStream"(Multiple SQL Queries separated by ";"), it is possible in
this case, as the "lookupStream" is a PCollection. But, it is not possible
if the "lookupStream" is a PCollectionView.

Regards,
Rahul

On Fri, Jul 26, 2019 at 9:19 AM Rui Wang <ru...@google.com> wrote:

> I see.
>
> Actually I was still referring to make "LookupStream" as PCollectionView
> to perform sideinput join, which then doesn't have mismatch WindowFn
> problem. Otherwise, we shouldn't check special case of WindowFn to decide
> if perform a sideinput join for two unbounded PCollection when their
> WindowFn does not match.
>
> And "data completeness" really means is sideinput is triggered so it could
> change, and then the question is when sideinput is changed, should we
> refine previous data? It becomes harder to reason at this moment.
>
>
> Rui
>
> On Thu, Jul 25, 2019 at 6:17 PM rahul patwari <ra...@gmail.com>
> wrote:
>
>> "*In terms of Join schematic, I think it's hard to reason data
>> completeness since one side of the join is changing*"
>> - As it is possible to apply [Global Windows with Non-Default Trigger] to
>> Unbounded Data Source, say, Kafka, to distinguish this Kafka PCollection
>> from "Slowly Changing lookup cache" Unbounded PCollection,  If we can check
>> the condition that one of the PCollection being Joined have WindowFn as
>> [Global Windows with Trigger Repeatedly.forever(AfterProcessingTime.
>> pastFirstElementInPane())] is it sufficient to perform the Join of
>> "MainStream" and this "LookupStream"?
>>
>> In other words, I mean to say that instead of directly throwing Exception
>> <https://github.com/apache/beam/blob/f03b6ba12e7c0a1005504612cc6067eebec9ffe8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L359> when
>> Joining two Unbounded PCollections with different WindowFns, If we can
>> ensure that
>> MainStream: one side of the join is Unbounded with WindowFn as
>> [Non-Global Windows with DefaultTrigger] and
>> LookupStream: the other side of the Join is a "Slowly Changing Lookup
>> Cache"[Global Windows with Repeatedly.forever(AfterProcessingTime.
>> pastFirstElementInPane()) Trigger],
>> we can directly perform a SideInputJoin.
>>
>> Will we have "data completeness" problem even in "Slowly Changing lookup
>> Cache Pattern"?
>>
>> On Fri, Jul 26, 2019 at 2:51 AM Rui Wang <ru...@google.com> wrote:
>>
>>> To be more clear, I think it's useful if we can achieve the following
>>> that you wrote
>>>
>>> PCollection mainStream = ...;
>>> PCollection lookupStream = ...;
>>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
>>> new TupleTag("LookupTable"));
>>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>>
>>> -Rui
>>>
>>> On Thu, Jul 25, 2019 at 1:56 PM Rui Wang <ru...@google.com> wrote:
>>>
>>>> Hi Rahul, thanks for your detailed writeup. It pretty much summarizes
>>>> the slow changing table join problem.
>>>>
>>>> To your question: "Can we implement SideInputJoin for this case",
>>>> there are two perspectives.
>>>>
>>>> In terms of implementing the slowing changing lookup cache pattern
>>>> <https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs> in
>>>> BeamSQL, such sidinput join can be done that way. At least it worth
>>>> exploring it until we identify blockers. I also think this pattern is
>>>> already useful to users.
>>>>
>>>> In terms of Join schematic, I think it's hard to reason data
>>>> completeness since one side of join is changing.
>>>>
>>>> -Rui
>>>>
>>>>
>>>> On Thu, Jul 25, 2019 at 12:55 PM rahul patwari <
>>>> rahulpatwari8383@gmail.com> wrote:
>>>>
>>>>> Hi Kenn,
>>>>>
>>>>> If we consider the following two *Unbounded* PCollections:
>>>>> - PCollection1 => [*Non-Global* Window with Default Trigger]
>>>>> - PCollection2 => [Global Window with *Non-Default* Trigger] :)
>>>>> coincidentally turned out to be the opposite
>>>>>
>>>>> Joining these two PCollections in BeamSql currently is not possible
>>>>> because of https://jira.apache.org/jira/browse/BEAM-3345(WindowFn
>>>>> Mismatch)
>>>>> But in this case, PCollection1 can be joined with PCollection2 using
>>>>> SideInputJoin (
>>>>> https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
>>>>> which is being done for Joining an Unbounded PCollection with Bounded
>>>>> PCollection. I am thinking that Beam can guarantee it joins all input
>>>>> elements once per window for this case.
>>>>> The result of the join might be fuzzy for the window when the Trigger
>>>>> for PCollection2 fires and sideinput gets loaded into Memory.
>>>>>
>>>>> PCollection2 can be considered as Slowly Changing Lookup Cache and
>>>>> BeamSql can support Pattern:
>>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
>>>>> which is currently not possible.
>>>>> I am working on https://jira.apache.org/jira/browse/BEAM-7758 for
>>>>> BeamSql to natively support PCollectionView so that BeamSql supports
>>>>> "Slowly Updating Global Window Sideinput Pattern" using SqlTransform's
>>>>> TableProvider.
>>>>>
>>>>> If we can support this, User will be able to do:
>>>>> PCollection mainStream = ...;
>>>>> PCollection lookupStream = ...;
>>>>> PCollectionTuple tuple = PCollectionTuple.of(new
>>>>> TupleTag("MainTable"), new TupleTag("LookupTable"));
>>>>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>>>>
>>>>> Can we implement SideInputJoin for this case?
>>>>> I might be wrong in my understanding. Please let me know your
>>>>> thoughts.
>>>>>
>>>>> Thanks,
>>>>> Rahul
>>>>>
>>>>> On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles <ke...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> I think the best way to approach this is probably to have an example
>>>>>> SQL statement and to discuss what the relational semantics should be.
>>>>>>
>>>>>> Windowing is not really part of SQL (yet) and in a way it just needs
>>>>>> very minimal extensions. See https://arxiv.org/abs/1905.12133. In
>>>>>> this proposal for SQL, windowed aggregation is explicitly be part of the
>>>>>> GROUP BY operation, where you GROUP BY window columns that were added. So
>>>>>> it is more explicit than in Beam. Relations do not have a WindowFn so there
>>>>>> is no problem of them being incompatible.
>>>>>>
>>>>>> With Beam SQL there are basically two ways of windowing that work
>>>>>> totally differently:
>>>>>>
>>>>>> 1. SQL style windowing where you GROUP BY windows. This does not use
>>>>>> the input PCollection windowfn
>>>>>> 2. PCollection windowing where the SQL does not do any windowing -
>>>>>> this should apply the SQL expression to each window independently
>>>>>>
>>>>>> In order to support a hybrid of these, it might be:
>>>>>>
>>>>>> 3. SQL style windowing, where when a PCollection has window assigned,
>>>>>> the window columns are added before the SQL is applied. It is a bit strange
>>>>>> but might enable your use.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari <
>>>>>> rahulpatwari8383@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Beam currently doesn't support Join of Unbounded PCollections of
>>>>>>> different WindowFns (
>>>>>>> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
>>>>>>> ).
>>>>>>>
>>>>>>> BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection],
>>>>>>> by performing 'SideInputJoin' with Bounded PCollection as a SideInput.
>>>>>>>
>>>>>>> Can we support [Unbounded PCollection] JOIN [Unbounded PCollection],
>>>>>>> when one of the Unbounded PCollection has [GlobalWindows Applied with
>>>>>>> Non-Default Trigger(probably a slow-changing lookup cache
>>>>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
>>>>>>> by performing 'SideInputJoin'?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Rahul
>>>>>>>
>>>>>>

Re: Enhancement for Joining Unbounded PCollections of different WindowFns

Posted by Rui Wang <ru...@google.com>.
I see.

Actually I was still referring to make "LookupStream" as PCollectionView to
perform sideinput join, which then doesn't have mismatch WindowFn problem.
Otherwise, we shouldn't check special case of WindowFn to decide if perform
a sideinput join for two unbounded PCollection when their WindowFn does not
match.

And "data completeness" really means is sideinput is triggered so it could
change, and then the question is when sideinput is changed, should we
refine previous data? It becomes harder to reason at this moment.


Rui

On Thu, Jul 25, 2019 at 6:17 PM rahul patwari <ra...@gmail.com>
wrote:

> "*In terms of Join schematic, I think it's hard to reason data
> completeness since one side of the join is changing*"
> - As it is possible to apply [Global Windows with Non-Default Trigger] to
> Unbounded Data Source, say, Kafka, to distinguish this Kafka PCollection
> from "Slowly Changing lookup cache" Unbounded PCollection,  If we can check
> the condition that one of the PCollection being Joined have WindowFn as
> [Global Windows with Trigger Repeatedly.forever(AfterProcessingTime.
> pastFirstElementInPane())] is it sufficient to perform the Join of
> "MainStream" and this "LookupStream"?
>
> In other words, I mean to say that instead of directly throwing Exception
> <https://github.com/apache/beam/blob/f03b6ba12e7c0a1005504612cc6067eebec9ffe8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L359> when
> Joining two Unbounded PCollections with different WindowFns, If we can
> ensure that
> MainStream: one side of the join is Unbounded with WindowFn as [Non-Global
> Windows with DefaultTrigger] and
> LookupStream: the other side of the Join is a "Slowly Changing Lookup
> Cache"[Global Windows with Repeatedly.forever(AfterProcessingTime.
> pastFirstElementInPane()) Trigger],
> we can directly perform a SideInputJoin.
>
> Will we have "data completeness" problem even in "Slowly Changing lookup
> Cache Pattern"?
>
> On Fri, Jul 26, 2019 at 2:51 AM Rui Wang <ru...@google.com> wrote:
>
>> To be more clear, I think it's useful if we can achieve the following
>> that you wrote
>>
>> PCollection mainStream = ...;
>> PCollection lookupStream = ...;
>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
>> new TupleTag("LookupTable"));
>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>
>> -Rui
>>
>> On Thu, Jul 25, 2019 at 1:56 PM Rui Wang <ru...@google.com> wrote:
>>
>>> Hi Rahul, thanks for your detailed writeup. It pretty much summarizes
>>> the slow changing table join problem.
>>>
>>> To your question: "Can we implement SideInputJoin for this case", there
>>> are two perspectives.
>>>
>>> In terms of implementing the slowing changing lookup cache pattern
>>> <https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs> in
>>> BeamSQL, such sidinput join can be done that way. At least it worth
>>> exploring it until we identify blockers. I also think this pattern is
>>> already useful to users.
>>>
>>> In terms of Join schematic, I think it's hard to reason data
>>> completeness since one side of join is changing.
>>>
>>> -Rui
>>>
>>>
>>> On Thu, Jul 25, 2019 at 12:55 PM rahul patwari <
>>> rahulpatwari8383@gmail.com> wrote:
>>>
>>>> Hi Kenn,
>>>>
>>>> If we consider the following two *Unbounded* PCollections:
>>>> - PCollection1 => [*Non-Global* Window with Default Trigger]
>>>> - PCollection2 => [Global Window with *Non-Default* Trigger] :)
>>>> coincidentally turned out to be the opposite
>>>>
>>>> Joining these two PCollections in BeamSql currently is not possible
>>>> because of https://jira.apache.org/jira/browse/BEAM-3345(WindowFn
>>>> Mismatch)
>>>> But in this case, PCollection1 can be joined with PCollection2 using
>>>> SideInputJoin (
>>>> https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
>>>> which is being done for Joining an Unbounded PCollection with Bounded
>>>> PCollection. I am thinking that Beam can guarantee it joins all input
>>>> elements once per window for this case.
>>>> The result of the join might be fuzzy for the window when the Trigger
>>>> for PCollection2 fires and sideinput gets loaded into Memory.
>>>>
>>>> PCollection2 can be considered as Slowly Changing Lookup Cache and
>>>> BeamSql can support Pattern:
>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
>>>> which is currently not possible.
>>>> I am working on https://jira.apache.org/jira/browse/BEAM-7758 for
>>>> BeamSql to natively support PCollectionView so that BeamSql supports
>>>> "Slowly Updating Global Window Sideinput Pattern" using SqlTransform's
>>>> TableProvider.
>>>>
>>>> If we can support this, User will be able to do:
>>>> PCollection mainStream = ...;
>>>> PCollection lookupStream = ...;
>>>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
>>>> new TupleTag("LookupTable"));
>>>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>>>
>>>> Can we implement SideInputJoin for this case?
>>>> I might be wrong in my understanding. Please let me know your thoughts.
>>>>
>>>> Thanks,
>>>> Rahul
>>>>
>>>> On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles <ke...@apache.org>
>>>> wrote:
>>>>
>>>>> I think the best way to approach this is probably to have an example
>>>>> SQL statement and to discuss what the relational semantics should be.
>>>>>
>>>>> Windowing is not really part of SQL (yet) and in a way it just needs
>>>>> very minimal extensions. See https://arxiv.org/abs/1905.12133. In
>>>>> this proposal for SQL, windowed aggregation is explicitly be part of the
>>>>> GROUP BY operation, where you GROUP BY window columns that were added. So
>>>>> it is more explicit than in Beam. Relations do not have a WindowFn so there
>>>>> is no problem of them being incompatible.
>>>>>
>>>>> With Beam SQL there are basically two ways of windowing that work
>>>>> totally differently:
>>>>>
>>>>> 1. SQL style windowing where you GROUP BY windows. This does not use
>>>>> the input PCollection windowfn
>>>>> 2. PCollection windowing where the SQL does not do any windowing -
>>>>> this should apply the SQL expression to each window independently
>>>>>
>>>>> In order to support a hybrid of these, it might be:
>>>>>
>>>>> 3. SQL style windowing, where when a PCollection has window assigned,
>>>>> the window columns are added before the SQL is applied. It is a bit strange
>>>>> but might enable your use.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari <
>>>>> rahulpatwari8383@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Beam currently doesn't support Join of Unbounded PCollections of
>>>>>> different WindowFns (
>>>>>> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
>>>>>> ).
>>>>>>
>>>>>> BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection],
>>>>>> by performing 'SideInputJoin' with Bounded PCollection as a SideInput.
>>>>>>
>>>>>> Can we support [Unbounded PCollection] JOIN [Unbounded PCollection],
>>>>>> when one of the Unbounded PCollection has [GlobalWindows Applied with
>>>>>> Non-Default Trigger(probably a slow-changing lookup cache
>>>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
>>>>>> by performing 'SideInputJoin'?
>>>>>>
>>>>>> Regards,
>>>>>> Rahul
>>>>>>
>>>>>

Re: Enhancement for Joining Unbounded PCollections of different WindowFns

Posted by rahul patwari <ra...@gmail.com>.
"*In terms of Join schematic, I think it's hard to reason data completeness
since one side of the join is changing*"
- As it is possible to apply [Global Windows with Non-Default Trigger] to
Unbounded Data Source, say, Kafka, to distinguish this Kafka PCollection
from "Slowly Changing lookup cache" Unbounded PCollection,  If we can check
the condition that one of the PCollection being Joined have WindowFn as
[Global Windows with Trigger Repeatedly.forever(AfterProcessingTime.
pastFirstElementInPane())] is it sufficient to perform the Join of
"MainStream" and this "LookupStream"?

In other words, I mean to say that instead of directly throwing Exception
<https://github.com/apache/beam/blob/f03b6ba12e7c0a1005504612cc6067eebec9ffe8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L359>
when
Joining two Unbounded PCollections with different WindowFns, If we can
ensure that
MainStream: one side of the join is Unbounded with WindowFn as [Non-Global
Windows with DefaultTrigger] and
LookupStream: the other side of the Join is a "Slowly Changing Lookup
Cache"[Global Windows with Repeatedly.forever(AfterProcessingTime.
pastFirstElementInPane()) Trigger],
we can directly perform a SideInputJoin.

Will we have "data completeness" problem even in "Slowly Changing lookup
Cache Pattern"?

On Fri, Jul 26, 2019 at 2:51 AM Rui Wang <ru...@google.com> wrote:

> To be more clear, I think it's useful if we can achieve the following that
> you wrote
>
> PCollection mainStream = ...;
> PCollection lookupStream = ...;
> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
> new TupleTag("LookupTable"));
> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>
> -Rui
>
> On Thu, Jul 25, 2019 at 1:56 PM Rui Wang <ru...@google.com> wrote:
>
>> Hi Rahul, thanks for your detailed writeup. It pretty much summarizes the
>> slow changing table join problem.
>>
>> To your question: "Can we implement SideInputJoin for this case", there
>> are two perspectives.
>>
>> In terms of implementing the slowing changing lookup cache pattern
>> <https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs> in
>> BeamSQL, such sidinput join can be done that way. At least it worth
>> exploring it until we identify blockers. I also think this pattern is
>> already useful to users.
>>
>> In terms of Join schematic, I think it's hard to reason data completeness
>> since one side of join is changing.
>>
>> -Rui
>>
>>
>> On Thu, Jul 25, 2019 at 12:55 PM rahul patwari <
>> rahulpatwari8383@gmail.com> wrote:
>>
>>> Hi Kenn,
>>>
>>> If we consider the following two *Unbounded* PCollections:
>>> - PCollection1 => [*Non-Global* Window with Default Trigger]
>>> - PCollection2 => [Global Window with *Non-Default* Trigger] :)
>>> coincidentally turned out to be the opposite
>>>
>>> Joining these two PCollections in BeamSql currently is not possible
>>> because of https://jira.apache.org/jira/browse/BEAM-3345(WindowFn
>>> Mismatch)
>>> But in this case, PCollection1 can be joined with PCollection2 using
>>> SideInputJoin (
>>> https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
>>> which is being done for Joining an Unbounded PCollection with Bounded
>>> PCollection. I am thinking that Beam can guarantee it joins all input
>>> elements once per window for this case.
>>> The result of the join might be fuzzy for the window when the Trigger
>>> for PCollection2 fires and sideinput gets loaded into Memory.
>>>
>>> PCollection2 can be considered as Slowly Changing Lookup Cache and
>>> BeamSql can support Pattern:
>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
>>> which is currently not possible.
>>> I am working on https://jira.apache.org/jira/browse/BEAM-7758 for
>>> BeamSql to natively support PCollectionView so that BeamSql supports
>>> "Slowly Updating Global Window Sideinput Pattern" using SqlTransform's
>>> TableProvider.
>>>
>>> If we can support this, User will be able to do:
>>> PCollection mainStream = ...;
>>> PCollection lookupStream = ...;
>>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
>>> new TupleTag("LookupTable"));
>>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>>
>>> Can we implement SideInputJoin for this case?
>>> I might be wrong in my understanding. Please let me know your thoughts.
>>>
>>> Thanks,
>>> Rahul
>>>
>>> On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles <ke...@apache.org> wrote:
>>>
>>>> I think the best way to approach this is probably to have an example
>>>> SQL statement and to discuss what the relational semantics should be.
>>>>
>>>> Windowing is not really part of SQL (yet) and in a way it just needs
>>>> very minimal extensions. See https://arxiv.org/abs/1905.12133. In this
>>>> proposal for SQL, windowed aggregation is explicitly be part of the GROUP
>>>> BY operation, where you GROUP BY window columns that were added. So it is
>>>> more explicit than in Beam. Relations do not have a WindowFn so there is no
>>>> problem of them being incompatible.
>>>>
>>>> With Beam SQL there are basically two ways of windowing that work
>>>> totally differently:
>>>>
>>>> 1. SQL style windowing where you GROUP BY windows. This does not use
>>>> the input PCollection windowfn
>>>> 2. PCollection windowing where the SQL does not do any windowing - this
>>>> should apply the SQL expression to each window independently
>>>>
>>>> In order to support a hybrid of these, it might be:
>>>>
>>>> 3. SQL style windowing, where when a PCollection has window assigned,
>>>> the window columns are added before the SQL is applied. It is a bit strange
>>>> but might enable your use.
>>>>
>>>> Kenn
>>>>
>>>> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari <
>>>> rahulpatwari8383@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Beam currently doesn't support Join of Unbounded PCollections of
>>>>> different WindowFns (
>>>>> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
>>>>> ).
>>>>>
>>>>> BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection],
>>>>> by performing 'SideInputJoin' with Bounded PCollection as a SideInput.
>>>>>
>>>>> Can we support [Unbounded PCollection] JOIN [Unbounded PCollection],
>>>>> when one of the Unbounded PCollection has [GlobalWindows Applied with
>>>>> Non-Default Trigger(probably a slow-changing lookup cache
>>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
>>>>> by performing 'SideInputJoin'?
>>>>>
>>>>> Regards,
>>>>> Rahul
>>>>>
>>>>

Re: Enhancement for Joining Unbounded PCollections of different WindowFns

Posted by Rui Wang <ru...@google.com>.
To be more clear, I think it's useful if we can achieve the following that
you wrote

PCollection mainStream = ...;
PCollection lookupStream = ...;
PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"), new
TupleTag("LookupTable"));
tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));

-Rui

On Thu, Jul 25, 2019 at 1:56 PM Rui Wang <ru...@google.com> wrote:

> Hi Rahul, thanks for your detailed writeup. It pretty much summarizes the
> slow changing table join problem.
>
> To your question: "Can we implement SideInputJoin for this case", there
> are two perspectives.
>
> In terms of implementing the slowing changing lookup cache pattern
> <https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs> in
> BeamSQL, such sidinput join can be done that way. At least it worth
> exploring it until we identify blockers. I also think this pattern is
> already useful to users.
>
> In terms of Join schematic, I think it's hard to reason data completeness
> since one side of join is changing.
>
> -Rui
>
>
> On Thu, Jul 25, 2019 at 12:55 PM rahul patwari <ra...@gmail.com>
> wrote:
>
>> Hi Kenn,
>>
>> If we consider the following two *Unbounded* PCollections:
>> - PCollection1 => [*Non-Global* Window with Default Trigger]
>> - PCollection2 => [Global Window with *Non-Default* Trigger] :)
>> coincidentally turned out to be the opposite
>>
>> Joining these two PCollections in BeamSql currently is not possible
>> because of https://jira.apache.org/jira/browse/BEAM-3345(WindowFn
>> Mismatch)
>> But in this case, PCollection1 can be joined with PCollection2 using
>> SideInputJoin (
>> https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
>> which is being done for Joining an Unbounded PCollection with Bounded
>> PCollection. I am thinking that Beam can guarantee it joins all input
>> elements once per window for this case.
>> The result of the join might be fuzzy for the window when the Trigger for
>> PCollection2 fires and sideinput gets loaded into Memory.
>>
>> PCollection2 can be considered as Slowly Changing Lookup Cache and
>> BeamSql can support Pattern:
>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
>> which is currently not possible.
>> I am working on https://jira.apache.org/jira/browse/BEAM-7758 for
>> BeamSql to natively support PCollectionView so that BeamSql supports
>> "Slowly Updating Global Window Sideinput Pattern" using SqlTransform's
>> TableProvider.
>>
>> If we can support this, User will be able to do:
>> PCollection mainStream = ...;
>> PCollection lookupStream = ...;
>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
>> new TupleTag("LookupTable"));
>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>
>> Can we implement SideInputJoin for this case?
>> I might be wrong in my understanding. Please let me know your thoughts.
>>
>> Thanks,
>> Rahul
>>
>> On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles <ke...@apache.org> wrote:
>>
>>> I think the best way to approach this is probably to have an example SQL
>>> statement and to discuss what the relational semantics should be.
>>>
>>> Windowing is not really part of SQL (yet) and in a way it just needs
>>> very minimal extensions. See https://arxiv.org/abs/1905.12133. In this
>>> proposal for SQL, windowed aggregation is explicitly be part of the GROUP
>>> BY operation, where you GROUP BY window columns that were added. So it is
>>> more explicit than in Beam. Relations do not have a WindowFn so there is no
>>> problem of them being incompatible.
>>>
>>> With Beam SQL there are basically two ways of windowing that work
>>> totally differently:
>>>
>>> 1. SQL style windowing where you GROUP BY windows. This does not use the
>>> input PCollection windowfn
>>> 2. PCollection windowing where the SQL does not do any windowing - this
>>> should apply the SQL expression to each window independently
>>>
>>> In order to support a hybrid of these, it might be:
>>>
>>> 3. SQL style windowing, where when a PCollection has window assigned,
>>> the window columns are added before the SQL is applied. It is a bit strange
>>> but might enable your use.
>>>
>>> Kenn
>>>
>>> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari <
>>> rahulpatwari8383@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Beam currently doesn't support Join of Unbounded PCollections of
>>>> different WindowFns (
>>>> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
>>>> ).
>>>>
>>>> BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection], by
>>>> performing 'SideInputJoin' with Bounded PCollection as a SideInput.
>>>>
>>>> Can we support [Unbounded PCollection] JOIN [Unbounded PCollection],
>>>> when one of the Unbounded PCollection has [GlobalWindows Applied with
>>>> Non-Default Trigger(probably a slow-changing lookup cache
>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
>>>> by performing 'SideInputJoin'?
>>>>
>>>> Regards,
>>>> Rahul
>>>>
>>>

Re: Enhancement for Joining Unbounded PCollections of different WindowFns

Posted by Rui Wang <ru...@google.com>.
Hi Rahul, thanks for your detailed writeup. It pretty much summarizes the
slow changing table join problem.

To your question: "Can we implement SideInputJoin for this case", there are
two perspectives.

In terms of implementing the slowing changing lookup cache pattern
<https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs>
in
BeamSQL, such sidinput join can be done that way. At least it worth
exploring it until we identify blockers. I also think this pattern is
already useful to users.

In terms of Join schematic, I think it's hard to reason data completeness
since one side of join is changing.

-Rui


On Thu, Jul 25, 2019 at 12:55 PM rahul patwari <ra...@gmail.com>
wrote:

> Hi Kenn,
>
> If we consider the following two *Unbounded* PCollections:
> - PCollection1 => [*Non-Global* Window with Default Trigger]
> - PCollection2 => [Global Window with *Non-Default* Trigger] :)
> coincidentally turned out to be the opposite
>
> Joining these two PCollections in BeamSql currently is not possible
> because of https://jira.apache.org/jira/browse/BEAM-3345(WindowFn
> Mismatch)
> But in this case, PCollection1 can be joined with PCollection2 using
> SideInputJoin (
> https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
> which is being done for Joining an Unbounded PCollection with Bounded
> PCollection. I am thinking that Beam can guarantee it joins all input
> elements once per window for this case.
> The result of the join might be fuzzy for the window when the Trigger for
> PCollection2 fires and sideinput gets loaded into Memory.
>
> PCollection2 can be considered as Slowly Changing Lookup Cache and BeamSql
> can support Pattern:
> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
> which is currently not possible.
> I am working on https://jira.apache.org/jira/browse/BEAM-7758 for BeamSql
> to natively support PCollectionView so that BeamSql supports "Slowly
> Updating Global Window Sideinput Pattern" using SqlTransform's
> TableProvider.
>
> If we can support this, User will be able to do:
> PCollection mainStream = ...;
> PCollection lookupStream = ...;
> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
> new TupleTag("LookupTable"));
> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>
> Can we implement SideInputJoin for this case?
> I might be wrong in my understanding. Please let me know your thoughts.
>
> Thanks,
> Rahul
>
> On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles <ke...@apache.org> wrote:
>
>> I think the best way to approach this is probably to have an example SQL
>> statement and to discuss what the relational semantics should be.
>>
>> Windowing is not really part of SQL (yet) and in a way it just needs very
>> minimal extensions. See https://arxiv.org/abs/1905.12133. In this
>> proposal for SQL, windowed aggregation is explicitly be part of the GROUP
>> BY operation, where you GROUP BY window columns that were added. So it is
>> more explicit than in Beam. Relations do not have a WindowFn so there is no
>> problem of them being incompatible.
>>
>> With Beam SQL there are basically two ways of windowing that work totally
>> differently:
>>
>> 1. SQL style windowing where you GROUP BY windows. This does not use the
>> input PCollection windowfn
>> 2. PCollection windowing where the SQL does not do any windowing - this
>> should apply the SQL expression to each window independently
>>
>> In order to support a hybrid of these, it might be:
>>
>> 3. SQL style windowing, where when a PCollection has window assigned, the
>> window columns are added before the SQL is applied. It is a bit strange but
>> might enable your use.
>>
>> Kenn
>>
>> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari <
>> rahulpatwari8383@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Beam currently doesn't support Join of Unbounded PCollections of
>>> different WindowFns (
>>> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
>>> ).
>>>
>>> BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection], by
>>> performing 'SideInputJoin' with Bounded PCollection as a SideInput.
>>>
>>> Can we support [Unbounded PCollection] JOIN [Unbounded PCollection],
>>> when one of the Unbounded PCollection has [GlobalWindows Applied with
>>> Non-Default Trigger(probably a slow-changing lookup cache
>>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
>>> by performing 'SideInputJoin'?
>>>
>>> Regards,
>>> Rahul
>>>
>>

Re: Enhancement for Joining Unbounded PCollections of different WindowFns

Posted by rahul patwari <ra...@gmail.com>.
Hi Kenn,

If we consider the following two *Unbounded* PCollections:
- PCollection1 => [*Non-Global* Window with Default Trigger]
- PCollection2 => [Global Window with *Non-Default* Trigger] :)
coincidentally turned out to be the opposite

Joining these two PCollections in BeamSql currently is not possible because
of https://jira.apache.org/jira/browse/BEAM-3345(WindowFn Mismatch)
But in this case, PCollection1 can be joined with PCollection2 using
SideInputJoin (
https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L456),
which is being done for Joining an Unbounded PCollection with Bounded
PCollection. I am thinking that Beam can guarantee it joins all input
elements once per window for this case.
The result of the join might be fuzzy for the window when the Trigger for
PCollection2 fires and sideinput gets loaded into Memory.

PCollection2 can be considered as Slowly Changing Lookup Cache and BeamSql
can support Pattern:
https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs,
which is currently not possible.
I am working on https://jira.apache.org/jira/browse/BEAM-7758 for BeamSql
to natively support PCollectionView so that BeamSql supports "Slowly
Updating Global Window Sideinput Pattern" using SqlTransform's
TableProvider.

If we can support this, User will be able to do:
PCollection mainStream = ...;
PCollection lookupStream = ...;
PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"), new
TupleTag("LookupTable"));
tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));

Can we implement SideInputJoin for this case?
I might be wrong in my understanding. Please let me know your thoughts.

Thanks,
Rahul

On Thu, Jul 25, 2019 at 9:28 AM Kenneth Knowles <ke...@apache.org> wrote:

> I think the best way to approach this is probably to have an example SQL
> statement and to discuss what the relational semantics should be.
>
> Windowing is not really part of SQL (yet) and in a way it just needs very
> minimal extensions. See https://arxiv.org/abs/1905.12133. In this
> proposal for SQL, windowed aggregation is explicitly be part of the GROUP
> BY operation, where you GROUP BY window columns that were added. So it is
> more explicit than in Beam. Relations do not have a WindowFn so there is no
> problem of them being incompatible.
>
> With Beam SQL there are basically two ways of windowing that work totally
> differently:
>
> 1. SQL style windowing where you GROUP BY windows. This does not use the
> input PCollection windowfn
> 2. PCollection windowing where the SQL does not do any windowing - this
> should apply the SQL expression to each window independently
>
> In order to support a hybrid of these, it might be:
>
> 3. SQL style windowing, where when a PCollection has window assigned, the
> window columns are added before the SQL is applied. It is a bit strange but
> might enable your use.
>
> Kenn
>
> On Mon, Jul 22, 2019 at 10:39 AM rahul patwari <ra...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Beam currently doesn't support Join of Unbounded PCollections of
>> different WindowFns (
>> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
>> ).
>>
>> BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection], by
>> performing 'SideInputJoin' with Bounded PCollection as a SideInput.
>>
>> Can we support [Unbounded PCollection] JOIN [Unbounded PCollection], when
>> one of the Unbounded PCollection has [GlobalWindows Applied with
>> Non-Default Trigger(probably a slow-changing lookup cache
>> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
>> by performing 'SideInputJoin'?
>>
>> Regards,
>> Rahul
>>
>

Re: Enhancement for Joining Unbounded PCollections of different WindowFns

Posted by Kenneth Knowles <ke...@apache.org>.
I think the best way to approach this is probably to have an example SQL
statement and to discuss what the relational semantics should be.

Windowing is not really part of SQL (yet) and in a way it just needs very
minimal extensions. See https://arxiv.org/abs/1905.12133. In this proposal
for SQL, windowed aggregation is explicitly be part of the GROUP BY
operation, where you GROUP BY window columns that were added. So it is more
explicit than in Beam. Relations do not have a WindowFn so there is no
problem of them being incompatible.

With Beam SQL there are basically two ways of windowing that work totally
differently:

1. SQL style windowing where you GROUP BY windows. This does not use the
input PCollection windowfn
2. PCollection windowing where the SQL does not do any windowing - this
should apply the SQL expression to each window independently

In order to support a hybrid of these, it might be:

3. SQL style windowing, where when a PCollection has window assigned, the
window columns are added before the SQL is applied. It is a bit strange but
might enable your use.

Kenn

On Mon, Jul 22, 2019 at 10:39 AM rahul patwari <ra...@gmail.com>
wrote:

> Hi,
>
> Beam currently doesn't support Join of Unbounded PCollections of different
> WindowFns (
> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
> ).
>
> BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection], by
> performing 'SideInputJoin' with Bounded PCollection as a SideInput.
>
> Can we support [Unbounded PCollection] JOIN [Unbounded PCollection], when
> one of the Unbounded PCollection has [GlobalWindows Applied with
> Non-Default Trigger(probably a slow-changing lookup cache
> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
> by performing 'SideInputJoin'?
>
> Regards,
> Rahul
>