You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Rui Wang <ru...@google.com> on 2019/08/13 06:30:52 UTC

[DISCUSS] Multiple-triggering SQL Join with retractions support

Hi Community,

BeamSQL currently does not support unbounded-unbounded join with
non-default trigger. It is because:

- Discarding mode does not work for outer joins because of lacking of
ability to retract pre-emitted values. You can think about an example in
which a tuple of (left_row, null) needed to be retracted  if the matched
right_row appears since last trigger fired.
- Accumulating mode *theoretically* can support unbounded-unbounded join
because it's supposed to always "overwrite" previous result. However in
practice, for join use cases such overwriting is too expensive. It would be
much more efficient if small changes in inputs of join only cause small
changes to downstream to compute.
- Both discarding mode and accumulating mode are not sufficient to refine
materialized data.

Meanwhile, [1] has kicked off a discussion on retractions in Beam model. I
have been collecting people's feedback and generally speaking people agree
that retractions are useful for some use cases.

Thus I propose to combine SQL join with retractions to
support multiple-triggering SQL Join.

I think SQL join is a good start for supporting retraction in Beam with the
following caveats:
1. multiple-triggering SQL Join is a useful feature.
2. SQL join is an opportunity for us to figure out implementation details
of retraction by building it for a well defined use case.
3. Supporting retraction should not cause performance regression on
existing pipelines, or require changes on existing pipelines.


What do you think?

[1]:
https://lists.apache.org/thread.html/bb2d40b1bea8b21fbbb7caf599fabba823da357768ceca8ea2363789@%3Cdev.beam.apache.org%3E


-Rui

Re: [DISCUSS] Multiple-triggering SQL Join with retractions support

Posted by Rui Wang <ru...@google.com>.
Kenn - Yep totally agree the first phrase should not include EMIT. Although
it would be interesting to explore EMIT support in Calcite as a R&D work.

Mingmin - Thanks for you example query, which is an interesting use case,
in which two inputs are aggregations with different modes. Retraction won't
be a concern. The more interesting question is how can we allow setup
different modes in SQL query. It certainly beyond scope of this thread. We
might could add more syntax to EMIT to allow control
acc/discarding/retracting. However, it's not discussed yet so I don't have
a clear idea.


-Rui

On Wed, Aug 21, 2019 at 12:22 PM Mingmin Xu <mi...@gmail.com> wrote:

> @Rui In my cases, we have some complex queries like
> SELECT ...
> FROM ( SELECT ... FROM PRE_A GROUP BY id, TUMBLE(1 HOUR) ) A
> JOIN ( SELECT ... FROM PRE_B GROUP BY id, TUMBLE(1 HOUR) ) B
> ON A.id=B.id
> //A emit every minute on accumulate mode and B emit every minute on
> discard move.
>
> Would be interested to know how it can support with retraction in SQL,
> currently this operation is blocked simply.
>
> Mingmin
>
> On Wed, Aug 21, 2019 at 11:21 AM Kenneth Knowles <ke...@apache.org> wrote:
>
>> These all sound useful. One thing is that the EMIT syntax is a more early
>> idea, and more likely subject to some changes. The problem with EMIT
>> anywhere except the top level is that it is not very composable. It really
>> belongs most as part of an INSERT statement, just like sink triggers.
>>
>> Maybe a first step is to do the basics for retractions in Beam itself.
>> This is already a lot of work (I just reviewed your prototype and Anton's
>> together so I have a very good idea where it is at). Once we have the
>> basics, then SqlTransform can have triggers set on its input and still work
>> with grouping and joins. That will let us explore retractions in SQL
>> without depending on EMIT.
>>
>> Kenn
>>
>> On Mon, Aug 19, 2019 at 7:02 PM Rui Wang <ru...@google.com> wrote:
>>
>>> I am also asking TVF windowing and EMIT syntax support in dev@calcite.
>>> See [1].
>>>
>>>
>>>
>>> [1]:
>>> https://lists.apache.org/thread.html/71724f8a9079be11c04c70c64097491822323f560a79a7fa1321711d@%3Cdev.calcite.apache.org%3E
>>>
>>> -Rui
>>>
>>> On Mon, Aug 19, 2019 at 4:40 PM Rui Wang <ru...@google.com> wrote:
>>>
>>>> Hi Mingmin,
>>>>
>>>> Thanks for adding "INSERT INTO" (which I missed from the example)
>>>>
>>>> I am not sure if I understand the question:
>>>>
>>>> 1. multiple GBK with retraction is solved by [1].
>>>> 2. In terms of SQL and its view, the output are defined by the last GBK.
>>>>
>>>> [1]:
>>>> https://docs.google.com/document/d/14WRfxwk_iLUHGPty3C6ZenddPsp_d6jhmx0vuafXqmE/edit?usp=sharing
>>>>
>>>>
>>>> -Rui
>>>>
>>>> On Mon, Aug 19, 2019 at 4:02 PM Mingmin Xu <mi...@gmail.com> wrote:
>>>>
>>>>> +1 to support EMIT in Beam side first if we cannot include it in
>>>>> Calcite in short time(See #1, #2). I'm open to use any format, the one
>>>>> above or something as below. The tricky question is, what's the expected
>>>>> behavior for a complex query with more than 1 GBK operators?
>>>>>
>>>>> EMIT  <INTERVAL '1' MINUTE> | <INTERVAL '100' ROW> [ACCUMULATE|DISCARD]
>>>>> [INSERT INTO ...]
>>>>> SELECT ...
>>>>>
>>>>> #1.
>>>>> https://sematext.com/opensee/m/Calcite/FR3K9JVAl32VULr6?subj=Towards+a+spec+for+robust+streaming+SQL+Part+1
>>>>> #2
>>>>> https://sematext.com/opensee/m/Beam/gfKHFFDd4i1I3nZc2?subj=Towards+a+spec+for+robust+streaming+SQL+Part+2
>>>>>
>>>>> On Mon, Aug 19, 2019 at 12:02 PM Rui Wang <ru...@google.com> wrote:
>>>>>
>>>>>> To update this idea, I think we can go a step further to support EMIT
>>>>>> syntax from one-sql-to-rule-them-all paper [1].
>>>>>>
>>>>>> EMIT will allow periodic delay stream materialization. For stream
>>>>>> view, it means we will add support to sinks to keep generating a changelog
>>>>>> table. For view only, it means we will add support to sinks to generate a
>>>>>> compacted table form changelog table periodically.
>>>>>>
>>>>>> Regarding to SQL, a typical query like the following should run:
>>>>>>
>>>>>>
>>>>>> *WITH joined_table AS (SELECT * FROM S1 JOIN S2)*
>>>>>> *SELECT XX FROM HOP(joined_table)*
>>>>>> *EMTI [STREAM] AFTER DELAY INTERVAL '1' HOUR*
>>>>>>
>>>>>>
>>>>>> By doing so, retractions will be much useful for SQL from a product
>>>>>> scenario, in which we can have a meaningful end to end SQL pipeline.
>>>>>>
>>>>>> [1]: https://arxiv.org/pdf/1905.12133.pdf
>>>>>>
>>>>>> -Rui
>>>>>>
>>>>>> On Mon, Aug 12, 2019 at 11:30 PM Rui Wang <ru...@google.com> wrote:
>>>>>>
>>>>>>> Hi Community,
>>>>>>>
>>>>>>> BeamSQL currently does not support unbounded-unbounded join with
>>>>>>> non-default trigger. It is because:
>>>>>>>
>>>>>>> - Discarding mode does not work for outer joins because of lacking
>>>>>>> of ability to retract pre-emitted values. You can think about an example in
>>>>>>> which a tuple of (left_row, null) needed to be retracted  if the matched
>>>>>>> right_row appears since last trigger fired.
>>>>>>> - Accumulating mode *theoretically* can support unbounded-unbounded
>>>>>>> join because it's supposed to always "overwrite" previous result. However
>>>>>>> in practice, for join use cases such overwriting is too expensive. It would
>>>>>>> be much more efficient if small changes in inputs of join only cause small
>>>>>>> changes to downstream to compute.
>>>>>>> - Both discarding mode and accumulating mode are not sufficient to
>>>>>>> refine materialized data.
>>>>>>>
>>>>>>> Meanwhile, [1] has kicked off a discussion on retractions in Beam
>>>>>>> model. I have been collecting people's feedback and generally speaking
>>>>>>> people agree that retractions are useful for some use cases.
>>>>>>>
>>>>>>> Thus I propose to combine SQL join with retractions to
>>>>>>> support multiple-triggering SQL Join.
>>>>>>>
>>>>>>> I think SQL join is a good start for supporting retraction in Beam
>>>>>>> with the following caveats:
>>>>>>> 1. multiple-triggering SQL Join is a useful feature.
>>>>>>> 2. SQL join is an opportunity for us to figure out implementation
>>>>>>> details of retraction by building it for a well defined use case.
>>>>>>> 3. Supporting retraction should not cause performance regression on
>>>>>>> existing pipelines, or require changes on existing pipelines.
>>>>>>>
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> [1]:
>>>>>>> https://lists.apache.org/thread.html/bb2d40b1bea8b21fbbb7caf599fabba823da357768ceca8ea2363789@%3Cdev.beam.apache.org%3E
>>>>>>>
>>>>>>>
>>>>>>> -Rui
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> ----
>>>>> Mingmin
>>>>>
>>>>
>
> --
> ----
> Mingmin
>

Re: [DISCUSS] Multiple-triggering SQL Join with retractions support

Posted by Mingmin Xu <mi...@gmail.com>.
@Rui In my cases, we have some complex queries like
SELECT ...
FROM ( SELECT ... FROM PRE_A GROUP BY id, TUMBLE(1 HOUR) ) A
JOIN ( SELECT ... FROM PRE_B GROUP BY id, TUMBLE(1 HOUR) ) B
ON A.id=B.id
//A emit every minute on accumulate mode and B emit every minute on discard
move.

Would be interested to know how it can support with retraction in SQL,
currently this operation is blocked simply.

Mingmin

On Wed, Aug 21, 2019 at 11:21 AM Kenneth Knowles <ke...@apache.org> wrote:

> These all sound useful. One thing is that the EMIT syntax is a more early
> idea, and more likely subject to some changes. The problem with EMIT
> anywhere except the top level is that it is not very composable. It really
> belongs most as part of an INSERT statement, just like sink triggers.
>
> Maybe a first step is to do the basics for retractions in Beam itself.
> This is already a lot of work (I just reviewed your prototype and Anton's
> together so I have a very good idea where it is at). Once we have the
> basics, then SqlTransform can have triggers set on its input and still work
> with grouping and joins. That will let us explore retractions in SQL
> without depending on EMIT.
>
> Kenn
>
> On Mon, Aug 19, 2019 at 7:02 PM Rui Wang <ru...@google.com> wrote:
>
>> I am also asking TVF windowing and EMIT syntax support in dev@calcite.
>> See [1].
>>
>>
>>
>> [1]:
>> https://lists.apache.org/thread.html/71724f8a9079be11c04c70c64097491822323f560a79a7fa1321711d@%3Cdev.calcite.apache.org%3E
>>
>> -Rui
>>
>> On Mon, Aug 19, 2019 at 4:40 PM Rui Wang <ru...@google.com> wrote:
>>
>>> Hi Mingmin,
>>>
>>> Thanks for adding "INSERT INTO" (which I missed from the example)
>>>
>>> I am not sure if I understand the question:
>>>
>>> 1. multiple GBK with retraction is solved by [1].
>>> 2. In terms of SQL and its view, the output are defined by the last GBK.
>>>
>>> [1]:
>>> https://docs.google.com/document/d/14WRfxwk_iLUHGPty3C6ZenddPsp_d6jhmx0vuafXqmE/edit?usp=sharing
>>>
>>>
>>> -Rui
>>>
>>> On Mon, Aug 19, 2019 at 4:02 PM Mingmin Xu <mi...@gmail.com> wrote:
>>>
>>>> +1 to support EMIT in Beam side first if we cannot include it in
>>>> Calcite in short time(See #1, #2). I'm open to use any format, the one
>>>> above or something as below. The tricky question is, what's the expected
>>>> behavior for a complex query with more than 1 GBK operators?
>>>>
>>>> EMIT  <INTERVAL '1' MINUTE> | <INTERVAL '100' ROW> [ACCUMULATE|DISCARD]
>>>> [INSERT INTO ...]
>>>> SELECT ...
>>>>
>>>> #1.
>>>> https://sematext.com/opensee/m/Calcite/FR3K9JVAl32VULr6?subj=Towards+a+spec+for+robust+streaming+SQL+Part+1
>>>> #2
>>>> https://sematext.com/opensee/m/Beam/gfKHFFDd4i1I3nZc2?subj=Towards+a+spec+for+robust+streaming+SQL+Part+2
>>>>
>>>> On Mon, Aug 19, 2019 at 12:02 PM Rui Wang <ru...@google.com> wrote:
>>>>
>>>>> To update this idea, I think we can go a step further to support EMIT
>>>>> syntax from one-sql-to-rule-them-all paper [1].
>>>>>
>>>>> EMIT will allow periodic delay stream materialization. For stream
>>>>> view, it means we will add support to sinks to keep generating a changelog
>>>>> table. For view only, it means we will add support to sinks to generate a
>>>>> compacted table form changelog table periodically.
>>>>>
>>>>> Regarding to SQL, a typical query like the following should run:
>>>>>
>>>>>
>>>>> *WITH joined_table AS (SELECT * FROM S1 JOIN S2)*
>>>>> *SELECT XX FROM HOP(joined_table)*
>>>>> *EMTI [STREAM] AFTER DELAY INTERVAL '1' HOUR*
>>>>>
>>>>>
>>>>> By doing so, retractions will be much useful for SQL from a product
>>>>> scenario, in which we can have a meaningful end to end SQL pipeline.
>>>>>
>>>>> [1]: https://arxiv.org/pdf/1905.12133.pdf
>>>>>
>>>>> -Rui
>>>>>
>>>>> On Mon, Aug 12, 2019 at 11:30 PM Rui Wang <ru...@google.com> wrote:
>>>>>
>>>>>> Hi Community,
>>>>>>
>>>>>> BeamSQL currently does not support unbounded-unbounded join with
>>>>>> non-default trigger. It is because:
>>>>>>
>>>>>> - Discarding mode does not work for outer joins because of lacking of
>>>>>> ability to retract pre-emitted values. You can think about an example in
>>>>>> which a tuple of (left_row, null) needed to be retracted  if the matched
>>>>>> right_row appears since last trigger fired.
>>>>>> - Accumulating mode *theoretically* can support unbounded-unbounded
>>>>>> join because it's supposed to always "overwrite" previous result. However
>>>>>> in practice, for join use cases such overwriting is too expensive. It would
>>>>>> be much more efficient if small changes in inputs of join only cause small
>>>>>> changes to downstream to compute.
>>>>>> - Both discarding mode and accumulating mode are not sufficient to
>>>>>> refine materialized data.
>>>>>>
>>>>>> Meanwhile, [1] has kicked off a discussion on retractions in Beam
>>>>>> model. I have been collecting people's feedback and generally speaking
>>>>>> people agree that retractions are useful for some use cases.
>>>>>>
>>>>>> Thus I propose to combine SQL join with retractions to
>>>>>> support multiple-triggering SQL Join.
>>>>>>
>>>>>> I think SQL join is a good start for supporting retraction in Beam
>>>>>> with the following caveats:
>>>>>> 1. multiple-triggering SQL Join is a useful feature.
>>>>>> 2. SQL join is an opportunity for us to figure out implementation
>>>>>> details of retraction by building it for a well defined use case.
>>>>>> 3. Supporting retraction should not cause performance regression on
>>>>>> existing pipelines, or require changes on existing pipelines.
>>>>>>
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> [1]:
>>>>>> https://lists.apache.org/thread.html/bb2d40b1bea8b21fbbb7caf599fabba823da357768ceca8ea2363789@%3Cdev.beam.apache.org%3E
>>>>>>
>>>>>>
>>>>>> -Rui
>>>>>>
>>>>>
>>>>
>>>> --
>>>> ----
>>>> Mingmin
>>>>
>>>

-- 
----
Mingmin

Re: [DISCUSS] Multiple-triggering SQL Join with retractions support

Posted by Kenneth Knowles <ke...@apache.org>.
These all sound useful. One thing is that the EMIT syntax is a more early
idea, and more likely subject to some changes. The problem with EMIT
anywhere except the top level is that it is not very composable. It really
belongs most as part of an INSERT statement, just like sink triggers.

Maybe a first step is to do the basics for retractions in Beam itself. This
is already a lot of work (I just reviewed your prototype and Anton's
together so I have a very good idea where it is at). Once we have the
basics, then SqlTransform can have triggers set on its input and still work
with grouping and joins. That will let us explore retractions in SQL
without depending on EMIT.

Kenn

On Mon, Aug 19, 2019 at 7:02 PM Rui Wang <ru...@google.com> wrote:

> I am also asking TVF windowing and EMIT syntax support in dev@calcite.
> See [1].
>
>
>
> [1]:
> https://lists.apache.org/thread.html/71724f8a9079be11c04c70c64097491822323f560a79a7fa1321711d@%3Cdev.calcite.apache.org%3E
>
> -Rui
>
> On Mon, Aug 19, 2019 at 4:40 PM Rui Wang <ru...@google.com> wrote:
>
>> Hi Mingmin,
>>
>> Thanks for adding "INSERT INTO" (which I missed from the example)
>>
>> I am not sure if I understand the question:
>>
>> 1. multiple GBK with retraction is solved by [1].
>> 2. In terms of SQL and its view, the output are defined by the last GBK.
>>
>> [1]:
>> https://docs.google.com/document/d/14WRfxwk_iLUHGPty3C6ZenddPsp_d6jhmx0vuafXqmE/edit?usp=sharing
>>
>>
>> -Rui
>>
>> On Mon, Aug 19, 2019 at 4:02 PM Mingmin Xu <mi...@gmail.com> wrote:
>>
>>> +1 to support EMIT in Beam side first if we cannot include it in Calcite
>>> in short time(See #1, #2). I'm open to use any format, the one above or
>>> something as below. The tricky question is, what's the expected behavior
>>> for a complex query with more than 1 GBK operators?
>>>
>>> EMIT  <INTERVAL '1' MINUTE> | <INTERVAL '100' ROW> [ACCUMULATE|DISCARD]
>>> [INSERT INTO ...]
>>> SELECT ...
>>>
>>> #1.
>>> https://sematext.com/opensee/m/Calcite/FR3K9JVAl32VULr6?subj=Towards+a+spec+for+robust+streaming+SQL+Part+1
>>> #2
>>> https://sematext.com/opensee/m/Beam/gfKHFFDd4i1I3nZc2?subj=Towards+a+spec+for+robust+streaming+SQL+Part+2
>>>
>>> On Mon, Aug 19, 2019 at 12:02 PM Rui Wang <ru...@google.com> wrote:
>>>
>>>> To update this idea, I think we can go a step further to support EMIT
>>>> syntax from one-sql-to-rule-them-all paper [1].
>>>>
>>>> EMIT will allow periodic delay stream materialization. For stream view,
>>>> it means we will add support to sinks to keep generating a changelog table.
>>>> For view only, it means we will add support to sinks to generate a
>>>> compacted table form changelog table periodically.
>>>>
>>>> Regarding to SQL, a typical query like the following should run:
>>>>
>>>>
>>>> *WITH joined_table AS (SELECT * FROM S1 JOIN S2)*
>>>> *SELECT XX FROM HOP(joined_table)*
>>>> *EMTI [STREAM] AFTER DELAY INTERVAL '1' HOUR*
>>>>
>>>>
>>>> By doing so, retractions will be much useful for SQL from a product
>>>> scenario, in which we can have a meaningful end to end SQL pipeline.
>>>>
>>>> [1]: https://arxiv.org/pdf/1905.12133.pdf
>>>>
>>>> -Rui
>>>>
>>>> On Mon, Aug 12, 2019 at 11:30 PM Rui Wang <ru...@google.com> wrote:
>>>>
>>>>> Hi Community,
>>>>>
>>>>> BeamSQL currently does not support unbounded-unbounded join with
>>>>> non-default trigger. It is because:
>>>>>
>>>>> - Discarding mode does not work for outer joins because of lacking of
>>>>> ability to retract pre-emitted values. You can think about an example in
>>>>> which a tuple of (left_row, null) needed to be retracted  if the matched
>>>>> right_row appears since last trigger fired.
>>>>> - Accumulating mode *theoretically* can support unbounded-unbounded
>>>>> join because it's supposed to always "overwrite" previous result. However
>>>>> in practice, for join use cases such overwriting is too expensive. It would
>>>>> be much more efficient if small changes in inputs of join only cause small
>>>>> changes to downstream to compute.
>>>>> - Both discarding mode and accumulating mode are not sufficient to
>>>>> refine materialized data.
>>>>>
>>>>> Meanwhile, [1] has kicked off a discussion on retractions in Beam
>>>>> model. I have been collecting people's feedback and generally speaking
>>>>> people agree that retractions are useful for some use cases.
>>>>>
>>>>> Thus I propose to combine SQL join with retractions to
>>>>> support multiple-triggering SQL Join.
>>>>>
>>>>> I think SQL join is a good start for supporting retraction in Beam
>>>>> with the following caveats:
>>>>> 1. multiple-triggering SQL Join is a useful feature.
>>>>> 2. SQL join is an opportunity for us to figure out implementation
>>>>> details of retraction by building it for a well defined use case.
>>>>> 3. Supporting retraction should not cause performance regression on
>>>>> existing pipelines, or require changes on existing pipelines.
>>>>>
>>>>>
>>>>> What do you think?
>>>>>
>>>>> [1]:
>>>>> https://lists.apache.org/thread.html/bb2d40b1bea8b21fbbb7caf599fabba823da357768ceca8ea2363789@%3Cdev.beam.apache.org%3E
>>>>>
>>>>>
>>>>> -Rui
>>>>>
>>>>
>>>
>>> --
>>> ----
>>> Mingmin
>>>
>>

Re: [DISCUSS] Multiple-triggering SQL Join with retractions support

Posted by Rui Wang <ru...@google.com>.
I am also asking TVF windowing and EMIT syntax support in dev@calcite. See
[1].



[1]:
https://lists.apache.org/thread.html/71724f8a9079be11c04c70c64097491822323f560a79a7fa1321711d@%3Cdev.calcite.apache.org%3E

-Rui

On Mon, Aug 19, 2019 at 4:40 PM Rui Wang <ru...@google.com> wrote:

> Hi Mingmin,
>
> Thanks for adding "INSERT INTO" (which I missed from the example)
>
> I am not sure if I understand the question:
>
> 1. multiple GBK with retraction is solved by [1].
> 2. In terms of SQL and its view, the output are defined by the last GBK.
>
> [1]:
> https://docs.google.com/document/d/14WRfxwk_iLUHGPty3C6ZenddPsp_d6jhmx0vuafXqmE/edit?usp=sharing
>
>
> -Rui
>
> On Mon, Aug 19, 2019 at 4:02 PM Mingmin Xu <mi...@gmail.com> wrote:
>
>> +1 to support EMIT in Beam side first if we cannot include it in Calcite
>> in short time(See #1, #2). I'm open to use any format, the one above or
>> something as below. The tricky question is, what's the expected behavior
>> for a complex query with more than 1 GBK operators?
>>
>> EMIT  <INTERVAL '1' MINUTE> | <INTERVAL '100' ROW> [ACCUMULATE|DISCARD]
>> [INSERT INTO ...]
>> SELECT ...
>>
>> #1.
>> https://sematext.com/opensee/m/Calcite/FR3K9JVAl32VULr6?subj=Towards+a+spec+for+robust+streaming+SQL+Part+1
>> #2
>> https://sematext.com/opensee/m/Beam/gfKHFFDd4i1I3nZc2?subj=Towards+a+spec+for+robust+streaming+SQL+Part+2
>>
>> On Mon, Aug 19, 2019 at 12:02 PM Rui Wang <ru...@google.com> wrote:
>>
>>> To update this idea, I think we can go a step further to support EMIT
>>> syntax from one-sql-to-rule-them-all paper [1].
>>>
>>> EMIT will allow periodic delay stream materialization. For stream view,
>>> it means we will add support to sinks to keep generating a changelog table.
>>> For view only, it means we will add support to sinks to generate a
>>> compacted table form changelog table periodically.
>>>
>>> Regarding to SQL, a typical query like the following should run:
>>>
>>>
>>> *WITH joined_table AS (SELECT * FROM S1 JOIN S2)*
>>> *SELECT XX FROM HOP(joined_table)*
>>> *EMTI [STREAM] AFTER DELAY INTERVAL '1' HOUR*
>>>
>>>
>>> By doing so, retractions will be much useful for SQL from a product
>>> scenario, in which we can have a meaningful end to end SQL pipeline.
>>>
>>> [1]: https://arxiv.org/pdf/1905.12133.pdf
>>>
>>> -Rui
>>>
>>> On Mon, Aug 12, 2019 at 11:30 PM Rui Wang <ru...@google.com> wrote:
>>>
>>>> Hi Community,
>>>>
>>>> BeamSQL currently does not support unbounded-unbounded join with
>>>> non-default trigger. It is because:
>>>>
>>>> - Discarding mode does not work for outer joins because of lacking of
>>>> ability to retract pre-emitted values. You can think about an example in
>>>> which a tuple of (left_row, null) needed to be retracted  if the matched
>>>> right_row appears since last trigger fired.
>>>> - Accumulating mode *theoretically* can support unbounded-unbounded
>>>> join because it's supposed to always "overwrite" previous result. However
>>>> in practice, for join use cases such overwriting is too expensive. It would
>>>> be much more efficient if small changes in inputs of join only cause small
>>>> changes to downstream to compute.
>>>> - Both discarding mode and accumulating mode are not sufficient to
>>>> refine materialized data.
>>>>
>>>> Meanwhile, [1] has kicked off a discussion on retractions in Beam
>>>> model. I have been collecting people's feedback and generally speaking
>>>> people agree that retractions are useful for some use cases.
>>>>
>>>> Thus I propose to combine SQL join with retractions to
>>>> support multiple-triggering SQL Join.
>>>>
>>>> I think SQL join is a good start for supporting retraction in Beam with
>>>> the following caveats:
>>>> 1. multiple-triggering SQL Join is a useful feature.
>>>> 2. SQL join is an opportunity for us to figure out implementation
>>>> details of retraction by building it for a well defined use case.
>>>> 3. Supporting retraction should not cause performance regression on
>>>> existing pipelines, or require changes on existing pipelines.
>>>>
>>>>
>>>> What do you think?
>>>>
>>>> [1]:
>>>> https://lists.apache.org/thread.html/bb2d40b1bea8b21fbbb7caf599fabba823da357768ceca8ea2363789@%3Cdev.beam.apache.org%3E
>>>>
>>>>
>>>> -Rui
>>>>
>>>
>>
>> --
>> ----
>> Mingmin
>>
>

Re: [DISCUSS] Multiple-triggering SQL Join with retractions support

Posted by Rui Wang <ru...@google.com>.
Hi Mingmin,

Thanks for adding "INSERT INTO" (which I missed from the example)

I am not sure if I understand the question:

1. multiple GBK with retraction is solved by [1].
2. In terms of SQL and its view, the output are defined by the last GBK.

[1]:
https://docs.google.com/document/d/14WRfxwk_iLUHGPty3C6ZenddPsp_d6jhmx0vuafXqmE/edit?usp=sharing


-Rui

On Mon, Aug 19, 2019 at 4:02 PM Mingmin Xu <mi...@gmail.com> wrote:

> +1 to support EMIT in Beam side first if we cannot include it in Calcite
> in short time(See #1, #2). I'm open to use any format, the one above or
> something as below. The tricky question is, what's the expected behavior
> for a complex query with more than 1 GBK operators?
>
> EMIT  <INTERVAL '1' MINUTE> | <INTERVAL '100' ROW> [ACCUMULATE|DISCARD]
> [INSERT INTO ...]
> SELECT ...
>
> #1.
> https://sematext.com/opensee/m/Calcite/FR3K9JVAl32VULr6?subj=Towards+a+spec+for+robust+streaming+SQL+Part+1
> #2
> https://sematext.com/opensee/m/Beam/gfKHFFDd4i1I3nZc2?subj=Towards+a+spec+for+robust+streaming+SQL+Part+2
>
> On Mon, Aug 19, 2019 at 12:02 PM Rui Wang <ru...@google.com> wrote:
>
>> To update this idea, I think we can go a step further to support EMIT
>> syntax from one-sql-to-rule-them-all paper [1].
>>
>> EMIT will allow periodic delay stream materialization. For stream view,
>> it means we will add support to sinks to keep generating a changelog table.
>> For view only, it means we will add support to sinks to generate a
>> compacted table form changelog table periodically.
>>
>> Regarding to SQL, a typical query like the following should run:
>>
>>
>> *WITH joined_table AS (SELECT * FROM S1 JOIN S2)*
>> *SELECT XX FROM HOP(joined_table)*
>> *EMTI [STREAM] AFTER DELAY INTERVAL '1' HOUR*
>>
>>
>> By doing so, retractions will be much useful for SQL from a product
>> scenario, in which we can have a meaningful end to end SQL pipeline.
>>
>> [1]: https://arxiv.org/pdf/1905.12133.pdf
>>
>> -Rui
>>
>> On Mon, Aug 12, 2019 at 11:30 PM Rui Wang <ru...@google.com> wrote:
>>
>>> Hi Community,
>>>
>>> BeamSQL currently does not support unbounded-unbounded join with
>>> non-default trigger. It is because:
>>>
>>> - Discarding mode does not work for outer joins because of lacking of
>>> ability to retract pre-emitted values. You can think about an example in
>>> which a tuple of (left_row, null) needed to be retracted  if the matched
>>> right_row appears since last trigger fired.
>>> - Accumulating mode *theoretically* can support unbounded-unbounded
>>> join because it's supposed to always "overwrite" previous result. However
>>> in practice, for join use cases such overwriting is too expensive. It would
>>> be much more efficient if small changes in inputs of join only cause small
>>> changes to downstream to compute.
>>> - Both discarding mode and accumulating mode are not sufficient to
>>> refine materialized data.
>>>
>>> Meanwhile, [1] has kicked off a discussion on retractions in Beam model.
>>> I have been collecting people's feedback and generally speaking people
>>> agree that retractions are useful for some use cases.
>>>
>>> Thus I propose to combine SQL join with retractions to
>>> support multiple-triggering SQL Join.
>>>
>>> I think SQL join is a good start for supporting retraction in Beam with
>>> the following caveats:
>>> 1. multiple-triggering SQL Join is a useful feature.
>>> 2. SQL join is an opportunity for us to figure out implementation
>>> details of retraction by building it for a well defined use case.
>>> 3. Supporting retraction should not cause performance regression on
>>> existing pipelines, or require changes on existing pipelines.
>>>
>>>
>>> What do you think?
>>>
>>> [1]:
>>> https://lists.apache.org/thread.html/bb2d40b1bea8b21fbbb7caf599fabba823da357768ceca8ea2363789@%3Cdev.beam.apache.org%3E
>>>
>>>
>>> -Rui
>>>
>>
>
> --
> ----
> Mingmin
>

Re: [DISCUSS] Multiple-triggering SQL Join with retractions support

Posted by Mingmin Xu <mi...@gmail.com>.
+1 to support EMIT in Beam side first if we cannot include it in Calcite in
short time(See #1, #2). I'm open to use any format, the one above or
something as below. The tricky question is, what's the expected behavior
for a complex query with more than 1 GBK operators?

EMIT  <INTERVAL '1' MINUTE> | <INTERVAL '100' ROW> [ACCUMULATE|DISCARD]
[INSERT INTO ...]
SELECT ...

#1.
https://sematext.com/opensee/m/Calcite/FR3K9JVAl32VULr6?subj=Towards+a+spec+for+robust+streaming+SQL+Part+1
#2
https://sematext.com/opensee/m/Beam/gfKHFFDd4i1I3nZc2?subj=Towards+a+spec+for+robust+streaming+SQL+Part+2

On Mon, Aug 19, 2019 at 12:02 PM Rui Wang <ru...@google.com> wrote:

> To update this idea, I think we can go a step further to support EMIT
> syntax from one-sql-to-rule-them-all paper [1].
>
> EMIT will allow periodic delay stream materialization. For stream view, it
> means we will add support to sinks to keep generating a changelog table.
> For view only, it means we will add support to sinks to generate a
> compacted table form changelog table periodically.
>
> Regarding to SQL, a typical query like the following should run:
>
>
> *WITH joined_table AS (SELECT * FROM S1 JOIN S2)*
> *SELECT XX FROM HOP(joined_table)*
> *EMTI [STREAM] AFTER DELAY INTERVAL '1' HOUR*
>
>
> By doing so, retractions will be much useful for SQL from a product
> scenario, in which we can have a meaningful end to end SQL pipeline.
>
> [1]: https://arxiv.org/pdf/1905.12133.pdf
>
> -Rui
>
> On Mon, Aug 12, 2019 at 11:30 PM Rui Wang <ru...@google.com> wrote:
>
>> Hi Community,
>>
>> BeamSQL currently does not support unbounded-unbounded join with
>> non-default trigger. It is because:
>>
>> - Discarding mode does not work for outer joins because of lacking of
>> ability to retract pre-emitted values. You can think about an example in
>> which a tuple of (left_row, null) needed to be retracted  if the matched
>> right_row appears since last trigger fired.
>> - Accumulating mode *theoretically* can support unbounded-unbounded join
>> because it's supposed to always "overwrite" previous result. However in
>> practice, for join use cases such overwriting is too expensive. It would be
>> much more efficient if small changes in inputs of join only cause small
>> changes to downstream to compute.
>> - Both discarding mode and accumulating mode are not sufficient to refine
>> materialized data.
>>
>> Meanwhile, [1] has kicked off a discussion on retractions in Beam model.
>> I have been collecting people's feedback and generally speaking people
>> agree that retractions are useful for some use cases.
>>
>> Thus I propose to combine SQL join with retractions to
>> support multiple-triggering SQL Join.
>>
>> I think SQL join is a good start for supporting retraction in Beam with
>> the following caveats:
>> 1. multiple-triggering SQL Join is a useful feature.
>> 2. SQL join is an opportunity for us to figure out implementation details
>> of retraction by building it for a well defined use case.
>> 3. Supporting retraction should not cause performance regression on
>> existing pipelines, or require changes on existing pipelines.
>>
>>
>> What do you think?
>>
>> [1]:
>> https://lists.apache.org/thread.html/bb2d40b1bea8b21fbbb7caf599fabba823da357768ceca8ea2363789@%3Cdev.beam.apache.org%3E
>>
>>
>> -Rui
>>
>

-- 
----
Mingmin

Re: [DISCUSS] Multiple-triggering SQL Join with retractions support

Posted by Rui Wang <ru...@google.com>.
To update this idea, I think we can go a step further to support EMIT
syntax from one-sql-to-rule-them-all paper [1].

EMIT will allow periodic delay stream materialization. For stream view, it
means we will add support to sinks to keep generating a changelog table.
For view only, it means we will add support to sinks to generate a
compacted table form changelog table periodically.

Regarding to SQL, a typical query like the following should run:


*WITH joined_table AS (SELECT * FROM S1 JOIN S2)*
*SELECT XX FROM HOP(joined_table)*
*EMTI [STREAM] AFTER DELAY INTERVAL '1' HOUR*


By doing so, retractions will be much useful for SQL from a product
scenario, in which we can have a meaningful end to end SQL pipeline.

[1]: https://arxiv.org/pdf/1905.12133.pdf

-Rui

On Mon, Aug 12, 2019 at 11:30 PM Rui Wang <ru...@google.com> wrote:

> Hi Community,
>
> BeamSQL currently does not support unbounded-unbounded join with
> non-default trigger. It is because:
>
> - Discarding mode does not work for outer joins because of lacking of
> ability to retract pre-emitted values. You can think about an example in
> which a tuple of (left_row, null) needed to be retracted  if the matched
> right_row appears since last trigger fired.
> - Accumulating mode *theoretically* can support unbounded-unbounded join
> because it's supposed to always "overwrite" previous result. However in
> practice, for join use cases such overwriting is too expensive. It would be
> much more efficient if small changes in inputs of join only cause small
> changes to downstream to compute.
> - Both discarding mode and accumulating mode are not sufficient to refine
> materialized data.
>
> Meanwhile, [1] has kicked off a discussion on retractions in Beam model. I
> have been collecting people's feedback and generally speaking people agree
> that retractions are useful for some use cases.
>
> Thus I propose to combine SQL join with retractions to
> support multiple-triggering SQL Join.
>
> I think SQL join is a good start for supporting retraction in Beam with
> the following caveats:
> 1. multiple-triggering SQL Join is a useful feature.
> 2. SQL join is an opportunity for us to figure out implementation details
> of retraction by building it for a well defined use case.
> 3. Supporting retraction should not cause performance regression on
> existing pipelines, or require changes on existing pipelines.
>
>
> What do you think?
>
> [1]:
> https://lists.apache.org/thread.html/bb2d40b1bea8b21fbbb7caf599fabba823da357768ceca8ea2363789@%3Cdev.beam.apache.org%3E
>
>
> -Rui
>