Posted to dev@calcite.apache.org by Rui Wang <am...@apache.org> on 2020/01/21 19:21:56 UTC

[DISCUSS] [CALCITE-3271] EMIT syntax proposal for event-timestamp semantic windowing

Hi community,

First of all, thanks for all your help on CALCITE-3272 (TUMBLE now works as
a table-valued function). Now that it works, I want to continue
discussing the next piece of streaming SQL work: a way to control the
materialization latency of a streaming query. A small note for people who
are not familiar with streaming: because streaming queries are long-running
(possibly for months or even a year), there is usually a need to see
results before the query terminates. So it is desirable to have a way
for users to specify, e.g., how frequently they want to see results
from the query.

(The following is a summary of my proposal. I can convert it into a
design doc if people prefer. Just let me know.)

*Prior work*
My idea builds on the "One SQL to Rule Them All" paper [1]. Kudos to the
people who contributed to that paper, which became the foundation of this
discussion.

In [1], an EMIT clause was proposed to be added to the end of the query,
in two forms:
1. Emit after the watermark passes the end of the window, e.g. EMIT
[STREAM] AFTER WATERMARK.
2. Delay emitting for a period of time after a change happens (e.g. an
element arrives in a window), e.g. EMIT [STREAM] AFTER DELAY INTERVAL '6' MINUTES.

*Typical streaming emit latency patterns*
1. Event-time triggers. Emitting depends on the relationship between the
watermark and the event timestamps of events. See this video [2] for an
introduction to watermarks in the streaming world and to the concept of
data completeness based on event timestamps.
2. Processing-time triggers. Emitting depends on the system clock. This is
a natural way to emit, e.g. emit the current result every hour without
considering whether the data in a window is already complete.
3. Data-driven triggers, e.g. emit when the number of accumulated events
exceeds a threshold (e.g. emit once 1000 events have accumulated).
4. Composite triggers. There is a need to combine 1, 2 and 3 with OR and
AND to achieve better latency control.
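To make the first three categories concrete, here is a minimal Python sketch (not part of the proposal) that computes when each kind of trigger would fire for a single window, given a hypothetical sequence of ticks carrying processing time, watermark and newly arrived elements, all in seconds. The `Tick` shape and the function names are illustrative assumptions; a composite trigger (item 4) would combine these conditions with AND/OR.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Tick:
    """One observation of a hypothetical stream, for a single window."""
    processing_time: int  # system clock, in seconds
    watermark: int        # event-time watermark after this tick, in seconds
    new_elements: int     # elements that arrived in the window during this tick


def event_time_firings(ticks: List[Tick], window_end: int) -> List[int]:
    """Event-time trigger: fire once, when the watermark first reaches window_end."""
    for t in ticks:
        if t.watermark >= window_end:
            return [t.processing_time]
    return []


def processing_time_firings(ticks: List[Tick], period: int) -> List[int]:
    """Processing-time trigger: fire on the first tick at or after each multiple
    of `period` seconds, ignoring whether the window's data is complete."""
    fired: List[int] = []
    next_fire = period
    for t in ticks:
        if t.processing_time >= next_fire:
            fired.append(t.processing_time)
            next_fire = (t.processing_time // period + 1) * period
    return fired


def data_driven_firings(ticks: List[Tick], threshold: int) -> List[int]:
    """Data-driven trigger: fire whenever at least `threshold` elements have
    accumulated since the previous firing, then reset the counter."""
    fired: List[int] = []
    pending = 0
    for t in ticks:
        pending += t.new_elements
        if pending >= threshold:
            fired.append(t.processing_time)
            pending = 0
    return fired


ticks = [Tick(30, 100, 2), Tick(60, 300, 1), Tick(90, 550, 3),
         Tick(120, 650, 0), Tick(150, 700, 4)]
print(event_time_firings(ticks, window_end=600))  # [120]
print(processing_time_firings(ticks, period=60))  # [60, 120]
print(data_driven_firings(ticks, threshold=3))    # [60, 90, 150]
```

The same stream fires at very different moments depending on the trigger kind, which is why the proposal wants one mechanism able to express all of them.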

*Proposal to discuss*
I want to extend the proposal in [1] with EMIT HAVING syntax and two
aggregate functions.

*EMIT HAVING bool_expression*. The EMIT HAVING clause comes after the normal
HAVING clause and always works with a GROUP BY. EMIT HAVING behaves similarly
to HAVING, with two differences worth mentioning:
1. It is not a filter. It controls when the table-valued function in the
FROM clause should emit a set. The set is specified by the GROUP BY keys.
2. GROUP BY keys are visible to EMIT HAVING, while this is not the case for
HAVING.

*current_watermark()*. current_watermark() is an aggregate function that
returns a timestamp indicating where the watermark is for a set.

*processing_time_since_first_element()*. processing_time_since_first_element()
is an aggregate function that returns the system-clock time (i.e.
processing time) elapsed since the first element appeared in the set.
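One way an engine might back these two functions is with a small amount of per-group state. The Python sketch below is an illustration under stated assumptions, not part of the proposal: `EmitState` and its methods are hypothetical, a single watermark is shared by all sets, and a group is keyed by `window_end` as in the examples that follow.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Hashable, Optional


@dataclass
class GroupState:
    first_element_at: Optional[datetime] = None  # processing time of the first element
    count: int = 0                               # backs COUNT(*)


class EmitState:
    """Hypothetical per-query state used to evaluate EMIT HAVING per group."""

    def __init__(self) -> None:
        self.watermark = datetime.min  # supplied by the engine
        self.groups: Dict[Hashable, GroupState] = {}

    def on_element(self, key: Hashable, processing_now: datetime) -> None:
        g = self.groups.setdefault(key, GroupState())
        if g.first_element_at is None:
            g.first_element_at = processing_now
        g.count += 1

    def advance_watermark(self, wm: datetime) -> None:
        # Watermarks only move forward.
        self.watermark = max(self.watermark, wm)

    def current_watermark(self, key: Hashable) -> datetime:
        # Simplification: one watermark shared by all sets.
        return self.watermark

    def processing_time_since_first_element(self, key: Hashable,
                                            processing_now: datetime) -> timedelta:
        first = self.groups[key].first_element_at
        assert first is not None
        return processing_now - first


window_end = datetime(2020, 1, 21, 12, 10)
t0 = datetime(2020, 1, 21, 12, 0)
state = EmitState()
state.on_element(window_end, t0)
state.on_element(window_end, t0 + timedelta(minutes=2))
state.advance_watermark(datetime(2020, 1, 21, 12, 5))

now = t0 + timedelta(minutes=5)
# EMIT HAVING processing_time_since_first_element() >= INTERVAL 5 MINUTE
print(state.processing_time_since_first_element(window_end, now) >= timedelta(minutes=5))  # True
# EMIT HAVING current_watermark() >= T.window_end
print(state.current_watermark(window_end) >= window_end)  # False
```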

*Motivating examples*
The following examples show how this proposal achieves different kinds of
latency control.

Assume there is a streaming query that applies fixed windowing (thus TUMBLE)
as follows:

“SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS
T”. Based on this query, let’s say the derived table T from orders has the
following schema:

order_id BIGINT
product_id BIGINT
order_meta STRING
event_ts TIMESTAMP
window_start TIMESTAMP
window_end TIMESTAMP
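For readers new to TUMBLE, the window_start and window_end columns can be derived from event_ts roughly as in this Python sketch. It assumes windows are aligned to the Unix epoch and half-open, [window_start, window_end); both are assumptions for illustration, and `tumble` and `with_window_columns` are hypothetical names.

```python
from datetime import datetime, timedelta
from typing import Tuple

EPOCH = datetime(1970, 1, 1)


def tumble(event_ts: datetime, size: timedelta) -> Tuple[datetime, datetime]:
    """Return (window_start, window_end) of the fixed window containing event_ts."""
    windows_since_epoch = (event_ts - EPOCH) // size  # timedelta // timedelta -> int
    window_start = EPOCH + windows_since_epoch * size
    return window_start, window_start + size


def with_window_columns(order: dict, size: timedelta) -> dict:
    """Append window_start/window_end to an order row, as TUMBLE does for table T."""
    start, end = tumble(order["event_ts"], size)
    return {**order, "window_start": start, "window_end": end}


row = {"order_id": 1, "product_id": 100, "order_meta": "",
       "event_ts": datetime(2020, 1, 21, 19, 21, 56)}
print(with_window_columns(row, timedelta(minutes=10)))
```

With a 10-minute size, the event at 19:21:56 lands in [19:20, 19:30); an event exactly at 19:30 starts the next window.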



The following table shows, for different trigger cases (written in Apache
Beam trigger notation), how the query would be modified with the EMIT
syntax to express the same semantics:

Trigger: AfterWatermark.pastEndOfWindow()
(emit after the watermark passes the end of the window, i.e. once the
window's data is believed complete)
SQL query:
    SELECT *
    FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
    EMIT HAVING current_watermark() >= T.window_end

Trigger: AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5))
(emit 5 minutes after the first element appears in the window)
SQL query:
    SELECT *
    FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
    GROUP BY T.window_end
    EMIT HAVING processing_time_since_first_element() >= INTERVAL 5 MINUTE

Trigger: AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(1))
(emit on every element that arrives after the window's data is believed
complete)
SQL query:
    SELECT *
    FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
    GROUP BY T.window_end
    EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1

Trigger: AfterWatermark.pastEndOfWindow()
    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5)))
(emit before the window is complete, following the delayed-emit strategy)
SQL query:
    SELECT *
    FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
    GROUP BY T.window_end
    EMIT HAVING current_watermark() < T.window_end AND
        processing_time_since_first_element() >= INTERVAL 5 MINUTE

Trigger: AfterWatermark.pastEndOfWindow()
    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5)))
    .withLateFirings(AfterPane.elementCountAtLeast(1))
(a combination of emitting before the window closes, when it closes, and
after it closes)
SQL query:
    SELECT *
    FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
    GROUP BY T.window_end
    EMIT HAVING
        current_watermark() >= T.window_end OR
        (current_watermark() > T.window_end AND COUNT(*) > 1) OR
        (current_watermark() < T.window_end AND
         processing_time_since_first_element() >= INTERVAL 5 MINUTE)


Trigger: AfterWatermark.pastEndOfWindow()
    .withAllowedLateness(Duration.standardDays(2))
(emit after the window closes, tolerating up to 2 days of late data)
SQL query:
    SELECT *
    FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
    GROUP BY T.window_end
    EMIT HAVING
        current_watermark() >= T.window_end AND
        current_watermark() < T.window_end + INTERVAL 2 DAY

Composite triggers
(as the examples above illustrate, AND and OR can be used to combine
different boolean expressions)
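As a sanity check on the table, the EMIT HAVING conditions can be transcribed into plain Python predicates and evaluated at a given instant. This is only a sketch: the function names are hypothetical, the inputs (watermark, time since first element, element count) are assumed to be provided by the engine, and it says nothing about when an engine would re-evaluate the predicates or what happens after a set is emitted.

```python
from datetime import datetime, timedelta

FIVE_MINUTES = timedelta(minutes=5)
TWO_DAYS = timedelta(days=2)


def on_time(wm: datetime, window_end: datetime) -> bool:
    # current_watermark() >= T.window_end
    return wm >= window_end


def early(wm: datetime, window_end: datetime, since_first: timedelta) -> bool:
    # current_watermark() < T.window_end AND
    # processing_time_since_first_element() >= INTERVAL 5 MINUTE
    return wm < window_end and since_first >= FIVE_MINUTES


def late(wm: datetime, window_end: datetime, count: int) -> bool:
    # current_watermark() > T.window_end AND COUNT(*) > 1
    return wm > window_end and count > 1


def within_allowed_lateness(wm: datetime, window_end: datetime) -> bool:
    # current_watermark() >= T.window_end AND
    # current_watermark() < T.window_end + INTERVAL 2 DAY
    return window_end <= wm < window_end + TWO_DAYS


def combined(wm: datetime, window_end: datetime,
             since_first: timedelta, count: int) -> bool:
    # The OR of the three conditions in the combined example.
    return (on_time(wm, window_end)
            or late(wm, window_end, count)
            or early(wm, window_end, since_first))


window_end = datetime(2020, 1, 21, 12, 10)
print(combined(datetime(2020, 1, 21, 12, 5), window_end, timedelta(minutes=6), 1))  # True (early)
print(combined(datetime(2020, 1, 21, 12, 5), window_end, timedelta(minutes=3), 1))  # False
print(within_allowed_lateness(window_end + timedelta(days=3), window_end))          # False
```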



Please let me know your thoughts, and whether you would prefer another way
to continue the discussion.

[1]: https://arxiv.org/pdf/1905.12133.pdf
[2]: https://www.youtube.com/watch?v=TWxSLmkWPm4


Thanks,
Rui Wang

Re: [DISCUSS] [CALCITE-3271] EMIT syntax proposal for event-timestamp semantic windowing

Posted by Rui Wang <am...@apache.org>.
Forgot to mention the SQL syntax change:

SELECT clause
[FROM TVF windowing]     // windowing happens here
[WHERE clause]
[GROUP BY clause]
[HAVING clause]
[EMIT HAVING clause]     // materialization latency control


-Rui


Re: [DISCUSS] [CALCITE-3271] EMIT syntax proposal for event-timestamp semantic windowing

Posted by Rui Wang <am...@apache.org>.
A small update:

I tried prototyping support for just "EMIT AFTER WATERMARK". Supporting
that syntax in the planner without alias references seems pretty
straightforward.

Regarding the engine implementation (e.g. Enumerables), while I am still
prototyping, I feel the implementation will be the same as "EMIT
window_start >= current_watermark()":

Basically, we need a way for different engines to provide their watermark,
and a function/operator seems like a handy way to do that. In that case,
what appears in plans with EMIT could still be a RexNode: an expression
comparing the watermark (an operator left for each engine to implement)
with a column, etc.
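A rough Python analogue of that idea follows. Calcite's actual RexNode classes are Java and are not modeled here; `Column`, `Watermark`, `Compare` and `evaluate` are hypothetical names. The point is that the plan only stores an expression, and the watermark operator's value is looked up from the executing engine at evaluation time. The example uses the condition from the first table example, current_watermark() >= T.window_end.

```python
import operator
from dataclasses import dataclass
from typing import Any, Callable, Dict, Union


@dataclass
class Column:
    name: str


@dataclass
class Watermark:
    """Operator whose value each engine supplies at run time."""


@dataclass
class Compare:
    op: str  # ">=", ">", "<" or "<="
    left: "Expr"
    right: "Expr"


Expr = Union[Column, Watermark, Compare]

OPS: Dict[str, Callable[[Any, Any], bool]] = {
    ">=": operator.ge, ">": operator.gt, "<": operator.lt, "<=": operator.le,
}


def evaluate(expr: Expr, row: Dict[str, Any],
             engine_watermark: Callable[[], Any]) -> Any:
    if isinstance(expr, Column):
        return row[expr.name]
    if isinstance(expr, Watermark):
        return engine_watermark()  # delegated to the engine
    if isinstance(expr, Compare):
        return OPS[expr.op](evaluate(expr.left, row, engine_watermark),
                            evaluate(expr.right, row, engine_watermark))
    raise TypeError(f"unsupported expression: {expr!r}")


emit_condition = Compare(">=", Watermark(), Column("window_end"))
row = {"window_end": 600}
print(evaluate(emit_condition, row, engine_watermark=lambda: 599))  # False
print(evaluate(emit_condition, row, engine_watermark=lambda: 600))  # True
```

Keeping the watermark behind an operator means the planner never needs to know how a given engine tracks event time.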



-Rui



On Wed, Jan 22, 2020 at 12:18 PM Rui Wang <am...@apache.org> wrote:

> That makes sense. I will remove the aggregation constraint (i.e. that EMIT
> requires a GROUP BY). Let's say EMIT should now work on any query, for
> general purposes.
>
> Because the above contains too much information, let me summarize the
> critical points here and see if we can reach a consensus:
>
> 1. Do we agree EMIT should execute after FROM, but before any other
> clauses, assuming EMIT works on any query?
> My opinion is that EMIT should execute after FROM. It matches what
> Julian said: "Think of it as executing the query at T, then executing
> it at T+delta". EMIT just controls how large the delta is, and all the
> other comparisons are just the subsequent WHERE, GROUP BY, HAVING, ORDER BY,
> LIMIT, etc. It also matches the DB case, where EMIT produces a single
> delta, once, spanning -inf to +inf on the timeline.
>
>
> 2. Can we support EMIT predicates rather than a list of fixed emit
> strategies?
> To recap, the pros and cons of EMIT predicates:
> pros: 1) Extensible with several predefined functions. If there is a
> new need, we are more likely to define a new function than new
> keywords/syntax. 2) Easy to understand (think of them as being applied
> to tables to decide when to emit rows).
> cons: 1) Users gain a lot of power to write arbitrary expressions.
> Pros and cons of special EMIT strategy syntax:
> pros: 1) Users will not have much power to write expressions, as the
> syntax is fixed (they can tune a few parameters though).
> cons: 1) I had trouble explaining it to SQL people (it sounds like a hack).
> 2) There are 5 or more known strategies, so we would need a list longer
> than what was proposed in the paper. 3) Potential backward-compatibility
> issues in case emit strategies change.
>
> Lastly, regarding the table evolution problem that Julian mentioned (e.g.
> seeing (100, 6) retracted and (100, 8) added), I totally agree, because of
> the nature of streaming: you probably never know whether data is complete
> when a result is emitted, so the result could be updated later.
>
>
> -Rui
>
>
> On Wed, Jan 22, 2020 at 11:29 AM Julian Hyde <jh...@gmail.com>
> wrote:
>
>> In the SIGMOD paper, EMIT could be applied to any query. Think of it as
>> executing the query at T, then executing it at T+delta, compare the
>> results, and emit any added or removed records.
>>
>> Tying it to aggregate queries seems wrong. (If we have a special
>> semantics for aggregate queries, how are we possibly going to make the
>> semantics well-defined for, say, user-defined table functions?)
>>
>> Yes, I know aggregate queries have weird behavior. If you have computed
>> ’select product, sum(amount) from orders group by product’, and a new order
>> with (product 100, amount 2), then you are going to see (100, 6) retracted
>> and (100, 8) added. But I think we have to live with that. Otherwise EMIT
>> semantics get a lot more complicated.
>>
>> Julian
>>
>>
>> > On Jan 21, 2020, at 1:24 PM, Rui Wang <am...@apache.org> wrote:
>> >
>> > I think something big was missing from my summary: the position of EMIT
>> > and the execution order (and I forgot about ORDER BY). Let me try to
>> > address them here:
>> >
>> > SELECT
>> > [FROM TVF windowing]   // windowing happens here
>> > [WHERE clause]
>> > [GROUP BY clause]
>> > [HAVING clause]
>> > [ORDER BY clause]
>> > [LIMIT clause]
>> > [EMIT clause]          // materialization latency
>> >
>> > The position of EMIT is indeed a bit confusing, as the right execution
>> > order should be: FROM -> EMIT -> the others, as in a normal query. FROM
>> > continuously generates data, EMIT decides when to emit a part of the
>> > data, and then the other clauses are applied to the emitted data and
>> > update downstream.
>> >
>> > So there are at least two open questions:
>> > 1. What should we use for EMIT? EMIT HAVING (can use aggregation columns
>> > like COUNT(*)), EMIT WHERE (can only use single column aliases, like
>> > ORDER BY) or EMIT AFTER (undefined yet whether we want to support
>> > expressions; I hope we do).
>> > 2. Where does the EMIT clause go? Maybe the clearest position is right
>> > after FROM.
>> >
>> >
>> > -Rui
>> >
>> >
>> > On Tue, Jan 21, 2020 at 1:09 PM Rui Wang <ru...@google.com> wrote:
>> >
>> >>
>> >>
>> >> On Tue, Jan 21, 2020 at 12:34 PM Julian Hyde <jh...@apache.org> wrote:
>> >>
>> >>> Does EMIT HAVING have anything to do with aggregate queries (GROUP BY
>> >>> and HAVING), or is it just a coincidence that you use the same word,
>> >>> HAVING?
>> >>>
>> >>
>> >> EMIT HAVING is independent of HAVING, but EMIT HAVING does have a
>> >> relationship to GROUP BY: EMIT HAVING requires a GROUP BY. It groups by
>> >> the GROUP BY keys, then applies the EMIT HAVING expressions to the sets
>> >> specified by those keys.
>> >> However, we could loosen the constraint to allow EMIT HAVING to appear
>> >> even without GROUP BY, which just means applying emit control to the
>> >> whole data set rather than controlling emission per group.
>> >>
>> >> In my opinion, the execution order is: grouping (GROUP BY) -> EMIT
>> >> control (EMIT HAVING decides which part of the data can be emitted) ->
>> >> aggregation (normal HAVING and other aggregations). For batch/classic DB
>> >> workloads, the EMIT step will always emit all data, so this idea is
>> >> compatible with existing DB users.
>> >>
>> >> I happened to choose EMIT HAVING because emit control is very similar
>> >> to HAVING (and some bits of WHERE); the only difference is that HAVING
>> >> is a filter while EMIT HAVING controls emission. E.g. applying HAVING
>> >> expressions to data decides whether to pass the data downstream or not,
>> >> while applying EMIT HAVING expressions decides whether to pass the data
>> >> downstream now or later (or discard it if the window closes).
>> >>
>> >> If you think the idea causes confusion rather than making it easier to
>> >> onboard people to streaming SQL, we can replace EMIT HAVING with EMIT
>> >> AFTER, per the original design.
>> >>
>> >>> I support the idea of latency controls, but I am nervous about
>> >>> allowing full expressions in the EMIT clause if we don't have to.
>> >>>
>> >>>
>> >> Yep. It's a design choice between allowing expressions and keeping a
>> >> set of dedicated SQL syntax for some emit strategies. If we don't use
>> >> extensible EMIT expressions, we will likely need to provide a long list
>> >> of syntax for different emit strategies, for example:
>> >> EMIT AFTER WATERMARK
>> >> EMIT AFTER DELAY
>> >> EMIT AFTER AFTER WATERMARK BUT LATE
>> >> EMIT AFTER COUNT
>> >> EMIT AFTER BEFORE WATERMARK
>> >> etc.
>> >>
>> >> Again it's a design choice so I am open to both ideas.
>> >>
>> >> However, personally I prefer the EMIT expressions idea because I found
>> >> it very easy to explain EMIT expressions to SQL people who don't have
>> >> much streaming background. Basically, you can say EMIT expressions are
>> >> just applied to the rows of the table from the table-valued function. If
>> >> there is a GROUP BY, the expression is applied to each group accordingly,
>> >> and its result indicates whether the group is ready to emit. This ease
>> >> mainly comes from the fact that we have introduced window start and
>> >> window end into the table, so we have all the data we need in the table
>> >> to write expressions against (except for processing-time triggers).
>> >>
>> >> The downside of expressions, though, is that people will be allowed to
>> >> write any expression they want, and engines will have to take
>> >> responsibility for validating them.
>> >>
>> >>
>> >>
>> >>> Aggregate queries have a peculiar scope. (You can only reference
>> >>> grouped expressions, for example.) I don't think we should drag that
>> >>> peculiar scope into the EMIT clause. The simplest thing is to only
>> >>> allow the EMIT clause to reference column aliases, which is similar to
>> >>> the scope used by ORDER BY.
>> >>>
>> >> That sounds like one of the prototype ideas: EMIT WHERE. In EMIT WHERE,
>> >> just like WHERE, you cannot reference aggregation columns, only column
>> >> aliases. It can handle event-time triggers and processing-time triggers.
>> >> However, it has a shortcoming: it cannot express "EMIT WHEN there are
>> >> 100 elements accumulated", which requires a COUNT(*).
>> >>
>> >>
>> >>
>> >>> Am I allowed to combine EMIT and ORDER BY in the same SELECT block? If
>> >>> so, which comes first?
>> >>>
>> >>>
>> >> Sorry, I forgot about ORDER BY. In my opinion EMIT should be applied
>> >> after FROM, but before all the others: WHERE, HAVING (or aggregation),
>> >> ORDER BY. Note that EMIT controls when to emit data from the streaming
>> >> dataset, so all filters and aggregations should be applied after the
>> >> data is ready to emit. Note again, to emphasize, that for classic
>> >> DB/batch cases EMIT is implicitly there and simply emits all data once
>> >> after FROM, as all the data is already known.
>> >>
>> >>
>> >> On Tue, Jan 21, 2020 at 12:41 PM Julian Hyde <jh...@apache.org> wrote:
>> >>
>> >>> One more thought. Since EMIT produces a relation, your "EMIT HAVING
>> >>> current_watermark() >= T.window_end AND current_watermark() <
>> >>> T.window_end + INTERVAL 2 DAY" could perhaps be accomplished by
>> >>> wrapping the EMIT query as a sub-query and using ordinary SQL
>> >>> expressions on the system columns added by EMIT. (I'm not saying we
>> >>> should do this. But when designing a feature, it's worth calling out
>> >>> whether it adds power or whether it is syntactic sugar.)
>> >>
>> >>
>> >> That sounds like doing a normal WHERE on the table from the sub-query.
>> >> Yep, that's an option. The open question then is whether we want to mix
>> >> "EMIT" semantics (which are not really a classic SQL filter) into the
>> >> existing WHERE and HAVING, which have "filter" semantics. I prefer not
>> >> to, so as to leave classic DB queries unchanged (their WHERE is just a
>> >> per-row filter without any new semantics, e.g. latency control, added).
>> >>
>> >>
>> >>
>> >>> On Tue, Jan 21, 2020 at 11:22 AM Rui Wang <am...@apache.org>
>> wrote:
>> >>>>
>> >>>> Hi community,
>> >>>>
>> >>>> First of all, thanks all your help on the CALCITE-3272 (TUMBLE now
>> >>> works as
>> >>>> a table-valued function). As it starts to work, I want to continue
>> >>>> discussing the following work of streaming sql: a way to control
>> >>>> materialization latency of a streaming query. A small note for people
>> >>> who
>> >>>> are not familiar with streaming: because steaming queries are long
>> >>> running
>> >>>> queries (maybe months or up to year), usually there is a need to see
>> >>>> results before query is terminated. Thus it will be desired to have a
>> >>> way
>> >>>> to allow users to specify, e.g. how frequently they want to see some
>> >>> result
>> >>>> from the query.
>> >>>>
>> >>>> (The following will be a summary of my proposal. I can convert it to
>> a
>> >>>> design doc if people prefer that way. Just let me know)
>> >>>>
>> >>>> *Priori work*
>> >>>> My idea is built on top of "one sql to rule them all paper" [1].
>> Kudos
>> >>> to
>> >>>> people who contributed that paper, which became the foundation of my
>> >>>> discussion.
>> >>>>
>> >>>> From [1], an EMIT clause was actually proposed to be added to the
>> end of
>> >>>> the query. Two syntax of EMIT clause was also proposed:
>> >>>> 1. EMIT after the watermark passes the end of the window. E.g. EMIT
>> >>>> [STREAM] AFTER WATERMARK.
>> >>>> 2. Delay emit for a period of time after a change happens (e.g.
>> element
>> >>>> arrives in a window). E.g. EMIT [STREAM] AFTER DELAY INTERVAL '6'
>> >>> MINUTES
>> >>>>
>> >>>> *Typical Streaming emitting latency patterns*
>> >>>> 1. Event time triggers. Emitting depends on the relationship between
>> >>>> watermark and event timestamp of events. Check this video [2] if you
>> >>> want
>> >>>> to have an introduction of watermark in the streaming world, and data
>> >>>> completeness concept based on event-timestamp.
>> >>>> 2. Processing time triggers. Emitting depends on the system clock.
>> This
>> >>> is
>> >>>> a natural idea of emitting. E.g. emit the current result every hour
>> >>> without
>> >>>> considering if data in a window is already complete.
>> >>>> 3. data-driven triggers. E.g. emit when accumulated events exceed a
>> >>>> threshold (e.g. emit when have acculucated 1000 events)
>> >>>> 4. Composite triggers. There is a need to concat 1, 2, 3 by OR and
>> AND
>> >>> to
>> >>>> achieve better latency control.
>> >>>>
>> >>>> *Proposal to discuss*
>> >>>> I want to extend the proposal in [1] and propose EMIT HAVING syntax
>> and
>> >>> two
>> >>>> aggregation functions.
>> >>>>
>> >>>> *EMIT HAVING bool_expression*. EMIT HAVING syntax should be after
>> normal
>> >>>> HAVING syntax and always works with a GROUP BY. EMIT HAVING behaves
>> >>> similar
>> >>>> to HAVING. There are two differences that are worth mentioning:
>> >>>> 1. It’s not a filter. it controls when table-valued function in FROM
>> >>> clause
>> >>>> should emit a set. The set is specified by group-by keys.
>> >>>> 2.GROUP BY keys are visible to EMIT HAVING while it is not the case
>> for
>> >>>> HAVING.
>> >>>>
>> >>>> *current_watermark(). *current_watermark() is an aggregation function
>> >>> that
>> >>>> returns a timestamp that provides where watermark is for a set.
>> >>>>
>> >>>> *processing_time_since_first_element().*
>> >>> processing_time_since_first_element()
>> >>>> is an aggregation function that returns the system clock time (i.e.
>> >>>> processing time) since the first element appears in the set.
>> >>>>
>> >>>> *Motivating examples*
>> >>>> The following will be a list of motivating examples of how this
>> proposal
>> >>>> will achieve different ways of latency control.
>> >>>>
>> >>>> Assume there is a steaming query that apply fixed windowing (thus
>> >>> TUMBLE)
>> >>>> as the following:
>> >>>>
>> >>>> “SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10
>> MINUTE)
>> >>> AS
>> >>>> T”. Based on this query, let’s say derived table T from orders  has
>> the
>> >>>> following schema:
>> >>>>
>> >>>>
>> >>>> order_id BIGINT
>> >>>>
>> >>>> product_id BIGINT
>> >>>>
>> >>>> order_meta STRING
>> >>>>
>> >>>> event_ts TIMESTAMP
>> >>>>
>> >>>> window_start TIMESTAMP
>> >>>>
>> >>>> window_end TIMESTAMP
>> >>>>
>> >>>>
>> >>>>
>> >>>> The following will be a table to demonstrate for different trigger
>> >>> cases,
>> >>>> how the query will be modified by adopting EMIT syntax to express the
>> >>> same
>> >>>> semantic:
>> >>>>
>> >>>> Trigger
>> >>>>
>> >>>> SQL Query
>> >>>>
>> >>>> AfterWatermark.pastEndOfWindow()
>> >>>> (emit after watermark passes end of window, thus for a window data is
>> >>>> believed complete)
>> >>>>
>> >>>> SELECT *
>> >>>>
>> >>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >>>>
>> >>>> EMIT HAVING current_watermark() >= T.window_end
>> >>>>
>> >>>> AfterProcessingTime.pastFirstElementInPane().plusDuration(Duration.
>> >>>> standardMinutes(5)
>> >>>>
>> >>>> (emit after a delay of a 5 minutes when first element appear in
>> window)
>> >>>>
>> >>>> SELECT *
>> >>>>
>> >>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >>>>
>> >>>> GROUP BY T.window_end
>> >>>>
>> >>>> EMIT HAVING processing_time_since_first_element() >= INTERVAL 5
>> MINUTE
>> >>>>
>> >>>> AfterWatermark.withLateFirings(AfterPane.elementCountAtLeast(1))
>> >>>> (emit after every event appears after data is really believed
>> complete
>> >>> in a
>> >>>> window)
>> >>>>
>> >>>> SELECT *
>> >>>>
>> >>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >>>>
>> >>>> GROUP BY T.window_end
>> >>>>
>> >>>> EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1
>> >>>>
>> >>>>
>> >>>
>> AfterWatermark.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane
>> >>>> ().plusDuration(Duration.standardMinutes(5))
>> >>>>
>> >>>> (emit before a window is complete, by following the delay emit
>> strategy)
>> >>>>
>> >>>> SELECT *
>> >>>>
>> >>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >>>>
>> >>>> GROUP BY T.window_end
>> >>>>
>> >>>> EMIT HAVING current_watermark() < T.window_end AND
>> >>>> processing_time_since_first_element() >= INTERVAL 5 MINUTE
>> >>>>
>> >>>> AfterWatermark.pastEndOfWindow()
>> >>>>
>> >>>>
>> >>>
>> .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDuration
>> >>>> (Duration.standardMinutes(5))
>> >>>>
>> >>>> .withLateFirings(AfterPane.elementCountAtLeast(1))
>> >>>>
>> >>>> (a combination of emitting before window closes, emit on window
>> closes
>> >>> and
>> >>>> emit after window closes)
>> >>>>
>> >>>> SELECT *
>> >>>>
>> >>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >>>>
>> >>>> GROUP BY T.window_end
>> >>>>
>> >>>> EMIT HAVING
>> >>>>
>> >>>> current_watermark() >= T.window_end OR
>> >>>>
>> >>>> (current_watermark() > T.window_end AND COUNT(*) > 1) OR
>> >>>>
>> >>>> (current_watermark() < T.window_end AND
>> >>>> processing_time_since_first_element() >= INTERVAL 5 MINUTE)
>> >>>>
>> >>>>
>> >>>> AfterWatermark.pastEndOfWindow()
>> >>>>
>> >>>> .withAllowedLateness(Duration.standardDays(2)))
>> >>>>
>> >>>> (emit after window closes plus a tolerance of 2 days late data)
>> >>>>
>> >>>> SELECT *
>> >>>>
>> >>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>> >>>>
>> >>>> GROUP BY T.window_end
>> >>>>
>> >>>> EMIT HAVING
>> >>>>
>> >>>> current_watermark() >= T.window_end AND current_watermark() <
>> >>> T.window_end
>> >>>> + INTERVAL 2 DAY
>> >>>>
> >> >>>> Composite triggers: as the examples above illustrate, AND and OR can
> >> >>>> be used to combine different boolean expressions.
>> >>>>
>> >>>>
>> >>>>
> >> >>>> Please let me know your thoughts, and any other way you would prefer
> >> >>>> to continue the discussion.
>> >>>>
>> >>>> [1]: https://arxiv.org/pdf/1905.12133.pdf
>> >>>> [2]: https://www.youtube.com/watch?v=TWxSLmkWPm4
>> >>>>
>> >>>>
>> >>>> Thanks,
>> >>>> Rui Wang
>> >>>
>> >>
>>
>>

Re: [DISCUSS] [CALCITE-3271] EMIT syntax proposal for event-timestamp semantic windowing

Posted by Rui Wang <am...@apache.org>.
That makes sense. I will remove the aggregation constraint (i.e. that EMIT
requires a GROUP BY). Let's say EMIT should now work on any query, for
general purposes.

Because the above contains a lot of information, let me summarize the
critical points here and see if we can reach a consensus:

1. Do we agree that EMIT should execute after FROM, but before any other
clauses, assuming EMIT works on any query?
My opinion is that EMIT should execute after FROM. This matches what
Julian said: "Think of it as executing the query at T, then executing
it at T+delta". EMIT just controls how large the delta is, and all other
computation is done by the subsequent WHERE, GROUP BY, HAVING, ORDER BY,
LIMIT, etc. It also matches the classic DB case, where EMIT produces a
single delta, once, covering the whole timeline from -inf to +inf.
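A minimal Python sketch of the FROM -> EMIT -> rest ordering, under stated assumptions: `rest_of_query` is a hypothetical stand-in for the WHERE/GROUP BY part, and the count-based `emit_every` knob is a made-up latency control, not proposed syntax. EMIT only decides when the rest of the query runs over what FROM has produced; in the batch case it fires once and the result equals the ordinary query.

```python
from typing import Dict, Iterable, List, Optional, Tuple

Row = Tuple[str, int]  # hypothetical (product, amount) rows produced by FROM


def rest_of_query(rows: Iterable[Row]) -> Dict[str, int]:
    """Stands in for: WHERE amount > 0 GROUP BY product, SUM(amount)."""
    totals: Dict[str, int] = {}
    for product, amount in rows:
        if amount > 0:                                         # WHERE
            totals[product] = totals.get(product, 0) + amount  # GROUP BY + SUM
    return totals


def run_with_emit(source: Iterable[Row],
                  emit_every: Optional[int] = None) -> List[Dict[str, int]]:
    """FROM -> EMIT -> rest of the query.

    emit_every=None models the batch case (bounded, fully known input):
    EMIT fires exactly once, at the end. Otherwise EMIT fires after every
    `emit_every` rows (hypothetical) and once more when the input ends.
    """
    seen: List[Row] = []
    results: List[Dict[str, int]] = []
    for row in source:                        # FROM keeps producing rows
        seen.append(row)
        if emit_every is not None and len(seen) % emit_every == 0:
            results.append(rest_of_query(seen))  # EMIT decides *when*
    final = rest_of_query(seen)
    if not results or results[-1] != final:   # end of input: emit what is left
        results.append(final)
    return results


orders = [("a", 1), ("b", 2), ("a", 3), ("b", -1), ("a", 5)]
print(run_with_emit(orders))                # [{'a': 9, 'b': 2}]
print(run_with_emit(orders, emit_every=2))  # [{'a': 1, 'b': 2}, {'a': 4, 'b': 2}, {'a': 9, 'b': 2}]
```

The last streaming result matches the single batch result, which is the compatibility property described above.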


2. Can we support EMIT predicates rather than a list of fixed emit strategies?
To recap, the pros and cons of EMIT predicates:
pros: 1) Extensible with several predefined functions. If there is a new
need, we will very likely define a new function rather than new
keywords/syntax. 2) Easy to understand (think of them as being applied to
tables to decide when to emit rows).
cons: 1) Users gain a lot of power to write arbitrary expressions.
Pros and cons of special EMIT strategy syntax:
pros: 1) Users do not have much power to write expressions, as the syntax
is fixed (they can tune a few parameters, though).
cons: 1) I had trouble explaining it to SQL people (it sounds like a hack).
2) There are 5 or more known strategies, so we would need a list longer
than what was proposed in the paper. 3) Potential backward-compatibility
issues in case emit strategies change.
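To make the trade-off concrete, here is a hedged Python sketch (not a proposed implementation) in which each fixed strategy, such as EMIT AFTER WATERMARK, becomes one predefined predicate over per-window state, and new strategies are built with AND/OR instead of new keywords. `WindowState`, `after_watermark`, `before_watermark`, `after_delay`, `after_count`, `and_` and `or_` are all hypothetical names; times are plain numbers in minutes.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class WindowState:
    """Hypothetical per-window state an engine could expose to EMIT."""
    window_end: float   # T.window_end (event time, minutes)
    watermark: float    # current_watermark()
    count: int          # COUNT(*)
    since_first: float  # processing_time_since_first_element() (minutes)


Predicate = Callable[[WindowState], bool]


# Predefined functions: each fixed strategy is just one predicate.
def after_watermark() -> Predicate:
    return lambda s: s.watermark >= s.window_end


def before_watermark() -> Predicate:
    return lambda s: s.watermark < s.window_end


def after_delay(minutes: float) -> Predicate:
    return lambda s: s.since_first >= minutes


def after_count(n: int) -> Predicate:
    return lambda s: s.count >= n


# Composition replaces a growing list of special syntax.
def and_(*preds: Predicate) -> Predicate:
    return lambda s: all(p(s) for p in preds)


def or_(*preds: Predicate) -> Predicate:
    return lambda s: any(p(s) for p in preds)


# Early once 5 minutes have passed since the first element, or on time.
early_or_on_time = or_(after_watermark(), and_(before_watermark(), after_delay(5)))

print(early_or_on_time(WindowState(10, 4, 3, 6)))   # True: early firing
print(early_or_on_time(WindowState(10, 4, 3, 2)))   # False: keep waiting
print(early_or_on_time(WindowState(10, 10, 3, 2)))  # True: watermark passed
```

A new need (say, a count threshold) is one more small function like `after_count`, which is the extensibility argument; the cost is that any combination users can write must be validated by the engine.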

Lastly, regarding the evolving-table problem that Julian mentioned (e.g.
seeing (100, 6) retracted and (100, 8) added), I totally agree, because of
the nature of streaming: you probably never know whether the data is
complete when a result is emitted, so the result could be updated later.


-Rui


On Wed, Jan 22, 2020 at 11:29 AM Julian Hyde <jh...@gmail.com> wrote:

> In the SIGMOD paper, EMIT could be applied to any query. Think of it as
> executing the query at T, then executing it at T+delta, compare the
> results, and emit any added or removed records.
>
> Tying it to aggregate queries seems wrong. (If we have a special semantics
> for aggregate queries, how are we possibly going to make the semantics
> well-defined for, say, user-defined table functions?)
>
> Yes, I know aggregate queries have weird behavior. If you have computed
> ’select product, sum(amount) from orders group by product’, and a new order
> with (product 100, amount 2), then you are going to see (100, 6) retracted
> and (100, 8) added. But I think we have to live with that. Otherwise EMIT
> semantics get a lot more complicated.
>
> Julian
>

Re: [DISCUSS] [CALCITE-3271] EMIT syntax proposal for event-timestamp semantic windowing

Posted by Julian Hyde <jh...@gmail.com>.
In the SIGMOD paper, EMIT could be applied to any query. Think of it as executing the query at T, then executing it at T+delta, compare the results, and emit any added or removed records.

Tying it to aggregate queries seems wrong. (If we have a special semantics for aggregate queries, how are we possibly going to make the semantics well-defined for, say, user-defined table functions?)

Yes, I know aggregate queries have weird behavior. If you have computed 'select product, sum(amount) from orders group by product', and a new order arrives with (product 100, amount 2), then you are going to see (100, 6) retracted and (100, 8) added. But I think we have to live with that. Otherwise EMIT semantics get a lot more complicated.
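A small Python sketch of the "execute at T, execute at T+delta, compare" semantics, reproducing the retraction in the example. The earlier orders that make product 100 sum to 6 are made up for illustration; results are treated as multisets of rows via `collections.Counter`.

```python
from collections import Counter
from typing import Iterable, List, Tuple

Order = Tuple[int, int]  # (product, amount)


def run_query(orders: Iterable[Order]) -> Counter:
    """select product, sum(amount) from orders group by product,
    returned as a multiset of result rows."""
    totals: dict = {}
    for product, amount in orders:
        totals[product] = totals.get(product, 0) + amount
    return Counter(totals.items())


def emit_delta(before: Counter, after: Counter) -> Tuple[List, List]:
    """Rows only in the result at T are retracted; rows only in the
    result at T+delta are added."""
    retracted = sorted((before - after).elements())
    added = sorted((after - before).elements())
    return retracted, added


at_t = run_query([(100, 4), (100, 2), (200, 5)])  # product 100 sums to 6
at_t_plus_delta = run_query([(100, 4), (100, 2), (200, 5), (100, 2)])
print(emit_delta(at_t, at_t_plus_delta))  # ([(100, 6)], [(100, 8)])
```

The unchanged row (200, 5) produces no output, so only the changed aggregate shows up as a retraction plus an addition.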

Julian


> On Jan 21, 2020, at 1:24 PM, Rui Wang <am...@apache.org> wrote:
> 
> I think there was a big gap in my summary regarding the position of EMIT
> and the execution order (and I forgot about ORDER BY). Let me try to
> address them here:
> 
> SELECT
> 
> [FROM TVF windowing] // windowing happens here
> 
> [WHERE clause]
> 
> [GROUP BY clause]
> 
> [HAVING clause]
> 
> [ORDER BY clause]
> 
> [LIMIT clause]
> [EMIT clause] // materialization latency
> 
> The position of EMIT is indeed a bit confusing, because the right execution
> order should be: FROM -> EMIT -> the other clauses, as in a normal query.
> FROM continuously generates data, EMIT decides when to emit a part of that
> data, and then the other clauses are applied to the emitted data and
> update downstream.
> 
> So there are at least two open questions:
> 1. What should we use for EMIT? EMIT HAVING (can use aggregation columns
> like COUNT(*)), EMIT WHERE (can only use column aliases, like ORDER BY),
> or EMIT AFTER (it is not yet decided whether it supports expressions; I
> hope it does).
> 2. Where does the EMIT clause go? Maybe the clearest position is right
> after FROM.
> 
> 
> -Rui
> 
> 
> On Tue, Jan 21, 2020 at 1:09 PM Rui Wang <ru...@google.com> wrote:
> 
>> 
>> 
>> On Tue, Jan 21, 2020 at 12:34 PM Julian Hyde <jh...@apache.org> wrote:
>> 
>>> Does EMIT HAVING have anything to do with aggregate queries (GROUP BY
>>> and HAVING), or is it just a coincidence that you use the same word,
>>> HAVING?
>>> 
>> 
>> EMIT HAVING is independent of HAVING, but EMIT HAVING does have a
>> relationship to GROUP BY: EMIT HAVING requires a GROUP BY. The GROUP BY
>> keys define sets, and the EMIT HAVING expressions are applied to the sets
>> specified by those keys. However, we could loosen the constraint to allow
>> EMIT HAVING to appear even without GROUP BY, which just means applying
>> emit control to the whole data set rather than per group.
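A hedged Python sketch of the two scopes described here, using hypothetical names: with a GROUP BY key, the emit predicate is checked separately for each group's set; with `key=None` (EMIT HAVING without GROUP BY), the whole data set is checked as a single set.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, List, Optional, Tuple

Row = Tuple[int, str]  # hypothetical (window_end, order_id) rows of T


def sets_ready_to_emit(rows: List[Row],
                       ready: Callable[[List[Row]], bool],
                       key: Optional[Callable[[Row], Hashable]] = None
                       ) -> Dict[Optional[Hashable], List[Row]]:
    """Return the sets whose emit predicate currently holds.

    With a GROUP BY key, each group is a separate set checked on its own.
    key=None treats the whole data set as one set.
    """
    if key is None:
        return {None: rows} if ready(rows) else {}
    groups: Dict[Hashable, List[Row]] = defaultdict(list)
    for row in rows:
        groups[key(row)].append(row)
    return {k: v for k, v in groups.items() if ready(v)}


rows = [(10, "o1"), (10, "o2"), (20, "o3")]
at_least_two = lambda s: len(s) >= 2  # like EMIT HAVING COUNT(*) >= 2
by_window_end = lambda r: r[0]        # like GROUP BY T.window_end

print(sets_ready_to_emit(rows, at_least_two, by_window_end))  # {10: [(10, 'o1'), (10, 'o2')]}
print(sets_ready_to_emit(rows, at_least_two))  # {None: [(10, 'o1'), (10, 'o2'), (20, 'o3')]}
```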
>> 
>> In my opinion, the execution order is: grouping (GROUP BY) -> EMIT control
>> (EMIT HAVING decides which part of the data can be emitted) -> aggregation
>> (normal HAVING and other aggregations). For batch/classic DB workloads,
>> the EMIT step always emits all data, so this idea is compatible with
>> existing DB users.
>> 
>> I chose EMIT HAVING because emit control is very similar to HAVING (and
>> some bits of WHERE); the only difference is that HAVING is a filter while
>> EMIT HAVING controls emission. E.g. applying HAVING expressions to data
>> decides whether to pass the data downstream or not, while applying EMIT
>> HAVING expressions decides whether to pass the data downstream now or
>> later (or to discard it if the window closes).
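The difference between a filter and emit control can be sketched in Python as two decision functions. This is an illustration only; `EmitDecision` and the `window_closed` flag (meaning no more data can arrive for the window) are hypothetical.

```python
from enum import Enum


class EmitDecision(Enum):
    EMIT_NOW = "pass downstream now"
    HOLD = "check again later"
    DISCARD = "discard (window closed)"


def having(condition_holds: bool) -> bool:
    # HAVING: a one-shot filter; False means the group never goes downstream.
    return condition_holds


def emit_having(condition_holds: bool, window_closed: bool) -> EmitDecision:
    # EMIT HAVING: controls timing rather than membership.
    if condition_holds:
        return EmitDecision.EMIT_NOW
    if window_closed:  # hypothetical flag: no more data can arrive
        return EmitDecision.DISCARD
    return EmitDecision.HOLD


print(having(False))                                  # False
print(emit_having(False, window_closed=False).value)  # check again later
```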
>> 
>> If you think the idea actually causes confusion rather than making it
>> easier to onboard people to streaming SQL, we can replace EMIT HAVING
>> with EMIT AFTER, as in the original design.
>> 
>>> I support the idea of latency controls, but I am nervous about
>>> allowing full expressions in the EMIT clause if we don't have to.
>>> 
>>> 
>> Yep. It's a design choice between allowing expressions and keeping a set
>> of dedicated SQL syntax for specific emit strategies. If we don't use
>> extensible EMIT expressions, we will likely need to provide a long list
>> of syntax for different emit strategies. For example:
>> EMIT AFTER WATERMARK
>> EMIT AFTER DELAY
>> EMIT AFTER WATERMARK BUT LATE
>> EMIT AFTER COUNT
>> EMIT BEFORE WATERMARK
>> etc.
>> 
>> Again, it's a design choice, so I am open to both ideas.
>>
>> However, I personally prefer the EMIT expressions idea because I found it
>> very easy to explain EMIT expressions to SQL people who don't have much
>> streaming background. Basically, EMIT expressions are just applied to the
>> rows of the table produced by the table-valued function. If there is a
>> GROUP BY, the expression is applied to each group, and its result
>> indicates whether that group is ready to emit. This simplicity comes
>> mainly from having introduced window_start and window_end into the table,
>> so the table has all the data needed to write expressions against it
>> (except for processing time triggers).
>> 
>> The downside of expressions, though, is that people will be allowed to
>> write any expression they want, and engines will have to take
>> responsibility for validating them.
>> 
>> 
>> 
>>> Aggregate queries have a peculiar scope. (You can only reference
>>> grouped expressions, for example.) I don't think we should drag that
>>> peculiar scope into the EMIT clause. The simplest thing is to only
>>> allow the EMIT clause to reference column aliases, which is similar to
>>> the scope used by ORDER BY.
>>> 
>> That sounds like one of the prototype ideas: EMIT WHERE. In EMIT WHERE,
>> just like in WHERE, you cannot reference aggregation columns, only column
>> aliases. It can handle event time triggers and processing time triggers.
>> However, it cannot express "emit when 100 elements have accumulated",
>> which requires a COUNT(*).
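The scope limitation can be sketched in Python with hypothetical names (times in minutes): a row-scoped, EMIT WHERE-style check sees one row's columns, which is enough for a watermark condition, while "100 elements have accumulated" needs an aggregate over the whole set, as COUNT(*) does.

```python
from typing import Dict, List


def emit_where_row(row: Dict, watermark: float) -> bool:
    # Row scope: only this row's columns (plus engine functions) are visible.
    return watermark >= row["window_end"]


def emit_having_set(rows: List[Dict], threshold: int = 100) -> bool:
    # Set scope: needs COUNT(*) over all rows, which no per-row test can see.
    return len(rows) >= threshold


window = [{"order_id": i, "window_end": 10.0} for i in range(100)]
print(emit_where_row(window[0], watermark=10.0))  # True
print(emit_having_set(window[:99]))               # False: only 99 rows
```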
>> 
>> 
>> 
>>> Am I allowed to combine EMIT and ORDER BY in the same SELECT block? If
>>> so, which comes first?
>>> 
>>> 
>> Sorry, I forgot about ORDER BY. In my opinion, EMIT should be applied
>> after FROM but before all other clauses: WHERE, HAVING (or aggregation),
>> ORDER BY. Note that EMIT controls when to emit data from the streaming
>> dataset, so all filters and aggregations should be applied after the data
>> is ready to emit. To emphasize again: for classic DB/batch cases, EMIT is
>> implicitly there, and it just emits all data once after FROM, as all data
>> is already known.
>> 
>> 
>> On Tue, Jan 21, 2020 at 12:41 PM Julian Hyde <jh...@apache.org> wrote:
>> 
>>> One more thought. Since EMIT produces a relation, your "EMIT HAVING
>>> current_watermark() >= T.window_end AND current_watermark() <
>>> T.window_end + INTERVAL 2 DAY" could perhaps be accomplished by
>>> wrapping the EMIT query as a sub-query and using ordinary SQL
>>> expressions on the system columns added by EMIT. (I'm not saying we
>>> should do this. But when designing a feature, it's worth calling out
>>> whether it adds power or whether it is syntactic sugar.)
>> 
>> 
>> That sounds like just doing a normal WHERE on the table from the
>> sub-query. Yep, that's an option. The open question it raises is whether
>> we want to mix "EMIT" semantics (which are not really a classic SQL
>> filter) into the existing "filter" semantics of WHERE and HAVING. I prefer
>> not to, so that classic DB queries stay unchanged (their WHERE is just a
>> per-row filter, without any new semantics such as latency control added).
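Julian's sub-query alternative can be sketched in Python, assuming EMIT adds a hypothetical system column `emit_watermark` (the watermark at emit time) to each emitted row; the outer query's plain WHERE then keeps only on-time and up-to-2-days-late rows. Times are in minutes. Note that this filters rows after they have been emitted, which is the mixing of filter and emit semantics discussed above.

```python
from dataclasses import dataclass
from typing import List

DAY = 24 * 60  # minutes


@dataclass
class EmittedRow:
    window_end: int      # from the TVF (minutes)
    total: int           # some aggregate value
    emit_watermark: int  # hypothetical system column added by EMIT


def outer_where(rows: List[EmittedRow]) -> List[EmittedRow]:
    """Outer query over the EMIT sub-query, i.e.
    WHERE emit_watermark >= window_end
      AND emit_watermark < window_end + INTERVAL '2' DAY"""
    return [r for r in rows
            if r.window_end <= r.emit_watermark < r.window_end + 2 * DAY]


emitted = [
    EmittedRow(window_end=10, total=3, emit_watermark=5),             # early
    EmittedRow(window_end=10, total=4, emit_watermark=10),            # on time
    EmittedRow(window_end=10, total=5, emit_watermark=10 + DAY),      # late, kept
    EmittedRow(window_end=10, total=6, emit_watermark=10 + 3 * DAY),  # too late
]
print([r.total for r in outer_where(emitted)])  # [4, 5]
```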
>> 
>> 
>> 
>>> On Tue, Jan 21, 2020 at 11:22 AM Rui Wang <am...@apache.org> wrote:
>>>> 
>>>> Hi community,
>>>> 
>>>> First of all, thanks for all your help on CALCITE-3272 (TUMBLE now works
>>>> as a table-valued function). Now that it works, I want to continue
>>>> discussing the next piece of streaming SQL work: a way to control the
>>>> materialization latency of a streaming query. A small note for people
>>>> who are not familiar with streaming: because streaming queries are
>>>> long-running (maybe months, or up to a year), there is usually a need to
>>>> see results before the query terminates. Thus it is desirable to have a
>>>> way for users to specify, e.g., how frequently they want to see results
>>>> from the query.
>>>> 
>>>> (The following is a summary of my proposal. I can convert it into a
>>>> design doc if people prefer that. Just let me know.)
>>>> 
>>>> *Prior work*
>>>> My idea builds on the "One SQL to Rule Them All" paper [1]. Kudos to the
>>>> people who contributed to that paper, which became the foundation of my
>>>> discussion.
>>>> 
>>>> In [1], an EMIT clause was proposed to be added to the end of the query,
>>>> with two forms of syntax:
>>>> 1. EMIT after the watermark passes the end of the window. E.g. EMIT
>>>> [STREAM] AFTER WATERMARK.
>>>> 2. Delay emitting for a period of time after a change happens (e.g. an
>>>> element arrives in a window). E.g. EMIT [STREAM] AFTER DELAY INTERVAL '6' MINUTES
>>>> 
>>>> *Typical streaming emit latency patterns*
>>>> 1. Event time triggers. Emitting depends on the relationship between the
>>>> watermark and the event timestamps of events. Check this video [2] for an
>>>> introduction to watermarks in the streaming world and to the concept of
>>>> data completeness based on event timestamps.
>>>> 2. Processing time triggers. Emitting depends on the system clock. This
>>>> is a natural way of emitting, e.g. emit the current result every hour
>>>> without considering whether the data in a window is already complete.
>>>> 3. Data-driven triggers. E.g. emit when the number of accumulated events
>>>> exceeds a threshold (e.g. emit once 1000 events have accumulated).
>>>> 4. Composite triggers. There is a need to combine 1, 2 and 3 with OR and
>>>> AND to achieve better latency control.
>>>> 
>>>> *Proposal to discuss*
>>>> I want to extend the proposal in [1] and propose EMIT HAVING syntax and
>>>> two aggregation functions.
>>>> 
>>>> *EMIT HAVING bool_expression*. EMIT HAVING should come after the normal
>>>> HAVING clause and always works with a GROUP BY. EMIT HAVING behaves
>>>> similarly to HAVING. There are two differences worth mentioning:
>>>> 1. It's not a filter. It controls when the table-valued function in the
>>>> FROM clause should emit a set. The set is specified by the GROUP BY keys.
>>>> 2. GROUP BY keys are visible to EMIT HAVING, while this is not the case
>>>> for HAVING.
>>>> 
>>>> *current_watermark()*. current_watermark() is an aggregation function
>>>> that returns a timestamp indicating where the watermark is for a set.
>>>>
>>>> *processing_time_since_first_element()*.
>>>> processing_time_since_first_element() is an aggregation function that
>>>> returns the system clock time (i.e. processing time) elapsed since the
>>>> first element appeared in the set.
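A hedged Python sketch of the per-set state an engine might keep to answer these two functions. `SetState` and its methods are hypothetical; the clock is injectable so the example is deterministic, and seconds are used for processing time (so INTERVAL 5 MINUTE is 300).

```python
import time
from typing import Callable, Optional


class SetState:
    """Hypothetical per-set state behind the two proposed functions.

    Event time (the watermark) and processing time (the system clock)
    are tracked separately; neither is derived from the other.
    """

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock                  # processing-time clock, seconds
        self._first_seen: Optional[float] = None
        self._watermark = float("-inf")      # no watermark observed yet
        self.count = 0                       # what COUNT(*) would see

    def add_element(self) -> None:
        if self._first_seen is None:
            self._first_seen = self._clock()  # remember the first arrival
        self.count += 1

    def advance_watermark(self, watermark: float) -> None:
        # Watermarks never move backwards.
        self._watermark = max(self._watermark, watermark)

    def current_watermark(self) -> float:
        return self._watermark

    def processing_time_since_first_element(self) -> float:
        if self._first_seen is None:
            return 0.0                       # empty set: nothing has waited
        return self._clock() - self._first_seen


now = [100.0]                                # fake clock for the example
s = SetState(clock=lambda: now[0])
s.add_element()
now[0] = 400.0
s.advance_watermark(50.0)
s.advance_watermark(40.0)                    # ignored: older than 50.0
print(s.processing_time_since_first_element() >= 5 * 60)  # True
print(s.current_watermark())                              # 50.0
```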
>>>> 
>>>> *Motivating examples*
>>>> The following is a list of motivating examples showing how this proposal
>>>> achieves different kinds of latency control.
>>>> 
>>>> Assume there is a streaming query that applies fixed windowing (thus
>>>> TUMBLE) as follows:
>>>>
>>>> "SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE)
>>>> AS T". Based on this query, let's say the derived table T from orders
>>>> has the following schema:
>>>> 
>>>> 
>>>> order_id BIGINT
>>>> 
>>>> product_id BIGINT
>>>> 
>>>> order_meta STRING
>>>> 
>>>> event_ts TIMESTAMP
>>>> 
>>>> window_start TIMESTAMP
>>>> 
>>>> window_end TIMESTAMP
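A rough Python sketch of how a row of T could be derived from an order row: the input columns plus window_start and window_end of the 10-minute tumbling window containing event_ts. Aligning window boundaries to the Unix epoch is an assumption of this sketch; the proposal itself only says T gains the two window columns.

```python
from datetime import datetime, timedelta
from typing import Dict, Tuple

EPOCH = datetime(1970, 1, 1)  # assumed alignment origin for window boundaries


def tumble_bounds(event_ts: datetime, size: timedelta) -> Tuple[datetime, datetime]:
    """Fixed (tumbling) window [window_start, window_end) containing event_ts."""
    offset = (event_ts - EPOCH) % size
    window_start = event_ts - offset
    return window_start, window_start + size


def tumble_row(row: Dict, size: timedelta) -> Dict:
    """One row of T: the input columns plus window_start and window_end."""
    window_start, window_end = tumble_bounds(row["event_ts"], size)
    return {**row, "window_start": window_start, "window_end": window_end}


order = {"order_id": 1, "product_id": 100, "order_meta": "",
         "event_ts": datetime(2020, 1, 21, 12, 7, 30)}
t_row = tumble_row(order, timedelta(minutes=10))
print(t_row["window_start"], t_row["window_end"])
# 2020-01-21 12:00:00 2020-01-21 12:10:00
```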
>>>> 
>>>> 
>>>> 
>>>> The following table shows, for different trigger cases, how the query
>>>> would be written with EMIT syntax to express the same semantics:
>>>> 
>>>> Trigger
>>>> 
>>>> SQL Query
>>>> 
>>>> AfterWatermark.pastEndOfWindow()
>>>> (emit after the watermark passes the end of the window, i.e. when the
>>>> window's data is believed complete)
>>>> 
>>>> SELECT *
>>>> 
>>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>>>> 
>>>> GROUP BY T.window_end
>>>>
>>>> EMIT HAVING current_watermark() >= T.window_end
>>>> 
>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5))
>>>>
>>>> (emit after a delay of 5 minutes once the first element appears in the window)
>>>> 
>>>> SELECT *
>>>> 
>>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>>>> 
>>>> GROUP BY T.window_end
>>>> 
>>>> EMIT HAVING processing_time_since_first_element() >= INTERVAL 5 MINUTE
>>>> 
>>>> AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(1))
>>>> (emit for every event that arrives after the window's data is believed
>>>> complete)
>>>> 
>>>> SELECT *
>>>> 
>>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>>>> 
>>>> GROUP BY T.window_end
>>>> 
>>>> EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1
>>>> 
>>>>
>>>> AfterWatermark.pastEndOfWindow().withEarlyFirings(
>>>>     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5)))
>>>>
>>>> (emit before a window is complete, following the delay emit strategy)
>>>> 
>>>> SELECT *
>>>> 
>>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>>>> 
>>>> GROUP BY T.window_end
>>>> 
>>>> EMIT HAVING current_watermark() < T.window_end AND
>>>> processing_time_since_first_element() >= INTERVAL 5 MINUTE
>>>> 
>>>> AfterWatermark.pastEndOfWindow()
>>>>     .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>         .plusDelayOf(Duration.standardMinutes(5)))
>>>>     .withLateFirings(AfterPane.elementCountAtLeast(1))
>>>>
>>>> (a combination of emitting before the window closes, when it closes, and
>>>> after it closes)
>>>> 
>>>> SELECT *
>>>> 
>>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>>>> 
>>>> GROUP BY T.window_end
>>>> 
>>>> EMIT HAVING
>>>> 
>>>> current_watermark() >= T.window_end OR
>>>> 
>>>> (current_watermark() > T.window_end AND COUNT(*) > 1) OR
>>>> 
>>>> (current_watermark() < T.window_end AND
>>>> processing_time_since_first_element() >= INTERVAL 5 MINUTE)
>>>> 
>>>> 
>>>> AfterWatermark.pastEndOfWindow(), on a Window configured with
>>>> .withAllowedLateness(Duration.standardDays(2))
>>>>
>>>> (emit after the window closes, with a tolerance of 2 days for late data)
>>>> 
>>>> SELECT *
>>>> 
>>>> FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
>>>> 
>>>> GROUP BY T.window_end
>>>> 
>>>> EMIT HAVING
>>>> 
>>>> current_watermark() >= T.window_end AND current_watermark() <
>>>> T.window_end + INTERVAL 2 DAY
>>>> 
>>>> Composite triggers
>>>> (as the examples above illustrate, AND and OR can be used to combine
>>>> different boolean expressions)
>>>> 
>>>> 
>>>> 
>>>> Please let me know your thoughts, or any other way you would prefer to
>>>> continue the discussion.
>>>> 
>>>> [1]: https://arxiv.org/pdf/1905.12133.pdf
>>>> [2]: https://www.youtube.com/watch?v=TWxSLmkWPm4
>>>> 
>>>> 
>>>> Thanks,
>>>> Rui Wang
>>> 
>> 
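To make the trigger table easier to check, here is a minimal Python sketch that treats each proposed EMIT HAVING expression as a plain predicate over per-window state. It is illustrative only: `WindowState` and the function names are hypothetical, times are integer minutes, and the fields `watermark`, `since_first` and `count` merely stand in for the proposed current_watermark(), processing_time_since_first_element() and COUNT(*). The predicates copy the SQL in the table as written.

```python
from dataclasses import dataclass

DAY = 24 * 60  # minutes in a day


@dataclass
class WindowState:
    """Hypothetical per-window state; all times are integer minutes."""
    window_end: int   # T.window_end
    watermark: int    # stands in for current_watermark()
    since_first: int  # stands in for processing_time_since_first_element()
    count: int        # stands in for COUNT(*)


def after_watermark(s: WindowState) -> bool:
    # EMIT HAVING current_watermark() >= T.window_end
    return s.watermark >= s.window_end


def after_delay(s: WindowState, delay: int = 5) -> bool:
    # EMIT HAVING processing_time_since_first_element() >= INTERVAL 5 MINUTE
    return s.since_first >= delay


def late_firing(s: WindowState) -> bool:
    # EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1
    return s.watermark > s.window_end and s.count > 1


def early_firing(s: WindowState, delay: int = 5) -> bool:
    # EMIT HAVING current_watermark() < T.window_end AND
    #             processing_time_since_first_element() >= INTERVAL 5 MINUTE
    return s.watermark < s.window_end and s.since_first >= delay


def early_on_time_late(s: WindowState, delay: int = 5) -> bool:
    # The composite example: the three predicates combined with OR.
    return after_watermark(s) or late_firing(s) or early_firing(s, delay)


def within_allowed_lateness(s: WindowState, days: int = 2) -> bool:
    # EMIT HAVING current_watermark() >= T.window_end AND
    #             current_watermark() < T.window_end + INTERVAL 2 DAY
    return s.window_end <= s.watermark < s.window_end + days * DAY
```

One thing the sketch makes visible: in the composite example, the middle disjunct (watermark > window_end AND COUNT(*) > 1) is already implied by the first (watermark >= window_end), so the expression by itself does not separate on-time firings from late ones; that distinction would have to come from when an engine re-evaluates the predicate.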


Re: [DISCUSS] [CALCITE-3271] EMIT syntax proposal for event-timestamp semantic windowing

Posted by Rui Wang <am...@apache.org>.
I think my summary left out something important: the position of EMIT and
the execution order (and I forgot about ORDER BY). Let me try to address
them here:

SELECT
[FROM TVF windowing]  // windowing happens here
[WHERE clause]
[GROUP BY clause]
[HAVING clause]
[ORDER BY clause]
[LIMIT clause]
[EMIT clause]         // materialization latency

The position of EMIT is indeed a bit confusing, because the actual
execution order should be FROM -> EMIT -> the remaining clauses, as in a
normal query. FROM continuously generates data, EMIT decides when to emit
part of that data, and the other clauses are then applied to the emitted
data and update downstream.
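The FROM -> EMIT -> other clauses order can be sketched in Python under a few assumptions that are not part of the proposal: FROM is modeled as a sequence of rows already assigned to a window (`(window_end, amount)` pairs), watermark advances appear as hypothetical `("wm", t)` markers, EMIT releases a window's buffered rows once the watermark reaches its end, and a WHERE-like filter plus SUM runs only on what EMIT released. All names are illustrative.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple, Union

Row = Tuple[int, int]   # (window_end, amount), already windowed by FROM
Mark = Tuple[str, int]  # ("wm", t): hypothetical watermark advance to time t


def from_then_emit(source: Iterable[Union[Row, Mark]]) -> Iterator[Tuple[int, List[int]]]:
    """EMIT stage: buffer rows per window; release a window once watermark >= window_end."""
    buffered: Dict[int, List[int]] = defaultdict(list)
    for item in source:
        if item[0] == "wm":
            watermark = item[1]
            # sorted() materializes the ready keys first, so popping is safe.
            for end in sorted(k for k in buffered if k <= watermark):
                yield end, buffered.pop(end)
        else:
            end, amount = item
            buffered[end].append(amount)


def downstream(released: Iterable[Tuple[int, List[int]]]) -> Iterator[Tuple[int, int]]:
    """Clauses after EMIT: WHERE amount > 0, then SUM(amount) per window."""
    for end, amounts in released:
        yield end, sum(a for a in amounts if a > 0)


stream = [(10, 5), (10, -1), (20, 7), ("wm", 10), (20, 3), ("wm", 20)]
print(list(downstream(from_then_emit(stream))))
```

Because the downstream stage only sees released windows, the row (20, 3) that arrives after the first watermark still counts toward window 20.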

So there are at least two open questions:
1. Which form should EMIT take? EMIT HAVING (can use aggregate columns such
as COUNT(*)), EMIT WHERE (can only use column aliases, like ORDER BY), or
EMIT AFTER (it is not yet decided whether it should support expressions; I
hope it does).
2. Where does the EMIT clause go? Perhaps the clearest position is right
after FROM.


-Rui


On Tue, Jan 21, 2020 at 1:09 PM Rui Wang <ru...@google.com> wrote:

>
>
> On Tue, Jan 21, 2020 at 12:34 PM Julian Hyde <jh...@apache.org> wrote:
>
>> Does EMIT HAVING have anything to do with aggregate queries (GROUP BY
>> and HAVING), or is it just a coincidence that you use the same word,
>> HAVING?
>>
>
> EMIT HAVING is independent of HAVING, but EMIT HAVING does have a
> relationship to GROUP BY: EMIT HAVING requires a GROUP BY. The GROUP BY
> keys define the sets, and the EMIT HAVING expressions are applied to each
> of those sets. However, we could loosen the constraint and allow EMIT
> HAVING even without GROUP BY, which would just mean applying emit control
> to the whole data set rather than per group.
>
> In my opinion, the execution order is: grouping (GROUP BY) -> EMIT control
> (EMIT HAVING decides which part of the data can be emitted) -> aggregation
> (normal HAVING and other aggregations). For batch/classic DB workloads, the
> EMIT step always emits all data, so this idea is compatible with existing
> DB users.
>
> I chose EMIT HAVING because emit control is very similar to HAVING (and,
> in some respects, WHERE); the only difference is that HAVING is a filter
> while EMIT HAVING controls emission. Applying HAVING expressions to data
> decides whether to pass that data downstream at all. Applying EMIT HAVING
> expressions decides whether to pass the data downstream now or later (or
> to discard it once the window closes).
>
> If you think the idea causes more confusion than convenience when
> onboarding people to streaming SQL, we can replace EMIT HAVING with EMIT
> AFTER, per the original design.
>
>> I support the idea of latency controls, but I am nervous about
>> allowing full expressions in the EMIT clause if we don't have to.
>>
>>
> Yep. It's a design choice between allowing expressions and keeping a set
> of dedicated SQL syntax for specific emit strategies. If we don't use
> extensible EMIT expressions, we will likely need to provide a long list of
> syntax for different emit strategies. For example:
> EMIT AFTER WATERMARK
> EMIT AFTER DELAY
> EMIT AFTER AFTER WATERMARK BUT LATE
> EMIT AFTER COUNT
> EMIT AFTER BEFORE WATERMARK
> etc.
>
> Again, it's a design choice, so I am open to both ideas.
>
> However, I personally prefer the EMIT expressions idea because I found it
> very easy to explain EMIT expressions to SQL users who don't have much
> streaming background. Basically, EMIT expressions are just applied to the
> rows of the table produced by the table-valued function. If there is a
> GROUP BY, the expression is applied to each group, and its result
> indicates whether that group is ready to emit. This simplicity comes
> mainly from having introduced window start and window end into the table,
> so the table already contains all the data needed to write expressions
> against it (except for processing-time triggers).
>
> The downside of expressions, though, is that people can write any
> expression they want, and engines will be responsible for validating
> them.
>
>
>
>> Aggregate queries have a peculiar scope. (You can only reference
>> grouped expressions, for example.) I don't think we should drag that
>> peculiar scope into the EMIT clause. The simplest thing is to only
>> allow the EMIT clause to reference column aliases, which is similar to
>> the scope used by ORDER BY.
>>
> That sounds like one of the prototype ideas: EMIT WHERE. In EMIT WHERE,
> just like WHERE, you cannot reference aggregate columns, only column
> aliases. It can handle event-time and processing-time triggers. However,
> it cannot express "emit when 100 elements have accumulated", which
> requires a COUNT(*).
>
>
>
>> Am I allowed to combine EMIT and ORDER BY in the same SELECT block? If
>> so, which comes first?
>>
>>
> Sorry, I forgot about ORDER BY. In my opinion, EMIT should be applied after
> FROM but before WHERE, HAVING (or aggregation), and ORDER BY. EMIT controls
> when data is emitted from the streaming dataset, so all filters and
> aggregations should be applied after the data is ready to emit. To
> emphasize again: for classic DB/batch cases, EMIT is implicitly there and
> simply emits all data once after FROM, since all data is already known.
>
>
> On Tue, Jan 21, 2020 at 12:41 PM Julian Hyde <jh...@apache.org> wrote:
>
>> One more thought. Since EMIT produces a relation, your "EMIT HAVING
>> current_watermark() >= T.window_end AND current_watermark() <
>> T.window_end + INTERVAL 2 DAY" could perhaps be accomplished by
>> wrapping the EMIT query as a sub-query and using ordinary SQL
>> expressions on the system columns added by EMIT. (I'm not saying we
>> should do this. But when designing a feature, it's worth calling out
>> whether it adds power or whether it is syntactic sugar.)
>
>
> That sounds like just applying a normal WHERE to the table produced by the
> sub-query. Yep, that's an option. The open question it raises is whether
> we want to mix "EMIT" semantics (which are not really a classic SQL
> filter) into the "filter" semantics of the existing WHERE and HAVING. I
> prefer not to, so that classic DB queries stay unchanged (their WHERE is
> just a per-row filter, with no new semantics such as latency control
> added).
>
>
>
>

Re: [DISCUSS] [CALCITE-3271] EMIT syntax proposal for event-timestamp semantic windowing

Posted by Rui Wang <ru...@google.com.INVALID>.
On Tue, Jan 21, 2020 at 12:34 PM Julian Hyde <jh...@apache.org> wrote:

> Does EMIT HAVING have anything to do with aggregate queries (GROUP BY
> and HAVING), or is it just a coincidence that you use the same word,
> HAVING?
>

EMIT HAVING is independent of HAVING, but EMIT HAVING does have a
relationship to GROUP BY: EMIT HAVING requires a GROUP BY. The GROUP BY
keys define the sets, and the EMIT HAVING expressions are applied to each
of those sets. However, we could loosen the constraint and allow EMIT
HAVING even without GROUP BY, which would just mean applying emit control
to the whole data set rather than per group.

In my opinion, the execution order is: grouping (GROUP BY) -> EMIT control
(EMIT HAVING decides which part of the data can be emitted) -> aggregation
(normal HAVING and other aggregations). For batch/classic DB workloads, the
EMIT step always emits all data, so this idea is compatible with existing
DB users.

I chose EMIT HAVING because emit control is very similar to HAVING (and,
in some respects, WHERE); the only difference is that HAVING is a filter
while EMIT HAVING controls emission. Applying HAVING expressions to data
decides whether to pass that data downstream at all. Applying EMIT HAVING
expressions decides whether to pass the data downstream now or later (or
to discard it once the window closes).
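The filter-versus-timing distinction can be reduced to a toy decision function. This Python sketch is an assumption-laden illustration, not Calcite code: `Outcome`, `having` and `emit_having` are hypothetical names, and "window closed" is taken as a given boolean rather than derived from a watermark.

```python
from enum import Enum


class Outcome(Enum):
    PASS = "pass downstream"
    HOLD = "hold and re-evaluate later"
    DROP = "drop"


def having(predicate: bool) -> Outcome:
    # HAVING is a filter: evaluated once, the group either passes or is dropped.
    return Outcome.PASS if predicate else Outcome.DROP


def emit_having(predicate: bool, window_closed: bool) -> Outcome:
    # EMIT HAVING controls timing: a false predicate means "not yet",
    # unless the window has already closed, in which case the data is discarded.
    if predicate:
        return Outcome.PASS
    return Outcome.DROP if window_closed else Outcome.HOLD
```

The only case where the two differ is a false predicate on a window that is still open: HAVING drops the group, while EMIT HAVING keeps it for a later evaluation.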

If you think the idea causes more confusion than convenience when
onboarding people to streaming SQL, we can replace EMIT HAVING with EMIT
AFTER, per the original design.

> I support the idea of latency controls, but I am nervous about
> allowing full expressions in the EMIT clause if we don't have to.
>
>
Yep. It's a design choice between allowing expressions and keeping a set
of dedicated SQL syntax for specific emit strategies. If we don't use
extensible EMIT expressions, we will likely need to provide a long list of
syntax for different emit strategies. For example:
EMIT AFTER WATERMARK
EMIT AFTER DELAY
EMIT AFTER AFTER WATERMARK BUT LATE
EMIT AFTER COUNT
EMIT AFTER BEFORE WATERMARK
etc.

Again, it's a design choice, so I am open to both ideas.

However, I personally prefer the EMIT expressions idea because I found it
very easy to explain EMIT expressions to SQL users who don't have much
streaming background. Basically, EMIT expressions are just applied to the
rows of the table produced by the table-valued function. If there is a
GROUP BY, the expression is applied to each group, and its result
indicates whether that group is ready to emit. This simplicity comes
mainly from having introduced window start and window end into the table,
so the table already contains all the data needed to write expressions
against it (except for processing-time triggers).
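A minimal Python sketch of "apply the expression to each group of the TVF's rows". Assumptions: `tumble` is a hypothetical stand-in for the TUMBLE table-valued function over integer-minute timestamps, `key` plays the role of the GROUP BY, the watermark is a plain variable standing in for current_watermark(), and a constant key models the loosened case without GROUP BY, where emit control applies to the whole data set.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Hashable, List

Row = Dict[str, Any]


def tumble(rows: List[Row], size: int) -> List[Row]:
    """Hypothetical TUMBLE: add window_start/window_end computed from event_ts."""
    out = []
    for r in rows:
        start = r["event_ts"] - r["event_ts"] % size
        out.append({**r, "window_start": start, "window_end": start + size})
    return out


def ready_groups(rows: List[Row],
                 key: Callable[[Row], Hashable],
                 emit_pred: Callable[[Hashable, List[Row]], bool]) -> Dict[Hashable, List[Row]]:
    """Group rows by key (the GROUP BY), then keep only groups whose EMIT predicate is true."""
    groups: Dict[Hashable, List[Row]] = defaultdict(list)
    for r in rows:
        groups[key(r)].append(r)
    return {k: g for k, g in groups.items() if emit_pred(k, g)}


orders = [{"order_id": i, "event_ts": ts} for i, ts in enumerate([1, 5, 11, 23])]
t = tumble(orders, 10)
watermark = 12  # stands in for current_watermark()

# GROUP BY T.window_end ... EMIT HAVING current_watermark() >= T.window_end
ready = ready_groups(t, lambda r: r["window_end"], lambda end, g: watermark >= end)

# No GROUP BY: a constant key puts every row in one set.
whole = ready_groups(t, lambda r: None, lambda k, g: len(g) >= 4)
```

Only window 10 is ready at watermark 12, because windows 20 and 30 have not yet been passed by the watermark.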

The downside of expressions, though, is that people can write any
expression they want, and engines will be responsible for validating
them.



> Aggregate queries have a peculiar scope. (You can only reference
> grouped expressions, for example.) I don't think we should drag that
> peculiar scope into the EMIT clause. The simplest thing is to only
> allow the EMIT clause to reference column aliases, which is similar to
> the scope used by ORDER BY.
>
That sounds like one of the prototype ideas: EMIT WHERE. In EMIT WHERE,
just like WHERE, you cannot reference aggregate columns, only column
aliases. It can handle event-time and processing-time triggers. However,
it cannot express "emit when 100 elements have accumulated", which
requires a COUNT(*).
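The COUNT(*) limitation of EMIT WHERE can be shown with a small Python sketch (hypothetical helper names; rows are plain dicts). The point is that whether an existing row should be emitted changes when a 100th row arrives, which a predicate over one row's own columns cannot observe.

```python
from typing import Callable, Dict, List

Row = Dict[str, int]


def emit_where(rows: List[Row], row_pred: Callable[[Row], bool]) -> List[Row]:
    # EMIT WHERE style: each row is judged only on its own columns/aliases.
    return [r for r in rows if row_pred(r)]


def emit_having_count(rows: List[Row], threshold: int) -> List[Row]:
    # EMIT HAVING style: the whole set is released once COUNT(*) >= threshold.
    return list(rows) if len(rows) >= threshold else []


first_99 = [{"order_id": i, "window_end": 10} for i in range(99)]
all_100 = first_99 + [{"order_id": 99, "window_end": 10}]
row_pred = lambda r: r["window_end"] <= 10  # any single-row condition
```

With `first_99`, `emit_having_count(..., 100)` releases nothing; adding one more row releases all 100, including row 0, whose row-level verdict under `emit_where` is identical in both cases.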



> Am I allowed to combine EMIT and ORDER BY in the same SELECT block? If
> so, which comes first?
>
>
Sorry, I forgot about ORDER BY. In my opinion, EMIT should be applied after
FROM but before WHERE, HAVING (or aggregation), and ORDER BY. EMIT controls
when data is emitted from the streaming dataset, so all filters and
aggregations should be applied after the data is ready to emit. To
emphasize again: for classic DB/batch cases, EMIT is implicitly there and
simply emits all data once after FROM, since all data is already known.
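One way to see why the batch case is compatible, sketched in Python under an assumption the thread does not state: once a bounded input has been fully read, treat the watermark as +infinity. Then an event-time EMIT predicate such as current_watermark() >= window_end is true for every window, so each window is emitted once with all of its rows. `run_bounded` is a hypothetical name.

```python
import math
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Tuple


def run_bounded(rows: Iterable[Tuple[int, int]],
                emit_pred: Callable[[float, int], bool]) -> Dict[int, List[int]]:
    """Batch input: every row is already known, so the watermark is +infinity."""
    groups: Dict[int, List[int]] = defaultdict(list)
    for window_end, value in rows:
        groups[window_end].append(value)
    watermark = math.inf  # assumption: end of bounded input
    return {end: vals for end, vals in groups.items() if emit_pred(watermark, end)}


rows = [(10, 1), (10, 2), (20, 3)]
result = run_bounded(rows, lambda wm, end: wm >= end)
```

This only covers event-time predicates; a predicate on processing time or COUNT(*) would need its own batch convention.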


On Tue, Jan 21, 2020 at 12:41 PM Julian Hyde <jh...@apache.org> wrote:

> One more thought. Since EMIT produces a relation, your "EMIT HAVING
> current_watermark() >= T.window_end AND current_watermark() <
> T.window_end + INTERVAL 2 DAY" could perhaps be accomplished by
> wrapping the EMIT query as a sub-query and using ordinary SQL
> expressions on the system columns added by EMIT. (I'm not saying we
> should do this. But when designing a feature, it's worth calling out
> whether it adds power or whether it is syntactic sugar.)


That sounds like just applying a normal WHERE to the table produced by the
sub-query. Yep, that's an option. The open question it raises is whether
we want to mix "EMIT" semantics (which are not really a classic SQL
filter) into the "filter" semantics of the existing WHERE and HAVING. I
prefer not to, so that classic DB queries stay unchanged (their WHERE is
just a per-row filter, with no new semantics such as latency control
added).




Re: [DISCUSS] [CALCITE-3271] EMIT syntax proposal for event-timestamp semantic windowing

Posted by Julian Hyde <jh...@apache.org>.
One more thought. Since EMIT produces a relation, your "EMIT HAVING
current_watermark() >= T.window_end AND current_watermark() <
T.window_end + INTERVAL 2 DAY" could perhaps be accomplished by
wrapping the EMIT query as a sub-query and using ordinary SQL
expressions on the system columns added by EMIT. (I'm not saying we
should do this. But when designing a feature, it's worth calling out
whether it adds power or whether it is syntactic sugar.)

On Tue, Jan 21, 2020 at 12:34 PM Julian Hyde <jh...@apache.org> wrote:
>
> Does EMIT HAVING have anything to do with aggregate queries (GROUP BY
> and HAVING), or is it just a coincidence that you use the same word,
> HAVING?
>
> I support the idea of latency controls, but I am nervous about
> allowing full expressions in the EMIT clause if we don't have to.
>
> Aggregate queries have a peculiar scope. (You can only reference
> grouped expressions, for example.) I don't think we should drag that
> peculiar scope into the EMIT clause. The simplest thing is to only
> allow the EMIT clause to reference column aliases, which is similar to
> the scope used by ORDER BY.
>
> Am I allowed to combine EMIT and ORDER BY in the same SELECT block? If
> so, which comes first?
>
> On Tue, Jan 21, 2020 at 11:22 AM Rui Wang <am...@apache.org> wrote:
> >
> > Hi community,
> >
> > First of all, thanks for all your help on CALCITE-3272 (TUMBLE now works
> > as a table-valued function). Now that it works, I want to continue
> > discussing the next piece of streaming SQL: a way to control the
> > materialization latency of a streaming query. A short note for people who
> > are not familiar with streaming: because streaming queries are
> > long-running (perhaps months or up to a year), there is usually a need to
> > see results before the query terminates. So it is desirable to let users
> > specify, for example, how frequently they want to see results from the
> > query.
> >
> > (The following is a summary of my proposal. I can convert it into a
> > design doc if people prefer; just let me know.)
> >
> > *Prior work*
> > My idea builds on the paper "One SQL to Rule Them All" [1]. Kudos to the
> > people who contributed to that paper, which became the foundation of this
> > discussion.
> >
> > [1] proposed adding an EMIT clause to the end of the query, in two
> > forms:
> > 1. Emit after the watermark passes the end of the window, e.g. EMIT
> > [STREAM] AFTER WATERMARK.
> > 2. Delay emission for a period of time after a change happens (e.g. an
> > element arrives in a window), e.g. EMIT [STREAM] AFTER DELAY INTERVAL '6'
> > MINUTES
> >
> > *Typical streaming emit-latency patterns*
> > 1. Event-time triggers. Emitting depends on the relationship between the
> > watermark and the event timestamps of events. See this video [2] for an
> > introduction to watermarks in streaming and to the concept of data
> > completeness based on event timestamps.
> > 2. Processing-time triggers. Emitting depends on the system clock. This
> > is a natural way to think about emitting, e.g. emit the current result
> > every hour regardless of whether the data in a window is already complete.
> > 3. Data-driven triggers, e.g. emit when the number of accumulated events
> > exceeds a threshold (such as 1000 events).
> > 4. Composite triggers. 1, 2, and 3 need to be combined with OR and AND
> > to achieve better latency control.
> >
> > *Proposal to discuss*
> > I want to extend the proposal in [1] by proposing EMIT HAVING syntax and
> > two aggregate functions.
> >
> > *EMIT HAVING bool_expression*. EMIT HAVING comes after the normal HAVING
> > clause and always works with a GROUP BY. EMIT HAVING behaves similarly to
> > HAVING, with two differences worth mentioning:
> > 1. It's not a filter. It controls when the table-valued function in the
> > FROM clause should emit a set. The set is specified by the GROUP BY keys.
> > 2. GROUP BY keys are visible to EMIT HAVING, while that is not the case
> > for HAVING.
> >
> > *current_watermark()*. current_watermark() is an aggregate function that
> > returns a timestamp indicating where the watermark is for a set.
> >
> > *processing_time_since_first_element()*.
> > processing_time_since_first_element() is an aggregate function that
> > returns the system-clock time (i.e. processing time) elapsed since the
> > first element appeared in the set.
> >
> > *Motivating examples*
> > The following motivating examples show how this proposal achieves
> > different kinds of latency control.
> >
> > Assume a streaming query that applies fixed windowing (hence TUMBLE):
> >
> > “SELECT * FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS
> > T”. Based on this query, let's say the derived table T from orders has
> > the following schema:
> >
> >
> > order_id BIGINT
> >
> > product_id BIGINT
> >
> > order_meta STRING
> >
> > event_ts TIMESTAMP
> >
> > window_start TIMESTAMP
> >
> > window_end TIMESTAMP
> >
> >
> >
> > The following table shows, for different trigger cases, how the query
> > would be written with EMIT syntax to express the same semantics:
> >
> > Trigger
> >
> > SQL Query
> >
> > AfterWatermark.pastEndOfWindow()
> > (emit after the watermark passes the end of the window, i.e. once the
> > window's data is believed complete)
> >
> > SELECT *
> > FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
> > GROUP BY T.window_end
> > EMIT HAVING current_watermark() >= T.window_end
> >
> > AfterProcessingTime.pastFirstElementInPane()
> >     .plusDelayOf(Duration.standardMinutes(5))
> > (emit 5 minutes after the first element appears in the window)
> >
> > SELECT *
> > FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
> > GROUP BY T.window_end
> > EMIT HAVING processing_time_since_first_element() >= INTERVAL 5 MINUTE
> >
> > AfterWatermark.pastEndOfWindow()
> >     .withLateFirings(AfterPane.elementCountAtLeast(1))
> > (emit on every element that arrives after the window's data is believed
> > complete)
> >
> > SELECT *
> > FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
> > GROUP BY T.window_end
> > EMIT HAVING current_watermark() > T.window_end AND COUNT(*) > 1
> >
> > AfterWatermark.pastEndOfWindow()
> >     .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
> >         .plusDelayOf(Duration.standardMinutes(5)))
> > (emit before the window is complete, following the delayed-emit strategy)
> >
> > SELECT *
> > FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
> > GROUP BY T.window_end
> > EMIT HAVING current_watermark() < T.window_end AND
> >     processing_time_since_first_element() >= INTERVAL 5 MINUTE
> >
> > AfterWatermark.pastEndOfWindow()
> >     .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
> >         .plusDelayOf(Duration.standardMinutes(5)))
> >     .withLateFirings(AfterPane.elementCountAtLeast(1))
> > (a combination of emitting before the window closes, when it closes, and
> > after it closes)
> >
> > SELECT *
> > FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
> > GROUP BY T.window_end
> > EMIT HAVING
> >     current_watermark() >= T.window_end OR
> >     (current_watermark() > T.window_end AND COUNT(*) > 1) OR
> >     (current_watermark() < T.window_end AND
> >      processing_time_since_first_element() >= INTERVAL 5 MINUTE)
> >
> > AfterWatermark.pastEndOfWindow(), with the window configured via
> > .withAllowedLateness(Duration.standardDays(2))
> > (emit after the window closes, tolerating up to 2 days of late data)
> >
> > SELECT *
> > FROM TUMBLE(orders, DESCRIPTOR(event_ts), INTERVAL 10 MINUTE) AS T
> > GROUP BY T.window_end
> > EMIT HAVING
> >     current_watermark() >= T.window_end AND
> >     current_watermark() < T.window_end + INTERVAL 2 DAY
> >
> > Composite triggers
> > (as the examples above illustrate, AND and OR can be used to combine
> > different boolean expressions)
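The EMIT HAVING predicates in the table can be transcribed into plain Python to check how they behave for a given window. This is a minimal sketch that assumes the values of `current_watermark()`, `processing_time_since_first_element()` and `COUNT(*)` are already available for the group. `Group` and the predicate function names are hypothetical, and the predicates are copied exactly as written in the table.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

FIVE_MIN = timedelta(minutes=5)
TWO_DAYS = timedelta(days=2)


@dataclass
class Group:
    """Hypothetical values visible to EMIT HAVING for one window (GROUP BY T.window_end)."""
    window_end: datetime
    watermark: datetime     # current_watermark()
    since_first: timedelta  # processing_time_since_first_element()
    count: int              # COUNT(*)


def on_time(g: Group) -> bool:
    return g.watermark >= g.window_end


def delayed(g: Group) -> bool:
    return g.since_first >= FIVE_MIN


def late(g: Group) -> bool:
    # As written in the table; AfterPane.elementCountAtLeast(1) would
    # correspond to count >= 1.
    return g.watermark > g.window_end and g.count > 1


def early(g: Group) -> bool:
    return g.watermark < g.window_end and g.since_first >= FIVE_MIN


def early_on_time_late(g: Group) -> bool:
    return on_time(g) or late(g) or early(g)


def allowed_lateness(g: Group) -> bool:
    return g.window_end <= g.watermark < g.window_end + TWO_DAYS
```

As transcribed, `late(g)` implies `on_time(g)` (a watermark strictly past `window_end` is also at or past it), so the middle disjunct of the combined predicate does not change its result.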
> >
> >
> >
> > Please let me know your thoughts, and any other way you would prefer to
> > continue the discussion.
> >
> > [1]: https://arxiv.org/pdf/1905.12133.pdf
> > [2]: https://www.youtube.com/watch?v=TWxSLmkWPm4
> >
> >
> > Thanks,
> > Rui Wang

Re: [DISCUSS] [CALCITE-3271] EMIT syntax proposal for event-timestamp semantic windowing

Posted by Julian Hyde <jh...@apache.org>.
Does EMIT HAVING have anything to do with aggregate queries (GROUP BY
and HAVING), or is it just a coincidence that you use the same word,
HAVING?

I support the idea of latency controls, but I am nervous about
allowing full expressions in the EMIT clause if we don't have to.

Aggregate queries have a peculiar scope. (You can only reference
grouped expressions, for example.) I don't think we should drag that
peculiar scope into the EMIT clause. The simplest thing is to only
allow the EMIT clause to reference column aliases, which is similar to
the scope used by ORDER BY.

Am I allowed to combine EMIT and ORDER BY in the same SELECT block? If
so, which comes first?

On Tue, Jan 21, 2020 at 11:22 AM Rui Wang <am...@apache.org> wrote: