You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@samza.apache.org by Yi Pan <ni...@gmail.com> on 2015/02/09 20:46:33 UTC

Window spec in SQL language vs Samza system details

Hi, Julian and all,

We had a discussion in LinkedIn last week regarding to the window spec in
SQL language on top of Samza systems. There are some issues in the window
spec that I want to discuss:

Consider that we want to have a count of stock trades (as a infinite
stream) happened in the last hour, but only every 11min. It is easy to
write the first part as:
   SELECT STREAM rowtime, count(*) OVER (ORDER BY rowtime RANGE INTERVAL
'1' HOUR PROCEDING)
       FROM Trades
   The above will create a stream of counts that happened every hour
continuously as rows are scanned.

Now here is the question:
   a. how do we have the count every 11min instead of as the row comes in?
As we discussed before, there are examples that we can create by doing
truncating / grouping on the rowtime to "sample" the continuous moving
counting window to get a count every 11min. But that has two issues:
      - From implementation point of view, there is no efficiency
improvement since the system still computes the count for each and every
row comes in
      - If Samza implement a more efficient tumbling window operator, there
is no easy way to identify the section of SQL statement that can map to the
more efficient tumbling window operator, as the sampling is done via math /
group-by aggregation instead of window spec
   b. if there is no row in Trades between 12:00pm to 2:00pm, how do we
tell the system to still generate 0 counts for the time moments: 12:11pm,
12:22pm, 12:33pm, etc.? Or, those rows are delayed in the delivery in the
system and user wants to ignore late-arrival of messages after 5min timeout
to close the counting window? How can we support that use case w/o breaking
SQL grammar?

Both the above issues seem to require some extension to the window spec in
SQL grammar. Julian, what do you think? Is it creating too many
language/parser/planner problems in SQL?

Re: Window spec in SQL language vs Samza system details

Posted by Yi Pan <ni...@gmail.com>.
Hi, Julian,

Thanks for the example. Could you also comment on how can user specifies
the "timeout" to terminate the window in my example? I.e. there is no rows
delivered in 5min, close the current window? Essentially, how do user
specify "punctuation point" in the stream w/o breaking SQL planner? Or it
has to be some hint / system config outside SQL language?

-Yi

On Tue, Feb 10, 2015 at 1:24 PM, Julian Hyde <ju...@hydromatic.net> wrote:

> The answer depends on your design philosophy. We need to strike a balance
> between making it possible and making it easy. Because SQL is a powerful
> closed language, we can achieve a lot by combining the elements. For
> example, I think that your example can be solved by joining a "heartbeat"
> stream to the recent history of the trades stream:
>
>  SELECT STREAM t.*
>   FROM (
>     SELECT ticker, count(*)
>     FROM Trades OVER (RANGE INTERVAL '1' HOUR PRECEDING)
>     GROUP BY ticker) AS t
>  JOIN (STREAM Heartbeat(INTERVAL '11' MINUTE))
>
> Note: "Heartbeat" is a user-defined stream function; OVER in the FROM
> clause to convert a stream into a relation that contains its history over a
> particular period relative to the "current time" of the query.
>
> I seems natural to use a heartbeat because of the "input - output
> cardinality principle":
> * If it's one row in, one row out, use windowed aggregation (agg OVER
> window in the SELECT clause).
> * If it's several rows in, one row out, use streaming GROUP BY (on time
> plus possibly other keys)
> * If it's possible that the system produces output when there are no rows
> in, you need to use a heartbeat stream
>
> Now, if you take the "make it possible" philosophy to its limit, there is
> a danger that you end up with SQL that is so complicated that end users
> don't understand it, and/or is so complicated that the planner cannot
> recognize the pattern and convert it to your nice efficient physical
> operator.
>
> That is the point where you very carefully introduce SQL extensions.
>
> But I strongly suggest creating a "cookbook". Give each stream-processing
> pattern a name and illustrate using a simple example. Show the SQL that can
> achieve that pattern. It doesn't matter if the SQL is a bit gnarly. Users
> will grok it, and adapt the SQL for their applications. And they will
> compose it using union, join, group by... to create new patterns you hadn't
> thought of.
>
> If you extend SQL without sufficient thought, you might break
> composability, and that is a huge problem. Or you end up writing the
> planner so that it produces the right plan when it sees the query in its
> sugared version but not when expressed using the fundamentals (case in
> point: if we had introduced a "tumbling window" syntax, someone could still
> express it using the JOIN query above); a lesser problem, but still
> undermines your users' trust in the system to do the right thing.
>
> Julian
>
>
> > On Feb 9, 2015, at 11:46 AM, Yi Pan <ni...@gmail.com> wrote:
> >
> > Hi, Julian and all,
> >
> > We had a discussion in LinkedIn last week regarding to the window spec in
> > SQL language on top of Samza systems. There are some issues in the window
> > spec that I want to discuss:
> >
> > Consider that we want to have a count of stock trades (as a infinite
> > stream) happened in the last hour, but only every 11min. It is easy to
> > write the first part as:
> >   SELECT STREAM rowtime, count(*) OVER (ORDER BY rowtime RANGE INTERVAL
> > '1' HOUR PROCEDING)
> >       FROM Trades
> >   The above will create a stream of counts that happened every hour
> > continuously as rows are scanned.
> >
> > Now here is the question:
> >   a. how do we have the count every 11min instead of as the row comes in?
> > As we discussed before, there are examples that we can create by doing
> > truncating / grouping on the rowtime to "sample" the continuous moving
> > counting window to get a count every 11min. But that has two issues:
> >      - From implementation point of view, there is no efficiency
> > improvement since the system still computes the count for each and every
> > row comes in
> >      - If Samza implement a more efficient tumbling window operator,
> there
> > is no easy way to identify the section of SQL statement that can map to
> the
> > more efficient tumbling window operator, as the sampling is done via
> math /
> > group-by aggregation instead of window spec
> >   b. if there is no row in Trades between 12:00pm to 2:00pm, how do we
> > tell the system to still generate 0 counts for the time moments: 12:11pm,
> > 12:22pm, 12:33pm, etc.? Or, those rows are delayed in the delivery in the
> > system and user wants to ignore late-arrival of messages after 5min
> timeout
> > to close the counting window? How can we support that use case w/o
> breaking
> > SQL grammar?
> >
> > Both the above issues seem to require some extension to the window spec
> in
> > SQL grammar. Julian, what do you think? Is it creating too many
> > language/parser/planner problems in SQL?
>
>

Re: Window spec in SQL language vs Samza system details

Posted by Julian Hyde <ju...@hydromatic.net>.
The answer depends on your design philosophy. We need to strike a balance between making it possible and making it easy. Because SQL is a powerful closed language, we can achieve a lot by combining the elements. For example, I think that your example can be solved by joining a "heartbeat" stream to the recent history of the trades stream:

 SELECT STREAM t.*
  FROM (
    SELECT ticker, count(*) 
    FROM Trades OVER (RANGE INTERVAL '1' HOUR PRECEDING)
    GROUP BY ticker) AS t
 JOIN (STREAM Heartbeat(INTERVAL '11' MINUTE))

Note: "Heartbeat" is a user-defined stream function; OVER in the FROM clause to convert a stream into a relation that contains its history over a particular period relative to the "current time" of the query.
 
I seems natural to use a heartbeat because of the "input - output cardinality principle":
* If it's one row in, one row out, use windowed aggregation (agg OVER window in the SELECT clause). 
* If it's several rows in, one row out, use streaming GROUP BY (on time plus possibly other keys)
* If it's possible that the system produces output when there are no rows in, you need to use a heartbeat stream

Now, if you take the "make it possible" philosophy to its limit, there is a danger that you end up with SQL that is so complicated that end users don't understand it, and/or is so complicated that the planner cannot recognize the pattern and convert it to your nice efficient physical operator.

That is the point where you very carefully introduce SQL extensions.

But I strongly suggest creating a "cookbook". Give each stream-processing pattern a name and illustrate using a simple example. Show the SQL that can achieve that pattern. It doesn't matter if the SQL is a bit gnarly. Users will grok it, and adapt the SQL for their applications. And they will compose it using union, join, group by... to create new patterns you hadn't thought of.

If you extend SQL without sufficient thought, you might break composability, and that is a huge problem. Or you end up writing the planner so that it produces the right plan when it sees the query in its sugared version but not when expressed using the fundamentals (case in point: if we had introduced a "tumbling window" syntax, someone could still express it using the JOIN query above); a lesser problem, but still undermines your users' trust in the system to do the right thing.

Julian


> On Feb 9, 2015, at 11:46 AM, Yi Pan <ni...@gmail.com> wrote:
> 
> Hi, Julian and all,
> 
> We had a discussion in LinkedIn last week regarding to the window spec in
> SQL language on top of Samza systems. There are some issues in the window
> spec that I want to discuss:
> 
> Consider that we want to have a count of stock trades (as a infinite
> stream) happened in the last hour, but only every 11min. It is easy to
> write the first part as:
>   SELECT STREAM rowtime, count(*) OVER (ORDER BY rowtime RANGE INTERVAL
> '1' HOUR PROCEDING)
>       FROM Trades
>   The above will create a stream of counts that happened every hour
> continuously as rows are scanned.
> 
> Now here is the question:
>   a. how do we have the count every 11min instead of as the row comes in?
> As we discussed before, there are examples that we can create by doing
> truncating / grouping on the rowtime to "sample" the continuous moving
> counting window to get a count every 11min. But that has two issues:
>      - From implementation point of view, there is no efficiency
> improvement since the system still computes the count for each and every
> row comes in
>      - If Samza implement a more efficient tumbling window operator, there
> is no easy way to identify the section of SQL statement that can map to the
> more efficient tumbling window operator, as the sampling is done via math /
> group-by aggregation instead of window spec
>   b. if there is no row in Trades between 12:00pm to 2:00pm, how do we
> tell the system to still generate 0 counts for the time moments: 12:11pm,
> 12:22pm, 12:33pm, etc.? Or, those rows are delayed in the delivery in the
> system and user wants to ignore late-arrival of messages after 5min timeout
> to close the counting window? How can we support that use case w/o breaking
> SQL grammar?
> 
> Both the above issues seem to require some extension to the window spec in
> SQL grammar. Julian, what do you think? Is it creating too many
> language/parser/planner problems in SQL?