Posted to dev@samza.apache.org by Milinda Pathirage <mp...@umail.iu.edu> on 2015/02/26 21:18:02 UTC

Handling defaults and windowed aggregates in stream queries

Hi devs,

I asked about $subject on calcite-dev. You can find the archived discussion
at [1]. I think your thoughts would also be valuable in that discussion on
the Calcite list.

I discovered the requirement for a default window operator when I tried to
integrate StreamScan (I was using TableScan previously) into the physical
plan generation logic. Because of the way we have written the
OperatorRouter API, we always need a stream-to-relation operator at the
input. But Calcite generates a query plan like the following:

LogicalDelta
  LogicalProject(id=[$0], product=[$1], quantity=[$2])
    LogicalFilter(condition=[>($2, 5)])
      StreamScan(table=[[KAFKA, ORDERS]], fields=[[0, 1, 2]])

If we consider LogicalFilter a relation operator, we need something to
convert the input stream to a relation before sending the tuples downstream.
In addition, there is an optimization where we treat the filter operator as
a tuple operator and place it between the StreamScan and the
stream-to-relation operator as a way of reducing the number of messages
going downstream.
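
(To make this concrete, here is a rough Java sketch of one way such a plan
re-write could look. The operator classes below are hypothetical stand-ins
for illustration only; they are not the actual OperatorRouter API or
Calcite's RelNode classes.)

  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.List;

  // Hypothetical operator tree node.
  abstract class Op {
    final List<Op> inputs;
    Op(List<Op> inputs) { this.inputs = inputs; }
    // Returns a copy of this operator with the given inputs.
    abstract Op copy(List<Op> newInputs);
  }

  class StreamScanOp extends Op {
    StreamScanOp() { super(Collections.<Op>emptyList()); }
    @Override Op copy(List<Op> newInputs) { return this; }
  }

  // The default (unbounded) stream-to-relation operator we want to insert.
  class DefaultWindowOp extends Op {
    DefaultWindowOp(Op input) { super(Collections.singletonList(input)); }
    @Override Op copy(List<Op> newInputs) { return new DefaultWindowOp(newInputs.get(0)); }
  }

  final class DefaultWindowRewrite {
    // Walks the plan bottom-up and inserts a DefaultWindowOp above every
    // StreamScanOp, so downstream relation operators always see a relation.
    // The filter optimization described above would additionally allow a
    // tuple-level filter to stay between StreamScanOp and DefaultWindowOp,
    // which this basic sketch does not attempt.
    static Op rewrite(Op node) {
      List<Op> newInputs = new ArrayList<Op>();
      for (Op input : node.inputs) {
        newInputs.add(rewrite(input));
      }
      Op rewritten = node.copy(newInputs);
      return (rewritten instanceof StreamScanOp)
          ? new DefaultWindowOp(rewritten)
          : rewritten;
    }
  }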

The other scenario is windowed aggregates. Currently the window spec is attached
to the LogicalProject in the query plan, like the following:

LogicalProject(EXPR$0=[CASE(>(COUNT($2) OVER (ROWS BETWEEN 2 PRECEDING AND
2 FOLLOWING), 0), CAST($SUM0($2) OVER (ROWS BETWEEN 2 PRECEDING AND 2
FOLLOWING)):INTEGER, null)])

I wanted to know from them whether it is possible to move the window operation
to just after the stream scan, so that it is compatible with our operator
layer.
Maybe there are better or easier ways to do this, so your comments are
always welcome.

Thanks
Milinda


[1]
http://mail-archives.apache.org/mod_mbox/incubator-calcite-dev/201502.mbox/browser

-- 
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org

Re: Handling defaults and windowed aggregates in stream queries

Posted by Chris Riccomini <cr...@apache.org>.
Hey Milinda,

Are you referring to this thread?

http://mail-archives.apache.org/mod_mbox/incubator-calcite-dev/201502.mbox/%3CCACwebjTshFNi=eS4QZ1ZKkQMUYGZN+LWj_bAPqPdrVSY2tQW1A@mail.gmail.com%3E

It appears as though your question remains unanswered. :(

> If we consider LogicalFilter a relation operator, we need something to
> convert the input stream to a relation before sending the tuples downstream.

Does CQL model this as a window operator where every window is exactly 1
row (i.e. a sliding window of length 1)?

Cheers,
Chris


Re: Handling defaults and windowed aggregates in stream queries

Posted by Julian Hyde <ju...@hydromatic.net>.
I think that is something to be handled at the stream level, not in the query language. The stream basically needs to declare “all data timestamped before 11:00 has already arrived”. How it does that is a matter of policy. Reasonable policies could be:

1. The wall-clock has reached 11:15

2. A heart-beat labeled 11:00 has reached the stream (regardless of the wall-clock time)

3. The wall-clock has reached 11:01 and all “premium” producers (i.e. those authorized to hold the stream back) are connected and are responding to pings

There are many reasonable policies. All the query language needs to know is whether the stream has decided it is safe to move beyond 11:00.
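
(To make the shape of such a policy concrete, here is a minimal Java sketch.
The interface and class names are hypothetical, purely for illustration; they
are not part of Calcite or Samza.)

  import java.time.Duration;
  import java.time.Instant;

  // Decides whether all data timestamped before `boundary` has already arrived.
  interface StreamProgressPolicy {
    boolean isComplete(Instant boundary);
  }

  // Policy 1: the wall-clock has advanced past the boundary by a grace period
  // (e.g. a 15-minute grace means the 11:00 boundary is reached at 11:15).
  class WallClockGracePolicy implements StreamProgressPolicy {
    private final Duration grace;
    WallClockGracePolicy(Duration grace) { this.grace = grace; }
    @Override public boolean isComplete(Instant boundary) {
      return Instant.now().isAfter(boundary.plus(grace));
    }
  }

  // Policy 2: a heart-beat labeled with the boundary time (or later) has been
  // seen on the stream, regardless of the wall-clock time.
  class HeartbeatPolicy implements StreamProgressPolicy {
    private volatile Instant lastHeartbeat = Instant.MIN;
    void onHeartbeat(Instant label) { lastHeartbeat = label; }
    @Override public boolean isComplete(Instant boundary) {
      return !lastHeartbeat.isBefore(boundary);
    }
  }

The query layer would then only ever ask policy.isComplete(boundary), without
caring which policy is plugged in.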

Julian


On Mar 4, 2015, at 10:28 AM, Milinda Pathirage <mp...@umail.iu.edu> wrote:

> Hi Julian,
> 
> I went through the draft and it covers most of our requirements. But
> aggregation over a window will not be as simple as mentioned in the draft.
> 
> In the stream extension draft we have the following:
> 
> 'How did Calcite know that the 10:00:00 sub-totals were complete at
>> 11:00:00, so that it could emit them? It knows that rowtime is increasing,
>> and it knows that FLOOR(rowtime TO HOUR) is also increasing. So, once it
>> has seen a row at or after 11:00:00, it will never see a row that will
>> contribute to a 10:00:00 total.'
> 
> 
> When there are delays, we can't do the above, because observing a row with a
> rowtime greater than 11:00:00 doesn't mean events in the 10:00:00 to 10:00:59
> time window will not arrive after that observation. We have discussed this
> in https://issues.apache.org/jira/browse/SAMZA-552. Even if we consider the
> 'system time/stream time' as mentioned in SAMZA-552, it doesn't guarantee
> the absence of delays in a distributed setting. So we may need
> additional hints/extensions to specify the extra information required to
> handle these complexities in window calculations.
> 
> Maybe there are ways to handle this at the Samza level, not in the query
> language.
> 
> @Chris, @Yi
> I got the query planner working with some dummy operators and re-writing
> the query to add default window operators. But Julian's comments about
> handling defaults and optimizing the query plan (moving the Delta down and
> removing both Delta and Chi) got me thinking about whether enforcing CQL
> semantics as we do in our current operator layer limits the flexibility
> and increases the complexity of the query-plan-to-operator-router generation.
> Anyway, I am going to take a step back and think more about Julian's
> comments. I'll put my thoughts into a design document for the query planner.
> 
> Thanks
> Milinda
> 
> 
> On Tue, Mar 3, 2015 at 3:40 PM, Julian Hyde <ju...@hydromatic.net> wrote:
> 
>> Sorry to show up late to this party. I've had my head down writing a
>> description of streaming SQL which I hoped would answer questions like
>> this. Here is the latest draft:
>> https://github.com/julianhyde/incubator-calcite/blob/chi/doc/STREAM.md
>> 
>> I've been avoiding windows for now. They are not needed for simple queries
>> (project, filter, windowed aggregate) and I wanted to write the
>> specification of more complex queries before I introduce them.
>> 
>> Let's look at a simple query, filter. According to CQL, to evaluate
>> 
>>  select stream *
>>  from orders
>>  where productId = 10    (query 1)
>> 
>> you need to convert orders to a relation over a particular window, apply
>> the filter, then convert back to a stream. We could write
>> 
>>  select stream *
>>  from orders over (order by rowtime range between unbounded preceding and
>> current row)
>>  where productId = 10    (query 2)
>> 
>> or we could write
>> 
>>  select stream *
>>  from orders over (order by rowtime range between current row and current
>> row)
>>  where productId = 10      (query 3)
>> 
>> Very different windows, but they produce the same result, because of the
>> stateless nature of Filter. So, let's suppose that the default window is
>> the one I gave first, "(order by rowtime range between unbounded preceding
>> and current row)", and so query 1 is just short-hand for query 2.
>> 
>> I currently translate query 1 to
>> 
>> Delta
>>  Filter($1 = 10)
>>    Scan(orders)
>> 
>> but I should really be translating to
>> 
>> Delta
>>  Filter($1 = 10)
>>    Chi(order by $0 range between unbounded preceding and current row)
>>      Scan(orders)
>> 
>> Delta is the "differentiation" operator and Chi is the "integration"
>> operator. After we apply rules to push the Delta through the Filter, the
>> Delta and Chi will collide and cancel each other out.
>> 
>> Why have I not yet introduced the Chi operator? Because I have not yet
>> dealt with a query where it makes any difference.
>> 
>> Where it will make a difference is joins. But even for joins, I hold out
>> hope that we can avoid explicit windows, most of the time. One could write
>> 
>>  select stream *
>>  from orders over (order by rowtime range between current row and
>> interval '1' hour following)
>>  join shipments
>>  on orders.orderId = shipments.orderId    (query 4)
>> 
>> but I think most people would find the following clearer:
>> 
>>  select stream *
>>  from orders
>>  join shipments
>>  on orders.orderId = shipments.orderId          (query 5)
>>  and shipments.rowtime between orders.rowtime and orders.rowtime +
>> interval '1' hour
>> 
>> Under the covers there are still the implicit windows:
>> 
>>  select stream *
>>  from orders over (order by rowtime range between unbounded preceding and
>> current row)
>>  join shipments over (order by rowtime range between unbounded preceding
>> and current row)
>>  on orders.orderId = shipments.orderId          (query 6)
>>  and shipments.rowtime between orders.rowtime and orders.rowtime +
>> interval '1' hour
>> 
>> Query 6 is equivalent to query 5. But the system can notice the join
>> condition involving the two streams' rowtimes and trim down the windows
>> (one window to an hour, another window to just the current row) without
>> changing semantics:
>> 
>>  select stream *
>>  from orders over (order by rowtime range between interval '1' hour
>> preceding and current row)
>>  join shipments over (order by rowtime range between current row and
>> current row)
>>  on orders.orderId = shipments.orderId          (query 7)
>>  and shipments.rowtime between orders.rowtime and orders.rowtime +
>> interval '1' hour
>> 
>> So, my hope is that end-users will rarely need to use an explicit window.
>> 
>> In the algebra, we will start introducing Chi. It will evaporate for
>> simple queries such as Filter. It will remain for more complex queries such
>> as stream-to-stream join, because you are joining the current row of one
>> stream to a time-varying relation based on the other, and Chi represents
>> that "recent history of a stream" relation.
>> 
>> Julian
>> 
>> 
>>> On Mar 2, 2015, at 11:42 AM, Milinda Pathirage <mp...@umail.iu.edu>
>> wrote:
>>> 
>>> Hi Yi,
>>> 
>>> As I understand it, rules and re-writes basically do the same thing
>>> (changing/re-writing the operator tree). But in the case of rules this
>>> happens during planning, based on the query planner configuration, while
>>> re-writing is done on the planner output, after the query goes through the
>>> planner. In Calcite the re-write happens inside the interpreter, and in our
>>> case it will be inside the query-plan-to-operator-router conversion phase.
>>> 
>>> Thanks
>>> Milinda
>>> 
>>> On Mon, Mar 2, 2015 at 2:31 PM, Yi Pan <ni...@gmail.com> wrote:
>>> 
>>>> Hi, Milinda,
>>>> 
>>>> +1 on your default window idea. One question: what's the difference
>> between
>>>> a rule and a re-write?
>>>> 
>>>> Thanks!
>>>> 
>>>> On Mon, Mar 2, 2015 at 7:14 AM, Milinda Pathirage <
>> mpathira@umail.iu.edu>
>>>> wrote:
>>>> 
>>>>> @Chris
>>>>> Yes, I was referring to that mail. Actually I was wrong about the ‘Now’
>>>>> window; it should be an ‘Unbounded’ window for most of the default
>>>>> scenarios (Section 6.4 of https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf),
>>>>> because applying a ‘Now’ window with a size of 1 will double the number
>>>>> of events generated if we consider insert/delete streams, while
>>>>> ‘Unbounded’ will only generate insert events.
>>>>> 
>>>>> @Yi
>>>>> 1. You are correct about Calcite. There is no stream-to-relation
>>>>> conversion happening. But as I understand it, we don’t need Calcite to
>>>>> support this. We can add it to our query planner as a rule or re-write.
>>>>> What I am not sure about is whether to use a rule or a re-write.
>>>>> 2. There is a rule in Calcite which extracts the Window out of the
>>>>> Project. But I am not sure why that didn’t happen in my test. This rule
>>>>> is added to the planner by default. I’ll ask about this on the Calcite
>>>>> mailing list.
>>>>> 
>>>>> I think we can figure out a way to move the window to the input stream
>>>>> if Calcite can move the window out of the Project. I’ll see how we can
>>>>> do this.
>>>>>
>>>>> Also, I’ll go ahead and implement default windows. We can change it
>>>>> later if Julian or someone from Calcite comes up with a better
>>>>> suggestion.
>>>>> 
>>>>> Thanks
>>>>> Milinda
>>>>> 
>>>>> On Sun, Mar 1, 2015 at 8:23 PM, Yi Pan <ni...@gmail.com> wrote:
>>>>> 
>>>>>> Hi, Milinda,
>>>>>> 
>>>>>> Sorry to reply late on this. Here are some of my comments:
>>>>>> 1) In Calcite's model, it seems that there is no stream-to-relation
>>>>>> conversion step. In the first example, where the window specification
>>>>>> is missing, I like your solution to add the default LogicalNowWindow
>>>>>> operator s.t. the physical operator matches the query plan. However,
>>>>>> if the Calcite community does not agree to add the default
>>>>>> LogicalNowWindow, it would be fine for us if we always insert a default
>>>>>> "now" window on a stream when we generate the Samza configuration.
>>>>>> 2) I am more concerned about the other cases, where the window operator
>>>>>> is used in aggregation and join. In your example of windowed aggregation
>>>>>> in Calcite, the window spec seems to be a decoration on the
>>>>>> LogicalProject operator, instead of defining a data source for the
>>>>>> LogicalProject operator. In the CQL model we followed, the window
>>>>>> operator is considered a query primitive that generates a data source
>>>>>> for other relation operators to consume. How exactly is the window
>>>>>> operator used in the Calcite planner? Isn't it much clearer if the
>>>>>> following is used?
>>>>>> LogicalProject(EXPR$0=[CASE(>(COUNT($2), 0), CAST($SUM0($2)):INTEGER,
>>>>>> null)])
>>>>>>  LogicalWindow(ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
>>>>>> 
>> 
> 
> 
> -- 
> Milinda Pathirage
> 
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
> 
> twitter: milindalakmal
> skype: milinda.pathirage
> blog: http://milinda.pathirage.org


Re: Handling defaults and windowed aggregates in stream queries

Posted by Yi Pan <ni...@gmail.com>.
Hi, Milinda,

Let me put my answers below:

On Thu, Mar 5, 2015 at 11:42 AM, Milinda Pathirage <mp...@umail.iu.edu>
wrote:
>
> > a) Each window will have a closing policy: it would either be wall-clock
> > based timeout, or the arrival of messages indicating that we have
> received
> > all messages in the corresponding event time window
> >
>
> Given that the closing policy is not explicit in the query, how are we
> going to handle this? Is this policy going to be specific to a query, or a
> system-wide thing? I think I was not clear about this in the previous mail.
>
>
Jay had commented on this "closing" term and I realized that what we truly
want here is not "closing a window" but "updating the window output".
Hence, the above description would be:
Each window will have a window update policy that controls when to send the
output for that window. The output can be configured by default to:
1) update the window output every 10 seconds until the next window is opened; or
2) update the window output when we have received a message that has a future
event time; or
3) for late arrivals, update the past window output for each late arrival.

I would consider the above policies to be system-wide defaults. The only
configuration from the query language that a window operator needs is the
following:
1) Window size measurement: tuples or event time?
2) Window length: the length of the window, measured in tuples or event time.
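
(A rough Java sketch of what those knobs could look like; all of the names
below are hypothetical, not an existing Samza API.)

  import java.time.Duration;

  // How the window size/length is measured.
  enum WindowMeasure { TUPLES, EVENT_TIME }

  // System-wide default policies for when to (re-)emit a window's output.
  enum WindowUpdatePolicy {
    PERIODIC,              // e.g. every 10 seconds until the next window opens
    ON_FUTURE_EVENT_TIME,  // when a message with a later event time arrives
    ON_LATE_ARRIVAL        // re-emit the affected past window for each late arrival
  }

  // The only per-query configuration the window operator needs.
  class WindowSpec {
    final WindowMeasure measure;  // tuples or event time
    final long length;            // row count, or milliseconds of event time
    WindowSpec(WindowMeasure measure, long length) {
      this.measure = measure;
      this.length = length;
    }
    static WindowSpec eventTime(Duration length) {
      return new WindowSpec(WindowMeasure.EVENT_TIME, length.toMillis());
    }
    static WindowSpec tuples(long rows) {
      return new WindowSpec(WindowMeasure.TUPLES, rows);
    }
  }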


>
> > b) Each window also keeps all the past messages it receives in the past
> > windows, up to a large retention size that covers all possible late
> > arrivals
> >
>
> Are we going to keep this in local storage? Is this (keeping past messages)
> really necessary in the case of monotonic queries? Maybe you meant to say we
> just keep metadata about offsets, so we can replay from Kafka (I don't have
> that much experience with Kafka, but I think we can start consuming from
> arbitrary offsets).
>
>
Yes, we can replay from Kafka. But I would argue that it would be more
efficient to access the messages locally, given that we have a store to
buffer them. Also, think about an event-time window of 10 min length over the
following stream:
(offset=1, eventtime=9:01:00,...)
(offset=2, eventtime=9:00:00,...)
(offset=3, eventtime=10:00:00,...)
....
(offset=3000, eventtime=9:03:00,...)

In a store, you can keep the entries keyed by the event time of the messages:
(key=9:00:00, (offset=2, eventtime=9:00:00,...))
(key=9:01:00, (offset=1, eventtime=9:01:00,...))
(key=9:03:00, (offset=3000, eventtime=9:03:00,...))

Accessing the past window's messages is then a simple range scan from 9:00 to
9:10, while replaying from Kafka would be from offset=1 to offset=3000.
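
(A small sketch of that access pattern, using a java.util.TreeMap as a
stand-in for the local store; Samza's key-value store offers a similar
range scan, assuming the key serde preserves event-time ordering.)

  import java.util.NavigableMap;
  import java.util.TreeMap;

  class EventTimeBuffer {
    // Messages keyed by event time in epoch millis. A real store would append
    // the offset to the key so that messages with the same event time don't
    // collide.
    private final NavigableMap<Long, String> byEventTime = new TreeMap<>();

    void put(long eventTimeMs, String message) {
      byEventTime.put(eventTimeMs, message);
    }

    // All buffered messages in the event-time window [startMs, startMs + lengthMs),
    // e.g. the 9:00-9:10 window above, regardless of their Kafka offsets.
    Iterable<String> window(long startMs, long lengthMs) {
      return byEventTime.subMap(startMs, true, startMs + lengthMs, false).values();
    }
  }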




>
> > c) When a window's closing policy is satisfied, the window operator
> always
> > emits the current window results
> >
>
> Does this mean we are waiting for the window to be closed before sending
> new messages downstream? This may have performance implications, but it
> will make it easier to implement the query processing. I think the current
> operator layer can support this style without any changes.
>
>
First, let's correct the term: we update the window output rather than
closing the window. Then, the default window update policy can be something
that combines a system timer with the window size measurement, as stated
above. My original thought was just to set the default window update policy
to wait until we believe no more messages are coming for the window, either
because there have been no new messages for a certain time, or because the
window is considered to have reached its configured length. But it can be
tuned across a full spectrum, from updating the window output per incoming
tuple or per second, to only sending a single window output when we slide to
the next window. The current operator layer should be able to support all of
these styles, with the different update policies implemented in the
process() method.


>
> > d) When a late arrival message came, the window operator will re-emit the
> > past window results to correct the previous window results
> >
> >
> It would be better if we could do incremental updates without replaying the
> whole window. But I believe there are advantages to this approach.
>
>
Yes, definitely. I think in most cases we can just send delta updates
downstream and allow accessing the past window state if needed. For most
aggregations it may be even simpler: we just need to keep a single
value per past window in the store and directly send out one entry.
However, for a join of two streams, when we receive one update from one
stream we may need to access all messages in the past window of the other
stream to perform the join.
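
(Sketch of that access pattern for a stream-stream join with a 1-hour window,
again using TreeMaps as stand-ins for the two local buffers; the orderId
equi-join predicate is omitted and would be applied to each candidate pair.)

  import java.util.NavigableMap;
  import java.util.TreeMap;
  import java.util.function.BiConsumer;

  class StreamStreamJoin {
    private static final long WINDOW_MS = 60 * 60 * 1000L;
    // Past messages of each stream, keyed by event time (a real store would
    // also key by offset to avoid collisions).
    private final NavigableMap<Long, String> orders = new TreeMap<>();
    private final NavigableMap<Long, String> shipments = new TreeMap<>();

    void onOrder(long eventTimeMs, String order, BiConsumer<String, String> emit) {
      orders.put(eventTimeMs, order);
      // A new order only needs the shipments buffered within [t, t + 1h].
      shipments.subMap(eventTimeMs, true, eventTimeMs + WINDOW_MS, true)
          .values().forEach(shipment -> emit.accept(order, shipment));
    }

    void onShipment(long eventTimeMs, String shipment, BiConsumer<String, String> emit) {
      shipments.put(eventTimeMs, shipment);
      // A new shipment needs the orders buffered within [t - 1h, t].
      orders.subMap(eventTimeMs - WINDOW_MS, true, eventTimeMs, true)
          .values().forEach(order -> emit.accept(order, shipment));
    }
  }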



Re: Handling defaults and windowed aggregates in stream queries

Posted by Yi Pan <ni...@gmail.com>.
Yeah, I am still thinking about it. Jay pointed out that for an event-time
window, the window start time may be derivable if we just keep a single
starting value for fixed-length windows. I have yet to think about the
tuple-window case and windows with dynamic length (e.g. the session window
example in MillWheel).
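
(For the fixed-length event-time case, the derivation Jay pointed out is just
integer arithmetic; a quick hypothetical sketch, assuming event times at or
after the base timestamp:)

  final class FixedWindows {
    // For fixed-length event-time windows, the start of the window containing
    // an event is derivable from a single base timestamp, so no per-window
    // bookkeeping is needed.
    static long windowStart(long eventTimeMs, long baseMs, long windowLengthMs) {
      return baseMs + ((eventTimeMs - baseMs) / windowLengthMs) * windowLengthMs;
    }
    // e.g. with baseMs = 0 and windowLengthMs = 600000 (10 minutes),
    // an event at 9:03 falls in the window that starts at 9:00.
  }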

On Fri, Mar 6, 2015 at 7:24 AM, Milinda Pathirage <mp...@umail.iu.edu>
wrote:

> I think my previous comment about maintaining start and end offsets as the
> window state will not work when there are delays. We may need to keep
> multiple such offsets. But this may not be a clean solution.
>
> On Thu, Mar 5, 2015 at 2:42 PM, Milinda Pathirage <mp...@umail.iu.edu>
> wrote:
>
> > Hi Yi,
> >
> > Please find my comments inline.
> >
> > On Thu, Mar 5, 2015 at 1:18 PM, Yi Pan <ni...@gmail.com> wrote:
> >
> >> Hi, Milinda,
> >>
> >> We have recently had some discussions on the MillWheel model:
> >> http://www.infoq.com/presentations/millwheel.
> >
> >
> > Yes, that is a very interesting talk. I asked the above question
> > regarding the language just after watching the talk. I was under the
> > impression that we needed to specify these details (handling delays)
> > explicitly in the query.
> >
> >
> >> It is a very interesting talk and has one striking point that we did not
> >> think about before: handle late arrivals as a "correction" to the earlier
> >> results. Hence, if we follow that model, the late arrival problem that you
> >> described can be addressed in the following way:
> >>
> >> a) Each window will have a closing policy: it would either be wall-clock
> >> based timeout, or the arrival of messages indicating that we have
> received
> >> all messages in the corresponding event time window
> >>
> >
> > Given that the closing policy is not explicit in the query, how we are
> > going to handle this. Is this policy going to be specific to a query or
> > system wide thing. I think I was not clear about this in the previous
> mail.
> >
> >
> >> b) Each window also keeps all the past messages it receives in the past
> >> windows, up to a large retention size that covers all possible late
> >> arrivals
> >>
> >
> > Are we going to keep this in local storage. Is this (keeping past
> > messages) really necessary in case of monotonic queries. May be you meant
> > to say we just keep metadata about offsets. So we can replay from Kafka
> (I
> > don't have that much experience with Kafka, but I think we can start
> > consuming from random offsets).
> >
> >
> >> c) When a window's closing policy is satisfied, the window operator
> always
> >> emits the current window results
> >>
> >
> > Does this means we are waiting for the window to be closed, before
> sending
> > new messages downstream? This may have performance implications, but this
> > will make it easy to implement the query processing. I think current
> > operator layer can support this style without any changes.
> >
> >
> >> d) When a late arrival message came, the window operator will re-emit
> the
> >> past window results to correct the previous window results
> >>
> >>
> > It would be better if we can do incremental updates without replaying the
> > whole window. But I believe there are advantages of this approach.
> >
> >
> >> In your example, the aggregation for the counter for the window from
> >> 10:00-10:59 will have a "wrong" value when the window is closed by the
> >> arrival of a message w/ an 11:00 timestamp, but it will be corrected later
> >> by a late arrival of another message in the 10:00-10:59 time window. I.e.,
> >> if we keep all the previous window states, late-arrival messages will
> >> simply trigger a re-computation of the aggregated counter for the window
> >> 10:00-10:59 and overwrite the previous result. In this model, the final
> >> result is always correct, as long as the late arrivals are within the
> >> large retention size.
> >>
> >> I have been thinking of this model and had a discussion with Julian
> >> yesterday. It seems that the following is more reasonable to me:
> >> 1) The window operator will have a fully buffered state of the stream,
> >> similar to a time-varying materialized view over the retention size
> >> 2) Window size and termination (i.e. sliding/tumbling/hopping windows)
> >> will now determine when we emit window results (i.e. new messages/updates
> >> to the current window) to the downstream operator, s.t. the operators can
> >> calculate results in time
> >> 3) Late arrivals will be sent to the downstream operator and trigger a
> >> re-computation of the past result based on the fully buffered state
> >>
> >> In the above model, the window operator becomes a system feature, or an
> >> implementation of "StreamScan" in Calcite's terms. And we do not need
> >> specific language support for the window semantics, given a default time
> >> window operator implementation that serves as a "StreamScan". All window
> >> definitions in the query language now only dictate the semantic meaning of
> >> aggregation and join on top of the physical window operator, which
> >> provides: a) a varying/growing materialized view; and b) a driver that
> >> tells the aggregation/join to compute/re-compute results on top of the
> >> materialized view.
> >>
> >>
> >>
> > I will think more about this model and may have more questions about this
> > in future :).
> >
> > Thanks
> > Milinda
> >
> >
> >> > > >>>>> twitter: milindalakmal
> >> > > >>>>> skype: milinda.pathirage
> >> > > >>>>> blog: http://milinda.pathirage.org
> >> > > >>>>>
> >> > > >>>>
> >> > > >>>
> >> > > >>>
> >> > > >>>
> >> > > >>> --
> >> > > >>> Milinda Pathirage
> >> > > >>>
> >> > > >>> PhD Student | Research Assistant
> >> > > >>> School of Informatics and Computing | Data to Insight Center
> >> > > >>> Indiana University
> >> > > >>>
> >> > > >>> twitter: milindalakmal
> >> > > >>> skype: milinda.pathirage
> >> > > >>> blog: http://milinda.pathirage.org
> >> > > >>>
> >> > > >>
> >> > > >
> >> > > >
> >> > > >
> >> > > > --
> >> > > > Milinda Pathirage
> >> > > >
> >> > > > PhD Student | Research Assistant
> >> > > > School of Informatics and Computing | Data to Insight Center
> >> > > > Indiana University
> >> > > >
> >> > > > twitter: milindalakmal
> >> > > > skype: milinda.pathirage
> >> > > > blog: http://milinda.pathirage.org
> >> > >
> >> > >
> >> >
> >> >
> >> > --
> >> > Milinda Pathirage
> >> >
> >> > PhD Student | Research Assistant
> >> > School of Informatics and Computing | Data to Insight Center
> >> > Indiana University
> >> >
> >> > twitter: milindalakmal
> >> > skype: milinda.pathirage
> >> > blog: http://milinda.pathirage.org
> >> >
> >>
> >
> >
> >
> > --
> > Milinda Pathirage
> >
> > PhD Student | Research Assistant
> > School of Informatics and Computing | Data to Insight Center
> > Indiana University
> >
> > twitter: milindalakmal
> > skype: milinda.pathirage
> > blog: http://milinda.pathirage.org
> >
>
>
>
> --
> Milinda Pathirage
>
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
>
> twitter: milindalakmal
> skype: milinda.pathirage
> blog: http://milinda.pathirage.org
>

Re: Handling defaults and windowed aggregates in stream queries

Posted by Milinda Pathirage <mp...@umail.iu.edu>.
I think the approach from my previous comment, maintaining start and end
offsets as the window state, will not work when there are delays. We may need
to keep multiple such offset ranges, but that may not be a clean solution.
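
To make the problem concrete, here is a rough sketch of the bookkeeping a
window might need once late arrivals are possible. All names here
(WindowState, OffsetRange) are invented for illustration; this is not our
actual operator API.

import java.util.ArrayList;
import java.util.List;

/** Hypothetical bookkeeping for the offsets backing one window. */
public class WindowState {

  /** A contiguous run of offsets in an input stream partition. */
  public static final class OffsetRange {
    final long start;
    final long end;
    OffsetRange(long start, long end) { this.start = start; this.end = end; }
  }

  // Without delays a single [start, end] pair is enough, because all messages
  // of a window sit in one contiguous offset range. With late arrivals, a
  // window's messages are interleaved with those of later windows, so the
  // state degenerates into several disjoint ranges.
  private final List<OffsetRange> ranges = new ArrayList<>();

  /** Record that the message at this offset belongs to this window. */
  public void add(long offset) {
    if (!ranges.isEmpty()) {
      OffsetRange last = ranges.get(ranges.size() - 1);
      if (offset == last.end + 1) {
        // Still contiguous: extend the last range.
        ranges.set(ranges.size() - 1, new OffsetRange(last.start, offset));
        return;
      }
    }
    // A gap (a late arrival after other windows' messages): open a new range.
    ranges.add(new OffsetRange(offset, offset));
  }

  public List<OffsetRange> getRanges() { return ranges; }
}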

On Thu, Mar 5, 2015 at 2:42 PM, Milinda Pathirage <mp...@umail.iu.edu>
wrote:

> Hi Yi,
>
> Please find my comments inline.
>
> On Thu, Mar 5, 2015 at 1:18 PM, Yi Pan <ni...@gmail.com> wrote:
>
>> Hi, Milinda,
>>
>> We have recently some discussions on the MillWheel model:
>> http://www.infoq.com/presentations/millwheel.
>
>
> Yes. Above is a very interesting talk. I asked the above question
> regarding the language, just after watching the talk. I was under the
> impression that we need to specify these details (handling delays)
> explicitly in the query.
>
>
>> It is very interesting talk and have one striking point that we did not
>> think about before: handle late arrivals as a "correction" to the earlier
>> results. Hence, if we follow that model, the late arrival problem that you
>> described can be addressed in the following:
>>
>> a) Each window will have a closing policy: it would either be wall-clock
>> based timeout, or the arrival of messages indicating that we have received
>> all messages in the corresponding event time window
>>
>
> Given that the closing policy is not explicit in the query, how we are
> going to handle this. Is this policy going to be specific to a query or
> system wide thing. I think I was not clear about this in the previous mail.
>
>
>> b) Each window also keeps all the past messages it receives in the past
>> windows, up to a large retention size that covers all possible late
>> arrivals
>>
>
> Are we going to keep this in local storage. Is this (keeping past
> messages) really necessary in case of monotonic queries. May be you meant
> to say we just keep metadata about offsets. So we can replay from Kafka (I
> don't have that much experience with Kafka, but I think we can start
> consuming from random offsets).
>
>
>> c) When a window's closing policy is satisfied, the window operator always
>> emits the current window results
>>
>
> Does this means we are waiting for the window to be closed, before sending
> new messages downstream? This may have performance implications, but this
> will make it easy to implement the query processing. I think current
> operator layer can support this style without any changes.
>
>
>> d) When a late arrival message came, the window operator will re-emit the
>> past window results to correct the previous window results
>>
>>
> It would be better if we can do incremental updates without replaying the
> whole window. But I believe there are advantages of this approach.
>
>
>> In your example, the aggregation for the counter for window from
>> 10:00-10:59 will have a "wrong" value when the window is closed by an
>> arrival of message w/ 11:00 timestamp, but will be corrected later by a
>> late arrival of another message in the time window from 10:00-10:59. I.e.
>> if we keep all the previous window states, late arrival messages will
>> simply trigger a re-computation of the aggregated counter for the window
>> 10:00-10:59 and overwrite the previous result. In this model, the final
>> result is always correct, as long as the late arrivals is within the large
>> retention size.
>>
>> I have been thinking of this model and had a discussion with Julian
>> yesterday. It seems that the followings are more reasonable to me:
>> 1) Window operator will have a full buffered state of the stream similar
>> to
>> a time-varying materialized view over the retention size
>> 2) Window size and termination (i.e. sliding/tumbling/hopping windows)
>> will
>> now determine when we emit window results (i.e. new messages/updates to
>> the
>> current window) to the downstream operator s.t. the operators can
>> calculate
>> result in time
>> 3) Late arrivals will be sent to the downstream operator and triggers a
>> re-computation of the past result based on the full buffered state
>>
>> In the above model, the window operator becomes a system feature, or an
>> implementation of "StreamScan" in Calcite's term. And we do not need
>> specific language support for the window semantics, with a default time
>> window operator implementation that serves as a "StreamScan".  All window
>> definition in the query language now only dictates the semantic meaning of
>> aggregation and join on top of the physical window operator which
>> provides:
>> a) a varying/growing materialized view; b) a driver that tells the
>> aggregation/join to compute/re-compute results on-top-of the materialized
>> view.
>>
>>
>>
> I will think more about this model and may have more questions about this
> in future :).
>
> Thanks
> Milinda
>
>
>> On Wed, Mar 4, 2015 at 10:28 AM, Milinda Pathirage <mpathira@umail.iu.edu
>> >
>> wrote:
>>
>> > Hi Julian,
>> >
>> > I went through the draft and it covers most of our requirements. But
>> > aggregation over a window will not be as simple as mentioned in the
>> draft.
>> >
>> > In the stream extension draft we have following:
>> >
>> > 'How did Calcite know that the 10:00:00 sub-totals were complete at
>> > > 11:00:00, so that it could emit them? It knows that rowtime is
>> > increasing,
>> > > and it knows that FLOOR(rowtime TO HOUR) is also increasing. So, once
>> it
>> > > has seen a row at or after 11:00:00, it will never see a row that will
>> > > contribute to a 10:00:00 total.'
>> >
>> >
>> > When there are delays, we can't do above. Because observing a row with
>> > rowtime greater than 11:00:00 doesn't mean events from 10:00:00 to
>> 10:00:59
>> > time window will not arrive after this observation. We have discussed
>> this
>> > in https://issues.apache.org/jira/browse/SAMZA-552. Even if we consider
>> > the
>> > 'system time/stream time' as mentioned in SAMZA-552, it doesn't
>> guarantee
>> > the absence of delays in a distributed setting. So we may need to
>> > additional hints/extensions to specify extra information required to
>> handle
>> > complexities in window calculations.
>> >
>> > May be there are ways to handle this at Samza level, not in the query
>> > language.
>> >
>> > @Chirs, @Yi
>> > I got the query planner working with some dummy operators and re-writing
>> > the query to add default window operators. But Julian's comments about
>> > handling defaults and optimizing the query plan (moving the Delta down
>> and
>> > removing both Delta and Chi) got me into thinking whether enforcing CQL
>> > semantics as we have in our current operator layer limits the
>> flexibility
>> > and increase the complexity of query plan to operator router generation.
>> > Anyway, I am going to take a step back and think more about Julian's
>> > comments. I'll put my thoughts into a design document for query planner.
>> >
>> > Thanks
>> > Milinda
>> >
>> >
>> > On Tue, Mar 3, 2015 at 3:40 PM, Julian Hyde <ju...@hydromatic.net>
>> wrote:
>> >
>> > > Sorry to show up late to this party. I've had my head down writing a
>> > > description of streaming SQL which I hoped would answer questions like
>> > > this. Here is the latest draft:
>> > >
>> https://github.com/julianhyde/incubator-calcite/blob/chi/doc/STREAM.md
>> > >
>> > > I've been avoiding windows for now. They are not needed for simple
>> > queries
>> > > (project, filter, windowed aggregate) and I wanted to write the
>> > > specification of more complex queries before I introduce them.
>> > >
>> > > Let's look at a simple query, filter. According to CQL, to evaluate
>> > >
>> > >   select stream *
>> > >   from orders
>> > >   where productId = 10    (query 1)
>> > >
>> > > you need to convert orders to a relation over a particular window,
>> apply
>> > > the filter, then convert back to a stream. We could write
>> > >
>> > >   select stream *
>> > >   from orders over (order by rowtime range between unbounded preceding
>> > and
>> > > current row)
>> > >   where productId = 10    (query 2)
>> > >
>> > > or we could write
>> > >
>> > >   select stream *
>> > >   from orders over (order by rowtime range between current row and
>> > current
>> > > row)
>> > >   where productId = 10      (query 3)
>> > >
>> > > Very different windows, but they produce the same result, because of
>> the
>> > > stateless nature of Filter. So, let's suppose that the default window
>> is
>> > > the one I gave first, "(order by rowtime range between unbounded
>> > preceding
>> > > and current row)", and so query 1 is just short-hand for query 2.
>> > >
>> > > I currently translate query 1 to
>> > >
>> > > Delta
>> > >   Filter($1 = 10)
>> > >     Scan(orders)
>> > >
>> > > but I should really be translating to
>> > >
>> > > Delta
>> > >   Filter($1 = 10)
>> > >     Chi(order by $0 range between unbounded preceding and current row)
>> > >       Scan(orders)
>> > >
>> > > Delta is the "differentiation" operator and Chi is the "integration"
>> > > operator. After we apply rules to push the Delta through the Filter,
>> the
>> > > Delta and Chi will collide and cancel each other out.
>> > >
>> > > Why have I not yet introduced the Chi operator? Because I have not yet
>> > > dealt with a query where it makes any difference.
>> > >
>> > > Where it will make a difference is joins. But even for joins, I hold
>> out
>> > > hope that we can avoid explicit windows, most of the time. One could
>> > write
>> > >
>> > >   select stream *
>> > >   from orders over (order by rowtime range between current row and
>> > > interval '1' hour following)
>> > >   join shipments
>> > >   on orders.orderId = shipments.orderId    (query 4)
>> > >
>> > > but I think most people would find the following clearer:
>> > >
>> > >   select stream *
>> > >   from orders
>> > >   join shipments
>> > >   on orders.orderId = shipments.orderId          (query 5)
>> > >   and shipments.rowtime between orders.rowtime and orders.rowtime +
>> > > interval '1' hour
>> > >
>> > > Under the covers there are still the implicit windows:
>> > >
>> > >   select stream *
>> > >   from orders over (order by rowtime range between unbounded preceding
>> > and
>> > > current row)
>> > >   join shipments over (order by rowtime range between unbounded
>> preceding
>> > > and current row)
>> > >   on orders.orderId = shipments.orderId          (query 6)
>> > >   and shipments.rowtime between orders.rowtime and orders.rowtime +
>> > > interval '1' hour
>> > >
>> > > Query 6 is equivalent to query 5. But the system can notice the join
>> > > condition involving the two streams' rowtimes and trim down the
>> windows
>> > > (one window to an hour, another window to just the current row)
>> without
>> > > changing semantics:
>> > >
>> > >   select stream *
>> > >   from orders over (order by rowtime range between interval '1' hour
>> > > preceding and current row)
>> > >   join shipments over (order by rowtime range between current row and
>> > > current row)
>> > >   on orders.orderId = shipments.orderId          (query 7)
>> > >   and shipments.rowtime between orders.rowtime and orders.rowtime +
>> > > interval '1' hour
>> > >
>> > > So, my hope is that end-users will rarely need to use an explicit
>> window.
>> > >
>> > > In the algebra, we will start introducing Chi. It will evaporate for
>> > > simple queries such as Filter. It will remain for more complex queries
>> > such
>> > > as stream-to-stream join, because you are joining the current row of
>> one
>> > > stream to a time-varying relation based on the other, and Chi
>> represents
>> > > that "recent history of a stream" relation.
>> > >
>> > > Julian
>> > >
>> > >
>> > > > On Mar 2, 2015, at 11:42 AM, Milinda Pathirage <
>> mpathira@umail.iu.edu>
>> > > wrote:
>> > > >
>> > > > Hi Yi,
>> > > >
>> > > > As I understand rules and re-writes basically do the same thing
>> > > > (changing/re-writing the operator tree). But in case of rules this
>> > > happens
>> > > > during planning based on the query planner configuration. And
>> > re-writing
>> > > is
>> > > > done on the planner output, after the query goes through the
>> planner.
>> > In
>> > > > Calcite re-write is happening inside the interpreter and in our
>> case it
>> > > > will be inside the query plan to operator router conversion phase.
>> > > >
>> > > > Thanks
>> > > > Milinda
>> > > >
>> > > > On Mon, Mar 2, 2015 at 2:31 PM, Yi Pan <ni...@gmail.com> wrote:
>> > > >
>> > > >> Hi, Milinda,
>> > > >>
>> > > >> +1 on your default window idea. One question: what's the difference
>> > > between
>> > > >> a rule and a re-write?
>> > > >>
>> > > >> Thanks!
>> > > >>
>> > > >> On Mon, Mar 2, 2015 at 7:14 AM, Milinda Pathirage <
>> > > mpathira@umail.iu.edu>
>> > > >> wrote:
>> > > >>
>> > > >>> @Chris
>> > > >>> Yes, I was referring to that mail. Actually I was wrong about the
>> > ‘Now’
>> > > >>> window, it should be a ‘Unbounded’ window for most the default
>> > > scenarios
>> > > >>> (Section 6.4 of
>> https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf
>> > ).
>> > > >>> Because
>> > > >>> applying a ‘Now’ window with size of 1 will double the number of
>> > events
>> > > >>> generated if we consider insert/delete streams. But ‘Unbounded’
>> will
>> > > only
>> > > >>> generate insert events.
>> > > >>>
>> > > >>> @Yi
>> > > >>> 1. You are correct about Calcite.There is no stream-to-relation
>> > > >> conversion
>> > > >>> happening. But as I understand we don’t need Calcite to support
>> this.
>> > > We
>> > > >>> can add it to our query planner as a rule or re-write. What I am
>> not
>> > > sure
>> > > >>> is whether to use a rule or a re-write.
>> > > >>> 2. There is a rule in Calcite which extract the Window out from
>> the
>> > > >>> Project. But I am not sure why that didn’t happen in my test. This
>> > rule
>> > > >> is
>> > > >>> added to the planner by default. I’ll ask about this in Calcite
>> > mailing
>> > > >>> list.
>> > > >>>
>> > > >>> I think we can figure out a way to move the window to the input
>> > stream
>> > > if
>> > > >>> Calcite can move the window out from Project. I’ll see how we can
>> do
>> > > >> this.
>> > > >>>
>> > > >>> Also I’ll go ahead and implement default windows. We can change it
>> > > later
>> > > >> if
>> > > >>> Julian or someone from Calcite comes up with a better suggestion.
>> > > >>>
>> > > >>> Thanks
>> > > >>> Milinda
>> > > >>>
>> > > >>> On Sun, Mar 1, 2015 at 8:23 PM, Yi Pan <ni...@gmail.com>
>> wrote:
>> > > >>>
>> > > >>>> Hi, Milinda,
>> > > >>>>
>> > > >>>> Sorry to reply late on this. Here are some of my comments:
>> > > >>>> 1) In Calcite's model, it seems that there is no
>> stream-to-relation
>> > > >>>> conversion step. In the first example where the window
>> specification
>> > > is
>> > > >>>> missing, I like your solution to add the default LogicalNowWindow
>> > > >>> operator
>> > > >>>> s.t. it makes the physical operator matches the query plan.
>> However,
>> > > if
>> > > >>>> Calcite community does not agree to add the default
>> > LogicalNowWindow,
>> > > >> it
>> > > >>>> would be fine for us if we always insert a default "now" window
>> on a
>> > > >>> stream
>> > > >>>> when we generate the Samza configuration.
>> > > >>>> 2) I am more concerned on the other cases, where window operator
>> is
>> > > >> used
>> > > >>> in
>> > > >>>> aggregation and join. In your example of windowed aggregation in
>> > > >> Calcite,
>> > > >>>> window spec seems to be a decoration to the LogicalProject
>> operator,
>> > > >>>> instead of defining a data source to the LogicalProject
>> operator. In
>> > > >> the
>> > > >>>> CQL model we followed, the window operator is considered as a
>> query
>> > > >>>> primitive that generate a data source for other relation
>> operators
>> > to
>> > > >>>> consume. How exactly is window operator used in Calcite planner?
>> > Isn't
>> > > >> it
>> > > >>>> much clear if the following is used?
>> > > >>>> LogicalProject(EXPR$0=[CASE(>(COUNT($2), 0),
>> > CAST($SUM0($2)):INTEGER,
>> > > >>>> null)])
>> > > >>>>   LogicalWindow(ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
>> > > >>>>
>> > > >>>> On Thu, Feb 26, 2015 at 12:18 PM, Milinda Pathirage <
>> > > >>> mpathira@umail.iu.edu
>> > > >>>>>
>> > > >>>> wrote:
>> > > >>>>
>> > > >>>>> Hi devs,
>> > > >>>>>
>> > > >>>>> I ask about $subject in calcite-dev. You can find the archived
>> > > >>> discussion
>> > > >>>>> at [1]. I think your thoughts are also valuable in this
>> discussion
>> > in
>> > > >>>>> calcite list.
>> > > >>>>>
>> > > >>>>> I discovered the requirement for a default window operator when
>> I
>> > > >> tried
>> > > >>>> to
>> > > >>>>> integrate streamscan (I was using tablescan prevously) into the
>> > > >>> physical
>> > > >>>>> plan generation logic. Because of the way we have written the
>> > > >>>>> OperatorRouter API, we always need a stream-to-relation
>> operator at
>> > > >> the
>> > > >>>>> input. But Calcite generates a query plan like following:
>> > > >>>>>
>> > > >>>>> LogicalDelta
>> > > >>>>>  LogicalProject(id=[$0], product=[$1], quantity=[$2])
>> > > >>>>>    LogicalFilter(condition=[>($2, 5)])
>> > > >>>>>
>> > > >>>>>      StreamScan(table=[[KAFKA, ORDERS]], fields=[[0, 1, 2]])
>> > > >>>>>
>> > > >>>>> If we consider LogicalFilter as a relation operator, we need
>> > > >> something
>> > > >>> to
>> > > >>>>> convert input stream to a relation before sending the tuples
>> > > >>> downstream.
>> > > >>>>> In addition to this, there is a optimization where we consider
>> > filter
>> > > >>>>> operator as a tuple operator and have it between StreamScan and
>> > > >>>>> stream-to-relation operator as a way of reducing the amount of
>> > > >> messages
>> > > >>>>> going downstream.
>> > > >>>>>
>> > > >>>>> Other scenario is windowed aggregates. Currently window spec is
>> > > >>> attached
>> > > >>>> to
>> > > >>>>> the LogicalProject in query plan like following:
>> > > >>>>>
>> > > >>>>> LogicalProject(EXPR$0=[CASE(>(COUNT($2) OVER (ROWS BETWEEN 2
>> > > >> PRECEDING
>> > > >>>> AND
>> > > >>>>> 2 FOLLOWING), 0), CAST($SUM0($2) OVER (ROWS BETWEEN 2 PRECEDING
>> > AND 2
>> > > >>>>> FOLLOWING)):INTEGER, null)])
>> > > >>>>>
>> > > >>>>> I wanted to know from them whether it is possible to move window
>> > > >>>> operation
>> > > >>>>> just after the stream scan, so that it is compatible with our
>> > > >> operator
>> > > >>>>> layer.
>> > > >>>>> May be there are better or easier ways to do this. So your
>> comments
>> > > >> are
>> > > >>>>> always welcome.
>> > > >>>>>
>> > > >>>>> Thanks
>> > > >>>>> Milinda
>> > > >>>>>
>> > > >>>>>
>> > > >>>>> [1]
>> > > >>>>>
>> > > >>>>>
>> > > >>>>
>> > > >>>
>> > > >>
>> > >
>> >
>> http://mail-archives.apache.org/mod_mbox/incubator-calcite-dev/201502.mbox/browser
>> > > >>>>>
>> > > >>>>> --
>> > > >>>>> Milinda Pathirage
>> > > >>>>>
>> > > >>>>> PhD Student | Research Assistant
>> > > >>>>> School of Informatics and Computing | Data to Insight Center
>> > > >>>>> Indiana University
>> > > >>>>>
>> > > >>>>> twitter: milindalakmal
>> > > >>>>> skype: milinda.pathirage
>> > > >>>>> blog: http://milinda.pathirage.org
>> > > >>>>>
>> > > >>>>
>> > > >>>
>> > > >>>
>> > > >>>
>> > > >>> --
>> > > >>> Milinda Pathirage
>> > > >>>
>> > > >>> PhD Student | Research Assistant
>> > > >>> School of Informatics and Computing | Data to Insight Center
>> > > >>> Indiana University
>> > > >>>
>> > > >>> twitter: milindalakmal
>> > > >>> skype: milinda.pathirage
>> > > >>> blog: http://milinda.pathirage.org
>> > > >>>
>> > > >>
>> > > >
>> > > >
>> > > >
>> > > > --
>> > > > Milinda Pathirage
>> > > >
>> > > > PhD Student | Research Assistant
>> > > > School of Informatics and Computing | Data to Insight Center
>> > > > Indiana University
>> > > >
>> > > > twitter: milindalakmal
>> > > > skype: milinda.pathirage
>> > > > blog: http://milinda.pathirage.org
>> > >
>> > >
>> >
>> >
>> > --
>> > Milinda Pathirage
>> >
>> > PhD Student | Research Assistant
>> > School of Informatics and Computing | Data to Insight Center
>> > Indiana University
>> >
>> > twitter: milindalakmal
>> > skype: milinda.pathirage
>> > blog: http://milinda.pathirage.org
>> >
>>
>
>
>
> --
> Milinda Pathirage
>
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
>
> twitter: milindalakmal
> skype: milinda.pathirage
> blog: http://milinda.pathirage.org
>



-- 
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org

Re: Handling defaults and windowed aggregates in stream queries

Posted by Milinda Pathirage <mp...@umail.iu.edu>.
Hi Yi,

Please find my comments inline.

On Thu, Mar 5, 2015 at 1:18 PM, Yi Pan <ni...@gmail.com> wrote:

> Hi, Milinda,
>
> We have recently some discussions on the MillWheel model:
> http://www.infoq.com/presentations/millwheel.


Yes, that is a very interesting talk. I asked the question about the language
just after watching it. I was under the impression that we need to specify
these details (handling delays) explicitly in the query.


> It is very interesting talk and have one striking point that we did not
> think about before: handle late arrivals as a "correction" to the earlier
> results. Hence, if we follow that model, the late arrival problem that you
> described can be addressed in the following:
>
> a) Each window will have a closing policy: it would either be wall-clock
> based timeout, or the arrival of messages indicating that we have received
> all messages in the corresponding event time window
>

Given that the closing policy is not explicit in the query, how are we going
to handle it? Is this policy going to be specific to a query, or a system-wide
setting? I think I was not clear about this in the previous mail.
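
One way I could imagine framing it (purely a sketch with invented names, not
an existing Samza or Calcite API) is a small policy interface with a
system-wide default implementation that an individual query could override:

/** Hypothetical policy deciding when a window stops accepting on-time input. */
interface WindowClosingPolicy {
  /** Called for every message routed to the operator; true means "close the window now". */
  boolean shouldClose(long windowEndTime, long messageEventTime, long wallClockTime);
}

/** Possible system-wide default: close once wall-clock time passes the window end by a grace period. */
class WallClockTimeoutPolicy implements WindowClosingPolicy {
  private final long graceMs;
  WallClockTimeoutPolicy(long graceMs) { this.graceMs = graceMs; }
  public boolean shouldClose(long windowEndTime, long messageEventTime, long wallClockTime) {
    return wallClockTime >= windowEndTime + graceMs;
  }
}

/** Possible per-query override: close as soon as a message at or past the window end is seen. */
class PunctuationPolicy implements WindowClosingPolicy {
  public boolean shouldClose(long windowEndTime, long messageEventTime, long wallClockTime) {
    return messageEventTime >= windowEndTime;
  }
}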


> b) Each window also keeps all the past messages it receives in the past
> windows, up to a large retention size that covers all possible late
> arrivals
>

Are we going to keep this in local storage? Is keeping the past messages
really necessary in the case of monotonic queries? Maybe you meant that we
just keep metadata about offsets, so we can replay from Kafka (I don't have
that much experience with Kafka, but I think we can start consuming from
arbitrary offsets).
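
As a rough illustration of the trade-off (all names invented, assuming only
that the consumer can start reading from a chosen offset): per window we
could either buffer the raw messages locally, or keep only offset metadata
and re-read the input when a recomputation is needed.

import java.util.List;
import java.util.function.Consumer;

/** Hypothetical hook that re-reads messages between two offsets from the input stream. */
interface StreamReplayer<M> {
  void replay(long startOffset, long endOffsetInclusive, Consumer<M> sink);
}

/** Option 1: buffer the messages themselves; simple, but local storage grows with retention. */
class BufferedWindow<M> {
  final List<M> messages;
  BufferedWindow(List<M> messages) { this.messages = messages; }
  void recompute(Consumer<M> aggregator) { messages.forEach(aggregator); }
}

/** Option 2: keep only offsets and replay on demand; small state, but recomputation re-reads the log. */
class OffsetOnlyWindow<M> {
  final long startOffset;
  final long endOffset;
  final StreamReplayer<M> replayer;
  OffsetOnlyWindow(long startOffset, long endOffset, StreamReplayer<M> replayer) {
    this.startOffset = startOffset;
    this.endOffset = endOffset;
    this.replayer = replayer;
  }
  void recompute(Consumer<M> aggregator) { replayer.replay(startOffset, endOffset, aggregator); }
}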


> c) When a window's closing policy is satisfied, the window operator always
> emits the current window results
>

Does this mean we wait for the window to be closed before sending new
messages downstream? This may have performance implications, but it will make
the query processing easier to implement. I think the current operator layer
can support this style without any changes.


> d) When a late arrival message came, the window operator will re-emit the
> past window results to correct the previous window results
>
>
It would be better if we could do incremental updates without replaying the
whole window, but I do see the advantages of this approach.
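
For a distributive aggregate like SUM or COUNT, "incremental" could be as
simple as folding the late value into the stored result and re-emitting it;
only holistic aggregates would need the full buffer. A sketch with invented
names:

import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-window running sum that absorbs late arrivals incrementally. */
class IncrementalSum {
  // window start (e.g. the hour bucket) -> running sum emitted so far
  private final Map<Long, Long> sums = new HashMap<>();

  /** Returns the corrected sum for the message's window; the caller re-emits it downstream. */
  long onMessage(long windowStart, long value) {
    // The update is O(1) whether the message is on time or late: no replay of the
    // window's messages is needed. Non-invertible aggregates (e.g. MEDIAN) would
    // still need the buffered window, which is where full recomputation wins.
    return sums.merge(windowStart, value, Long::sum);
  }
}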


> In your example, the aggregation for the counter for window from
> 10:00-10:59 will have a "wrong" value when the window is closed by an
> arrival of message w/ 11:00 timestamp, but will be corrected later by a
> late arrival of another message in the time window from 10:00-10:59. I.e.
> if we keep all the previous window states, late arrival messages will
> simply trigger a re-computation of the aggregated counter for the window
> 10:00-10:59 and overwrite the previous result. In this model, the final
> result is always correct, as long as the late arrivals is within the large
> retention size.
>
> I have been thinking of this model and had a discussion with Julian
> yesterday. It seems that the followings are more reasonable to me:
> 1) Window operator will have a full buffered state of the stream similar to
> a time-varying materialized view over the retention size
> 2) Window size and termination (i.e. sliding/tumbling/hopping windows) will
> now determine when we emit window results (i.e. new messages/updates to the
> current window) to the downstream operator s.t. the operators can calculate
> result in time
> 3) Late arrivals will be sent to the downstream operator and triggers a
> re-computation of the past result based on the full buffered state
>
> In the above model, the window operator becomes a system feature, or an
> implementation of "StreamScan" in Calcite's term. And we do not need
> specific language support for the window semantics, with a default time
> window operator implementation that serves as a "StreamScan".  All window
> definition in the query language now only dictates the semantic meaning of
> aggregation and join on top of the physical window operator which provides:
> a) a varying/growing materialized view; b) a driver that tells the
> aggregation/join to compute/re-compute results on-top-of the materialized
> view.
>
>
>
I will think more about this model and may have more questions about it in
the future :).

Thanks
Milinda


> On Wed, Mar 4, 2015 at 10:28 AM, Milinda Pathirage <mp...@umail.iu.edu>
> wrote:
>
> > Hi Julian,
> >
> > I went through the draft and it covers most of our requirements. But
> > aggregation over a window will not be as simple as mentioned in the
> draft.
> >
> > In the stream extension draft we have following:
> >
> > 'How did Calcite know that the 10:00:00 sub-totals were complete at
> > > 11:00:00, so that it could emit them? It knows that rowtime is
> > increasing,
> > > and it knows that FLOOR(rowtime TO HOUR) is also increasing. So, once
> it
> > > has seen a row at or after 11:00:00, it will never see a row that will
> > > contribute to a 10:00:00 total.'
> >
> >
> > When there are delays, we can't do above. Because observing a row with
> > rowtime greater than 11:00:00 doesn't mean events from 10:00:00 to
> 10:00:59
> > time window will not arrive after this observation. We have discussed
> this
> > in https://issues.apache.org/jira/browse/SAMZA-552. Even if we consider
> > the
> > 'system time/stream time' as mentioned in SAMZA-552, it doesn't guarantee
> > the absence of delays in a distributed setting. So we may need to
> > additional hints/extensions to specify extra information required to
> handle
> > complexities in window calculations.
> >
> > May be there are ways to handle this at Samza level, not in the query
> > language.
> >
> > @Chirs, @Yi
> > I got the query planner working with some dummy operators and re-writing
> > the query to add default window operators. But Julian's comments about
> > handling defaults and optimizing the query plan (moving the Delta down
> and
> > removing both Delta and Chi) got me into thinking whether enforcing CQL
> > semantics as we have in our current operator layer limits the flexibility
> > and increase the complexity of query plan to operator router generation.
> > Anyway, I am going to take a step back and think more about Julian's
> > comments. I'll put my thoughts into a design document for query planner.
> >
> > Thanks
> > Milinda
> >
> >
> > On Tue, Mar 3, 2015 at 3:40 PM, Julian Hyde <ju...@hydromatic.net>
> wrote:
> >
> > > Sorry to show up late to this party. I've had my head down writing a
> > > description of streaming SQL which I hoped would answer questions like
> > > this. Here is the latest draft:
> > > https://github.com/julianhyde/incubator-calcite/blob/chi/doc/STREAM.md
> > >
> > > I've been avoiding windows for now. They are not needed for simple
> > queries
> > > (project, filter, windowed aggregate) and I wanted to write the
> > > specification of more complex queries before I introduce them.
> > >
> > > Let's look at a simple query, filter. According to CQL, to evaluate
> > >
> > >   select stream *
> > >   from orders
> > >   where productId = 10    (query 1)
> > >
> > > you need to convert orders to a relation over a particular window,
> apply
> > > the filter, then convert back to a stream. We could write
> > >
> > >   select stream *
> > >   from orders over (order by rowtime range between unbounded preceding
> > and
> > > current row)
> > >   where productId = 10    (query 2)
> > >
> > > or we could write
> > >
> > >   select stream *
> > >   from orders over (order by rowtime range between current row and
> > current
> > > row)
> > >   where productId = 10      (query 3)
> > >
> > > Very different windows, but they produce the same result, because of
> the
> > > stateless nature of Filter. So, let's suppose that the default window
> is
> > > the one I gave first, "(order by rowtime range between unbounded
> > preceding
> > > and current row)", and so query 1 is just short-hand for query 2.
> > >
> > > I currently translate query 1 to
> > >
> > > Delta
> > >   Filter($1 = 10)
> > >     Scan(orders)
> > >
> > > but I should really be translating to
> > >
> > > Delta
> > >   Filter($1 = 10)
> > >     Chi(order by $0 range between unbounded preceding and current row)
> > >       Scan(orders)
> > >
> > > Delta is the "differentiation" operator and Chi is the "integration"
> > > operator. After we apply rules to push the Delta through the Filter,
> the
> > > Delta and Chi will collide and cancel each other out.
> > >
> > > Why have I not yet introduced the Chi operator? Because I have not yet
> > > dealt with a query where it makes any difference.
> > >
> > > Where it will make a difference is joins. But even for joins, I hold
> out
> > > hope that we can avoid explicit windows, most of the time. One could
> > write
> > >
> > >   select stream *
> > >   from orders over (order by rowtime range between current row and
> > > interval '1' hour following)
> > >   join shipments
> > >   on orders.orderId = shipments.orderId    (query 4)
> > >
> > > but I think most people would find the following clearer:
> > >
> > >   select stream *
> > >   from orders
> > >   join shipments
> > >   on orders.orderId = shipments.orderId          (query 5)
> > >   and shipments.rowtime between orders.rowtime and orders.rowtime +
> > > interval '1' hour
> > >
> > > Under the covers there are still the implicit windows:
> > >
> > >   select stream *
> > >   from orders over (order by rowtime range between unbounded preceding
> > and
> > > current row)
> > >   join shipments over (order by rowtime range between unbounded
> preceding
> > > and current row)
> > >   on orders.orderId = shipments.orderId          (query 6)
> > >   and shipments.rowtime between orders.rowtime and orders.rowtime +
> > > interval '1' hour
> > >
> > > Query 6 is equivalent to query 5. But the system can notice the join
> > > condition involving the two streams' rowtimes and trim down the windows
> > > (one window to an hour, another window to just the current row) without
> > > changing semantics:
> > >
> > >   select stream *
> > >   from orders over (order by rowtime range between interval '1' hour
> > > preceding and current row)
> > >   join shipments over (order by rowtime range between current row and
> > > current row)
> > >   on orders.orderId = shipments.orderId          (query 7)
> > >   and shipments.rowtime between orders.rowtime and orders.rowtime +
> > > interval '1' hour
> > >
> > > So, my hope is that end-users will rarely need to use an explicit
> window.
> > >
> > > In the algebra, we will start introducing Chi. It will evaporate for
> > > simple queries such as Filter. It will remain for more complex queries
> > such
> > > as stream-to-stream join, because you are joining the current row of
> one
> > > stream to a time-varying relation based on the other, and Chi
> represents
> > > that "recent history of a stream" relation.
> > >
> > > Julian
> > >
> > >
> > > > On Mar 2, 2015, at 11:42 AM, Milinda Pathirage <
> mpathira@umail.iu.edu>
> > > wrote:
> > > >
> > > > Hi Yi,
> > > >
> > > > As I understand rules and re-writes basically do the same thing
> > > > (changing/re-writing the operator tree). But in case of rules this
> > > happens
> > > > during planning based on the query planner configuration. And
> > re-writing
> > > is
> > > > done on the planner output, after the query goes through the planner.
> > In
> > > > Calcite re-write is happening inside the interpreter and in our case
> it
> > > > will be inside the query plan to operator router conversion phase.
> > > >
> > > > Thanks
> > > > Milinda
> > > >
> > > > On Mon, Mar 2, 2015 at 2:31 PM, Yi Pan <ni...@gmail.com> wrote:
> > > >
> > > >> Hi, Milinda,
> > > >>
> > > >> +1 on your default window idea. One question: what's the difference
> > > between
> > > >> a rule and a re-write?
> > > >>
> > > >> Thanks!
> > > >>
> > > >> On Mon, Mar 2, 2015 at 7:14 AM, Milinda Pathirage <
> > > mpathira@umail.iu.edu>
> > > >> wrote:
> > > >>
> > > >>> @Chris
> > > >>> Yes, I was referring to that mail. Actually I was wrong about the
> > ‘Now’
> > > >>> window, it should be a ‘Unbounded’ window for most the default
> > > scenarios
> > > >>> (Section 6.4 of
> https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf
> > ).
> > > >>> Because
> > > >>> applying a ‘Now’ window with size of 1 will double the number of
> > events
> > > >>> generated if we consider insert/delete streams. But ‘Unbounded’
> will
> > > only
> > > >>> generate insert events.
> > > >>>
> > > >>> @Yi
> > > >>> 1. You are correct about Calcite.There is no stream-to-relation
> > > >> conversion
> > > >>> happening. But as I understand we don’t need Calcite to support
> this.
> > > We
> > > >>> can add it to our query planner as a rule or re-write. What I am
> not
> > > sure
> > > >>> is whether to use a rule or a re-write.
> > > >>> 2. There is a rule in Calcite which extract the Window out from the
> > > >>> Project. But I am not sure why that didn’t happen in my test. This
> > rule
> > > >> is
> > > >>> added to the planner by default. I’ll ask about this in Calcite
> > mailing
> > > >>> list.
> > > >>>
> > > >>> I think we can figure out a way to move the window to the input
> > stream
> > > if
> > > >>> Calcite can move the window out from Project. I’ll see how we can
> do
> > > >> this.
> > > >>>
> > > >>> Also I’ll go ahead and implement default windows. We can change it
> > > later
> > > >> if
> > > >>> Julian or someone from Calcite comes up with a better suggestion.
> > > >>>
> > > >>> Thanks
> > > >>> Milinda
> > > >>>
> > > >>> On Sun, Mar 1, 2015 at 8:23 PM, Yi Pan <ni...@gmail.com>
> wrote:
> > > >>>
> > > >>>> Hi, Milinda,
> > > >>>>
> > > >>>> Sorry to reply late on this. Here are some of my comments:
> > > >>>> 1) In Calcite's model, it seems that there is no
> stream-to-relation
> > > >>>> conversion step. In the first example where the window
> specification
> > > is
> > > >>>> missing, I like your solution to add the default LogicalNowWindow
> > > >>> operator
> > > >>>> s.t. it makes the physical operator matches the query plan.
> However,
> > > if
> > > >>>> Calcite community does not agree to add the default
> > LogicalNowWindow,
> > > >> it
> > > >>>> would be fine for us if we always insert a default "now" window
> on a
> > > >>> stream
> > > >>>> when we generate the Samza configuration.
> > > >>>> 2) I am more concerned on the other cases, where window operator
> is
> > > >> used
> > > >>> in
> > > >>>> aggregation and join. In your example of windowed aggregation in
> > > >> Calcite,
> > > >>>> window spec seems to be a decoration to the LogicalProject
> operator,
> > > >>>> instead of defining a data source to the LogicalProject operator.
> In
> > > >> the
> > > >>>> CQL model we followed, the window operator is considered as a
> query
> > > >>>> primitive that generate a data source for other relation operators
> > to
> > > >>>> consume. How exactly is window operator used in Calcite planner?
> > Isn't
> > > >> it
> > > >>>> much clear if the following is used?
> > > >>>> LogicalProject(EXPR$0=[CASE(>(COUNT($2), 0),
> > CAST($SUM0($2)):INTEGER,
> > > >>>> null)])
> > > >>>>   LogicalWindow(ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
> > > >>>>
> > > >>>> On Thu, Feb 26, 2015 at 12:18 PM, Milinda Pathirage <
> > > >>> mpathira@umail.iu.edu
> > > >>>>>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Hi devs,
> > > >>>>>
> > > >>>>> I ask about $subject in calcite-dev. You can find the archived
> > > >>> discussion
> > > >>>>> at [1]. I think your thoughts are also valuable in this
> discussion
> > in
> > > >>>>> calcite list.
> > > >>>>>
> > > >>>>> I discovered the requirement for a default window operator when I
> > > >> tried
> > > >>>> to
> > > >>>>> integrate streamscan (I was using tablescan prevously) into the
> > > >>> physical
> > > >>>>> plan generation logic. Because of the way we have written the
> > > >>>>> OperatorRouter API, we always need a stream-to-relation operator
> at
> > > >> the
> > > >>>>> input. But Calcite generates a query plan like following:
> > > >>>>>
> > > >>>>> LogicalDelta
> > > >>>>>  LogicalProject(id=[$0], product=[$1], quantity=[$2])
> > > >>>>>    LogicalFilter(condition=[>($2, 5)])
> > > >>>>>
> > > >>>>>      StreamScan(table=[[KAFKA, ORDERS]], fields=[[0, 1, 2]])
> > > >>>>>
> > > >>>>> If we consider LogicalFilter as a relation operator, we need
> > > >> something
> > > >>> to
> > > >>>>> convert input stream to a relation before sending the tuples
> > > >>> downstream.
> > > >>>>> In addition to this, there is a optimization where we consider
> > filter
> > > >>>>> operator as a tuple operator and have it between StreamScan and
> > > >>>>> stream-to-relation operator as a way of reducing the amount of
> > > >> messages
> > > >>>>> going downstream.
> > > >>>>>
> > > >>>>> Other scenario is windowed aggregates. Currently window spec is
> > > >>> attached
> > > >>>> to
> > > >>>>> the LogicalProject in query plan like following:
> > > >>>>>
> > > >>>>> LogicalProject(EXPR$0=[CASE(>(COUNT($2) OVER (ROWS BETWEEN 2
> > > >> PRECEDING
> > > >>>> AND
> > > >>>>> 2 FOLLOWING), 0), CAST($SUM0($2) OVER (ROWS BETWEEN 2 PRECEDING
> > AND 2
> > > >>>>> FOLLOWING)):INTEGER, null)])
> > > >>>>>
> > > >>>>> I wanted to know from them whether it is possible to move window
> > > >>>> operation
> > > >>>>> just after the stream scan, so that it is compatible with our
> > > >> operator
> > > >>>>> layer.
> > > >>>>> May be there are better or easier ways to do this. So your
> comments
> > > >> are
> > > >>>>> always welcome.
> > > >>>>>
> > > >>>>> Thanks
> > > >>>>> Milinda
> > > >>>>>
> > > >>>>>
> > > >>>>> [1]
> > > >>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> >
> http://mail-archives.apache.org/mod_mbox/incubator-calcite-dev/201502.mbox/browser
> > > >>>>>
> > > >>>>> --
> > > >>>>> Milinda Pathirage
> > > >>>>>
> > > >>>>> PhD Student | Research Assistant
> > > >>>>> School of Informatics and Computing | Data to Insight Center
> > > >>>>> Indiana University
> > > >>>>>
> > > >>>>> twitter: milindalakmal
> > > >>>>> skype: milinda.pathirage
> > > >>>>> blog: http://milinda.pathirage.org
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>>
> > > >>>
> > > >>> --
> > > >>> Milinda Pathirage
> > > >>>
> > > >>> PhD Student | Research Assistant
> > > >>> School of Informatics and Computing | Data to Insight Center
> > > >>> Indiana University
> > > >>>
> > > >>> twitter: milindalakmal
> > > >>> skype: milinda.pathirage
> > > >>> blog: http://milinda.pathirage.org
> > > >>>
> > > >>
> > > >
> > > >
> > > >
> > > > --
> > > > Milinda Pathirage
> > > >
> > > > PhD Student | Research Assistant
> > > > School of Informatics and Computing | Data to Insight Center
> > > > Indiana University
> > > >
> > > > twitter: milindalakmal
> > > > skype: milinda.pathirage
> > > > blog: http://milinda.pathirage.org
> > >
> > >
> >
> >
> > --
> > Milinda Pathirage
> >
> > PhD Student | Research Assistant
> > School of Informatics and Computing | Data to Insight Center
> > Indiana University
> >
> > twitter: milindalakmal
> > skype: milinda.pathirage
> > blog: http://milinda.pathirage.org
> >
>



-- 
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org

Re: Handling defaults and windowed aggregates in stream queries

Posted by Yi Pan <ni...@gmail.com>.
Hi, Milinda,

We recently had some discussions on the MillWheel model:
http://www.infoq.com/presentations/millwheel.

It is a very interesting talk, and it has one striking point that we had not
thought about before: handling late arrivals as a "correction" to the earlier
results. Hence, if we follow that model, the late-arrival problem that you
described can be addressed as follows:

a) Each window will have a closing policy: either a wall-clock-based timeout,
or the arrival of messages indicating that we have received all messages in
the corresponding event-time window
b) The window operator also keeps all the messages it received in past
windows, up to a large retention size that covers all possible late arrivals
c) When a window's closing policy is satisfied, the window operator always
emits the current window results
d) When a late-arrival message comes in, the window operator re-emits the
past window results to correct the previous window results (a rough sketch of
such an operator follows below)
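
For concreteness, here is a minimal sketch of an operator following (a)-(d).
All class and method names are invented for illustration; this is not Samza's
operator API, and the closing policy is simplified to "event time advanced
into a new window".

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical event-time tumbling window that emits on close and re-emits corrections. */
class CorrectingWindowOperator {
  interface Downstream {
    void emit(long windowStart, List<Message> contents, boolean correction);
  }
  static final class Message {
    final long eventTime;
    Message(long eventTime) { this.eventTime = eventTime; }
  }

  private final long windowSizeMs;                  // e.g. one hour
  private final long retentionMs;                   // (b) how long closed windows are kept
  private final Map<Long, List<Message>> windows = new HashMap<>();
  private final Downstream downstream;
  private long currentWindowStart = Long.MIN_VALUE; // latest window seen on time

  CorrectingWindowOperator(long windowSizeMs, long retentionMs, Downstream downstream) {
    this.windowSizeMs = windowSizeMs;
    this.retentionMs = retentionMs;
    this.downstream = downstream;
  }

  void onMessage(Message m) {
    long windowStart = m.eventTime - (m.eventTime % windowSizeMs);
    windows.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(m);

    if (windowStart < currentWindowStart) {
      // (d) late arrival: its window was already closed, so re-emit a corrected result.
      downstream.emit(windowStart, windows.get(windowStart), true);
    } else if (windowStart > currentWindowStart) {
      // (a)+(c) closing policy used here: event time advancing into a new window closes
      // the previous one (simplified to the immediately preceding window only).
      if (currentWindowStart != Long.MIN_VALUE && windows.containsKey(currentWindowStart)) {
        downstream.emit(currentWindowStart, windows.get(currentWindowStart), false);
      }
      currentWindowStart = windowStart;
    }

    // (b) drop windows that have fallen out of the retention period.
    long horizon = currentWindowStart - retentionMs;
    windows.keySet().removeIf(start -> start < horizon);
  }
}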

In your example, the aggregated counter for the window from 10:00-10:59 will
have a "wrong" value when the window is closed by the arrival of a message
with an 11:00 timestamp, but it will be corrected later by the late arrival
of another message belonging to the 10:00-10:59 time window. I.e., if we keep
all the previous window states, late-arrival messages will simply trigger a
re-computation of the aggregated counter for the window 10:00-10:59 and
overwrite the previous result. In this model, the final result is always
correct, as long as the late arrivals are within the large retention size.

I have been thinking about this model and had a discussion with Julian
yesterday. It seems that the following is more reasonable to me:
1) The window operator will have a fully buffered state of the stream,
similar to a time-varying materialized view over the retention size
2) The window size and termination (i.e. sliding/tumbling/hopping windows)
will now determine when we emit window results (i.e. new messages/updates to
the current window) to the downstream operator, s.t. the operators can
calculate results in time
3) Late arrivals will be sent to the downstream operator and trigger a
re-computation of the past result based on the fully buffered state

In the above model, the window operator becomes a system feature, or an
implementation of "StreamScan" in Calcite's terms, and we do not need
specific language support for the window semantics, given a default time
window operator implementation that serves as a "StreamScan". All window
definitions in the query language now only dictate the semantic meaning of
aggregation and join on top of the physical window operator, which provides:
a) a varying/growing materialized view; and b) a driver that tells the
aggregation/join to compute/re-compute results on top of the materialized
view.


On Wed, Mar 4, 2015 at 10:28 AM, Milinda Pathirage <mp...@umail.iu.edu>
wrote:

> Hi Julian,
>
> I went through the draft and it covers most of our requirements. But
> aggregation over a window will not be as simple as mentioned in the draft.
>
> In the stream extension draft we have following:
>
> 'How did Calcite know that the 10:00:00 sub-totals were complete at
> > 11:00:00, so that it could emit them? It knows that rowtime is
> increasing,
> > and it knows that FLOOR(rowtime TO HOUR) is also increasing. So, once it
> > has seen a row at or after 11:00:00, it will never see a row that will
> > contribute to a 10:00:00 total.'
>
>
> When there are delays, we can't do above. Because observing a row with
> rowtime greater than 11:00:00 doesn't mean events from 10:00:00 to 10:00:59
> time window will not arrive after this observation. We have discussed this
> in https://issues.apache.org/jira/browse/SAMZA-552. Even if we consider
> the
> 'system time/stream time' as mentioned in SAMZA-552, it doesn't guarantee
> the absence of delays in a distributed setting. So we may need to
> additional hints/extensions to specify extra information required to handle
> complexities in window calculations.
>
> May be there are ways to handle this at Samza level, not in the query
> language.
>
> @Chirs, @Yi
> I got the query planner working with some dummy operators and re-writing
> the query to add default window operators. But Julian's comments about
> handling defaults and optimizing the query plan (moving the Delta down and
> removing both Delta and Chi) got me into thinking whether enforcing CQL
> semantics as we have in our current operator layer limits the flexibility
> and increase the complexity of query plan to operator router generation.
> Anyway, I am going to take a step back and think more about Julian's
> comments. I'll put my thoughts into a design document for query planner.
>
> Thanks
> Milinda
>
>
> On Tue, Mar 3, 2015 at 3:40 PM, Julian Hyde <ju...@hydromatic.net> wrote:
>
> > Sorry to show up late to this party. I've had my head down writing a
> > description of streaming SQL which I hoped would answer questions like
> > this. Here is the latest draft:
> > https://github.com/julianhyde/incubator-calcite/blob/chi/doc/STREAM.md
> >
> > I've been avoiding windows for now. They are not needed for simple
> queries
> > (project, filter, windowed aggregate) and I wanted to write the
> > specification of more complex queries before I introduce them.
> >
> > Let's look at a simple query, filter. According to CQL, to evaluate
> >
> >   select stream *
> >   from orders
> >   where productId = 10    (query 1)
> >
> > you need to convert orders to a relation over a particular window, apply
> > the filter, then convert back to a stream. We could write
> >
> >   select stream *
> >   from orders over (order by rowtime range between unbounded preceding
> and
> > current row)
> >   where productId = 10    (query 2)
> >
> > or we could write
> >
> >   select stream *
> >   from orders over (order by rowtime range between current row and
> current
> > row)
> >   where productId = 10      (query 3)
> >
> > Very different windows, but they produce the same result, because of the
> > stateless nature of Filter. So, let's suppose that the default window is
> > the one I gave first, "(order by rowtime range between unbounded
> preceding
> > and current row)", and so query 1 is just short-hand for query 2.
> >
> > I currently translate query 1 to
> >
> > Delta
> >   Filter($1 = 10)
> >     Scan(orders)
> >
> > but I should really be translating to
> >
> > Delta
> >   Filter($1 = 10)
> >     Chi(order by $0 range between unbounded preceding and current row)
> >       Scan(orders)
> >
> > Delta is the "differentiation" operator and Chi is the "integration"
> > operator. After we apply rules to push the Delta through the Filter, the
> > Delta and Chi will collide and cancel each other out.
> >
> > Why have I not yet introduced the Chi operator? Because I have not yet
> > dealt with a query where it makes any difference.
> >
> > Where it will make a difference is joins. But even for joins, I hold out
> > hope that we can avoid explicit windows, most of the time. One could
> write
> >
> >   select stream *
> >   from orders over (order by rowtime range between current row and
> > interval '1' hour following)
> >   join shipments
> >   on orders.orderId = shipments.orderId    (query 4)
> >
> > but I think most people would find the following clearer:
> >
> >   select stream *
> >   from orders
> >   join shipments
> >   on orders.orderId = shipments.orderId          (query 5)
> >   and shipments.rowtime between orders.rowtime and orders.rowtime +
> > interval '1' hour
> >
> > Under the covers there are still the implicit windows:
> >
> >   select stream *
> >   from orders over (order by rowtime range between unbounded preceding
> and
> > current row)
> >   join shipments over (order by rowtime range between unbounded preceding
> > and current row)
> >   on orders.orderId = shipments.orderId          (query 6)
> >   and shipments.rowtime between orders.rowtime and orders.rowtime +
> > interval '1' hour
> >
> > Query 6 is equivalent to query 5. But the system can notice the join
> > condition involving the two streams' rowtimes and trim down the windows
> > (one window to an hour, another window to just the current row) without
> > changing semantics:
> >
> >   select stream *
> >   from orders over (order by rowtime range between interval '1' hour
> > preceding and current row)
> >   join shipments over (order by rowtime range between current row and
> > current row)
> >   on orders.orderId = shipments.orderId          (query 7)
> >   and shipments.rowtime between orders.rowtime and orders.rowtime +
> > interval '1' hour
> >
> > So, my hope is that end-users will rarely need to use an explicit window.
> >
> > In the algebra, we will start introducing Chi. It will evaporate for
> > simple queries such as Filter. It will remain for more complex queries
> such
> > as stream-to-stream join, because you are joining the current row of one
> > stream to a time-varying relation based on the other, and Chi represents
> > that "recent history of a stream" relation.
> >
> > Julian
> >
> >
> > > On Mar 2, 2015, at 11:42 AM, Milinda Pathirage <mp...@umail.iu.edu>
> > wrote:
> > >
> > > Hi Yi,
> > >
> > > As I understand rules and re-writes basically do the same thing
> > > (changing/re-writing the operator tree). But in case of rules this
> > happens
> > > during planning based on the query planner configuration. And
> re-writing
> > is
> > > done on the planner output, after the query goes through the planner.
> In
> > > Calcite re-write is happening inside the interpreter and in our case it
> > > will be inside the query plan to operator router conversion phase.
> > >
> > > Thanks
> > > Milinda
> > >
> > > On Mon, Mar 2, 2015 at 2:31 PM, Yi Pan <ni...@gmail.com> wrote:
> > >
> > >> Hi, Milinda,
> > >>
> > >> +1 on your default window idea. One question: what's the difference
> > between
> > >> a rule and a re-write?
> > >>
> > >> Thanks!
> > >>
> > >> On Mon, Mar 2, 2015 at 7:14 AM, Milinda Pathirage <
> > mpathira@umail.iu.edu>
> > >> wrote:
> > >>
> > >>> @Chris
> > >>> Yes, I was referring to that mail. Actually I was wrong about the
> ‘Now’
> > >>> window, it should be a ‘Unbounded’ window for most the default
> > scenarios
> > >>> (Section 6.4 of https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf
> ).
> > >>> Because
> > >>> applying a ‘Now’ window with size of 1 will double the number of
> events
> > >>> generated if we consider insert/delete streams. But ‘Unbounded’ will
> > only
> > >>> generate insert events.
> > >>>
> > >>> @Yi
> > >>> 1. You are correct about Calcite.There is no stream-to-relation
> > >> conversion
> > >>> happening. But as I understand we don’t need Calcite to support this.
> > We
> > >>> can add it to our query planner as a rule or re-write. What I am not
> > sure
> > >>> is whether to use a rule or a re-write.
> > >>> 2. There is a rule in Calcite which extract the Window out from the
> > >>> Project. But I am not sure why that didn’t happen in my test. This
> rule
> > >> is
> > >>> added to the planner by default. I’ll ask about this in Calcite
> mailing
> > >>> list.
> > >>>
> > >>> I think we can figure out a way to move the window to the input
> stream
> > if
> > >>> Calcite can move the window out from Project. I’ll see how we can do
> > >> this.
> > >>>
> > >>> Also I’ll go ahead and implement default windows. We can change it
> > later
> > >> if
> > >>> Julian or someone from Calcite comes up with a better suggestion.
> > >>>
> > >>> Thanks
> > >>> Milinda
> > >>>
> > >>> On Sun, Mar 1, 2015 at 8:23 PM, Yi Pan <ni...@gmail.com> wrote:
> > >>>
> > >>>> Hi, Milinda,
> > >>>>
> > >>>> Sorry to reply late on this. Here are some of my comments:
> > >>>> 1) In Calcite's model, it seems that there is no stream-to-relation
> > >>>> conversion step. In the first example where the window specification
> > is
> > >>>> missing, I like your solution to add the default LogicalNowWindow
> > >>> operator
> > >>>> s.t. it makes the physical operator matches the query plan. However,
> > if
> > >>>> Calcite community does not agree to add the default
> LogicalNowWindow,
> > >> it
> > >>>> would be fine for us if we always insert a default "now" window on a
> > >>> stream
> > >>>> when we generate the Samza configuration.
> > >>>> 2) I am more concerned on the other cases, where window operator is
> > >> used
> > >>> in
> > >>>> aggregation and join. In your example of windowed aggregation in
> > >> Calcite,
> > >>>> window spec seems to be a decoration to the LogicalProject operator,
> > >>>> instead of defining a data source to the LogicalProject operator. In
> > >> the
> > >>>> CQL model we followed, the window operator is considered as a query
> > >>>> primitive that generate a data source for other relation operators
> to
> > >>>> consume. How exactly is window operator used in Calcite planner?
> Isn't
> > >> it
> > >>>> much clear if the following is used?
> > >>>> LogicalProject(EXPR$0=[CASE(>(COUNT($2), 0),
> CAST($SUM0($2)):INTEGER,
> > >>>> null)])
> > >>>>   LogicalWindow(ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
> > >>>>
> > >>>> On Thu, Feb 26, 2015 at 12:18 PM, Milinda Pathirage <
> > >>> mpathira@umail.iu.edu
> > >>>>>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi devs,
> > >>>>>
> > >>>>> I ask about $subject in calcite-dev. You can find the archived
> > >>> discussion
> > >>>>> at [1]. I think your thoughts are also valuable in this discussion
> in
> > >>>>> calcite list.
> > >>>>>
> > >>>>> I discovered the requirement for a default window operator when I
> > >> tried
> > >>>> to
> > >>>>> integrate streamscan (I was using tablescan prevously) into the
> > >>> physical
> > >>>>> plan generation logic. Because of the way we have written the
> > >>>>> OperatorRouter API, we always need a stream-to-relation operator at
> > >> the
> > >>>>> input. But Calcite generates a query plan like following:
> > >>>>>
> > >>>>> LogicalDelta
> > >>>>>  LogicalProject(id=[$0], product=[$1], quantity=[$2])
> > >>>>>    LogicalFilter(condition=[>($2, 5)])
> > >>>>>
> > >>>>>      StreamScan(table=[[KAFKA, ORDERS]], fields=[[0, 1, 2]])
> > >>>>>
> > >>>>> If we consider LogicalFilter as a relation operator, we need
> > >> something
> > >>> to
> > >>>>> convert input stream to a relation before sending the tuples
> > >>> downstream.
> > >>>>> In addition to this, there is a optimization where we consider
> filter
> > >>>>> operator as a tuple operator and have it between StreamScan and
> > >>>>> stream-to-relation operator as a way of reducing the amount of
> > >> messages
> > >>>>> going downstream.
> > >>>>>
> > >>>>> Other scenario is windowed aggregates. Currently window spec is
> > >>> attached
> > >>>> to
> > >>>>> the LogicalProject in query plan like following:
> > >>>>>
> > >>>>> LogicalProject(EXPR$0=[CASE(>(COUNT($2) OVER (ROWS BETWEEN 2
> > >> PRECEDING
> > >>>> AND
> > >>>>> 2 FOLLOWING), 0), CAST($SUM0($2) OVER (ROWS BETWEEN 2 PRECEDING
> AND 2
> > >>>>> FOLLOWING)):INTEGER, null)])
> > >>>>>
> > >>>>> I wanted to know from them whether it is possible to move window
> > >>>> operation
> > >>>>> just after the stream scan, so that it is compatible with our
> > >> operator
> > >>>>> layer.
> > >>>>> May be there are better or easier ways to do this. So your comments
> > >> are
> > >>>>> always welcome.
> > >>>>>
> > >>>>> Thanks
> > >>>>> Milinda
> > >>>>>
> > >>>>>
> > >>>>> [1]
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> http://mail-archives.apache.org/mod_mbox/incubator-calcite-dev/201502.mbox/browser
> > >>>>>
> > >>>>> --
> > >>>>> Milinda Pathirage
> > >>>>>
> > >>>>> PhD Student | Research Assistant
> > >>>>> School of Informatics and Computing | Data to Insight Center
> > >>>>> Indiana University
> > >>>>>
> > >>>>> twitter: milindalakmal
> > >>>>> skype: milinda.pathirage
> > >>>>> blog: http://milinda.pathirage.org
> > >>>>>
> > >>>>
> > >>>
> > >>>
> > >>>
> > >>> --
> > >>> Milinda Pathirage
> > >>>
> > >>> PhD Student | Research Assistant
> > >>> School of Informatics and Computing | Data to Insight Center
> > >>> Indiana University
> > >>>
> > >>> twitter: milindalakmal
> > >>> skype: milinda.pathirage
> > >>> blog: http://milinda.pathirage.org
> > >>>
> > >>
> > >
> > >
> > >
> > > --
> > > Milinda Pathirage
> > >
> > > PhD Student | Research Assistant
> > > School of Informatics and Computing | Data to Insight Center
> > > Indiana University
> > >
> > > twitter: milindalakmal
> > > skype: milinda.pathirage
> > > blog: http://milinda.pathirage.org
> >
> >
>
>
> --
> Milinda Pathirage
>
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
>
> twitter: milindalakmal
> skype: milinda.pathirage
> blog: http://milinda.pathirage.org
>

Re: Handling defaults and windowed aggregates in stream queries

Posted by Milinda Pathirage <mp...@umail.iu.edu>.
Hi Julian,

I went through the draft, and it covers most of our requirements. But
aggregation over a window will not be as simple as the draft suggests.

In the stream extension draft we have the following:

'How did Calcite know that the 10:00:00 sub-totals were complete at
> 11:00:00, so that it could emit them? It knows that rowtime is increasing,
> and it knows that FLOOR(rowtime TO HOUR) is also increasing. So, once it
> has seen a row at or after 11:00:00, it will never see a row that will
> contribute to a 10:00:00 total.'


When there are delays, we can't do the above, because observing a row with a
rowtime greater than 11:00:00 doesn't mean that events belonging to the
10:00:00-10:59:59 window will not arrive after that observation. We have
discussed this in https://issues.apache.org/jira/browse/SAMZA-552. Even if we
consider the 'system time/stream time' mentioned in SAMZA-552, it doesn't
guarantee the absence of delays in a distributed setting. So we may need
additional hints/extensions to specify the extra information required to
handle these complexities in window calculations.
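
To make the issue concrete, here is a minimal, self-contained sketch (plain
Java; the HourlyAggregator class, the event shape and the slack value are all
made up for illustration, not part of Samza or Calcite) of an hourly SUM that
only emits a window once the maximum observed rowtime has passed the window
end plus a configurable slack. Without such a slack, a 10:59 row arriving
after the first 11:00 row would be lost:

  import java.util.HashMap;
  import java.util.Map;

  // Illustrative only: hourly sub-totals that are held back until
  // the max observed rowtime has passed the window end plus a lateness slack.
  public class HourlyAggregator {
    private static final long HOUR_MS = 3_600_000L;

    private final long slackMs;                              // how long to wait for late rows
    private final Map<Long, Integer> sums = new HashMap<>(); // window start -> running sum
    private long maxRowtime = Long.MIN_VALUE;

    public HourlyAggregator(long slackMs) {
      this.slackMs = slackMs;
    }

    // Feed one event; returns false if its window was already finalized.
    public boolean onEvent(long rowtime, int quantity) {
      maxRowtime = Math.max(maxRowtime, rowtime);
      long windowStart = (rowtime / HOUR_MS) * HOUR_MS;      // FLOOR(rowtime TO HOUR)
      if (!sums.containsKey(windowStart)
          && windowStart + HOUR_MS + slackMs <= maxRowtime) {
        return false;                                        // too late, window already emitted
      }
      sums.merge(windowStart, quantity, Integer::sum);
      return true;
    }

    // Emit every window whose end plus slack is behind the max observed rowtime.
    public void emitCompleteWindows() {
      sums.entrySet().removeIf(e -> {
        if (e.getKey() + HOUR_MS + slackMs <= maxRowtime) {
          System.out.println("window starting " + e.getKey() + " sum = " + e.getValue());
          return true;
        }
        return false;
      });
    }
  }

With a slack of zero this degenerates to exactly the "emit once you see
11:00:00" behaviour in the draft; a non-zero slack is the kind of extra hint
we may have to let users (or the system) specify.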

Maybe there are ways to handle this at the Samza level rather than in the
query language.

@Chris, @Yi
I got the query planner working with some dummy operators by re-writing the
query to add default window operators. But Julian's comments about handling
defaults and optimizing the query plan (moving the Delta down and removing
both Delta and Chi) got me thinking about whether enforcing CQL semantics the
way our current operator layer does limits flexibility and increases the
complexity of the query-plan-to-operator-router generation. Anyway, I am
going to take a step back and think more about Julian's comments. I'll put my
thoughts into a design document for the query planner.

Thanks
Milinda


On Tue, Mar 3, 2015 at 3:40 PM, Julian Hyde <ju...@hydromatic.net> wrote:

> Sorry to show up late to this party. I've had my head down writing a
> description of streaming SQL which I hoped would answer questions like
> this. Here is the latest draft:
> https://github.com/julianhyde/incubator-calcite/blob/chi/doc/STREAM.md
>
> I've been avoiding windows for now. They are not needed for simple queries
> (project, filter, windowed aggregate) and I wanted to write the
> specification of more complex queries before I introduce them.
>
> Let's look at a simple query, filter. According to CQL, to evaluate
>
>   select stream *
>   from orders
>   where productId = 10    (query 1)
>
> you need to convert orders to a relation over a particular window, apply
> the filter, then convert back to a stream. We could write
>
>   select stream *
>   from orders over (order by rowtime range between unbounded preceding and
> current row)
>   where productId = 10    (query 2)
>
> or we could write
>
>   select stream *
>   from orders over (order by rowtime range between current row and current
> row)
>   where productId = 10      (query 3)
>
> Very different windows, but they produce the same result, because of the
> stateless nature of Filter. So, let's suppose that the default window is
> the one I gave first, "(order by rowtime range between unbounded preceding
> and current row)", and so query 1 is just short-hand for query 2.
>
> I currently translate query 1 to
>
> Delta
>   Filter($1 = 10)
>     Scan(orders)
>
> but I should really be translating to
>
> Delta
>   Filter($1 = 10)
>     Chi(order by $0 range between unbounded preceding and current row)
>       Scan(orders)
>
> Delta is the "differentiation" operator and Chi is the "integration"
> operator. After we apply rules to push the Delta through the Filter, the
> Delta and Chi will collide and cancel each other out.
>
> Why have I not yet introduced the Chi operator? Because I have not yet
> dealt with a query where it makes any difference.
>
> Where it will make a difference is joins. But even for joins, I hold out
> hope that we can avoid explicit windows, most of the time. One could write
>
>   select stream *
>   from orders over (order by rowtime range between current row and
> interval '1' hour following)
>   join shipments
>   on orders.orderId = shipments.orderId    (query 4)
>
> but I think most people would find the following clearer:
>
>   select stream *
>   from orders
>   join shipments
>   on orders.orderId = shipments.orderId          (query 5)
>   and shipments.rowtime between orders.rowtime and orders.rowtime +
> interval '1' hour
>
> Under the covers there are still the implicit windows:
>
>   select stream *
>   from orders over (order by rowtime range between unbounded preceding and
> current row)
>   join shipments over (order by rowtime range between unbounded preceding
> and current row)
>   on orders.orderId = shipments.orderId          (query 6)
>   and shipments.rowtime between orders.rowtime and orders.rowtime +
> interval '1' hour
>
> Query 6 is equivalent to query 5. But the system can notice the join
> condition involving the two streams' rowtimes and trim down the windows
> (one window to an hour, another window to just the current row) without
> changing semantics:
>
>   select stream *
>   from orders over (order by rowtime range between interval '1' hour
> preceding and current row)
>   join shipments over (order by rowtime range between current row and
> current row)
>   on orders.orderId = shipments.orderId          (query 7)
>   and shipments.rowtime between orders.rowtime and orders.rowtime +
> interval '1' hour
>
> So, my hope is that end-users will rarely need to use an explicit window.
>
> In the algebra, we will start introducing Chi. It will evaporate for
> simple queries such as Filter. It will remain for more complex queries such
> as stream-to-stream join, because you are joining the current row of one
> stream to a time-varying relation based on the other, and Chi represents
> that "recent history of a stream" relation.
>
> Julian
>
>
> > On Mar 2, 2015, at 11:42 AM, Milinda Pathirage <mp...@umail.iu.edu>
> wrote:
> >
> > Hi Yi,
> >
> > As I understand rules and re-writes basically do the same thing
> > (changing/re-writing the operator tree). But in case of rules this
> happens
> > during planning based on the query planner configuration. And re-writing
> is
> > done on the planner output, after the query goes through the planner. In
> > Calcite re-write is happening inside the interpreter and in our case it
> > will be inside the query plan to operator router conversion phase.
> >
> > Thanks
> > Milinda
> >
> > On Mon, Mar 2, 2015 at 2:31 PM, Yi Pan <ni...@gmail.com> wrote:
> >
> >> Hi, Milinda,
> >>
> >> +1 on your default window idea. One question: what's the difference
> between
> >> a rule and a re-write?
> >>
> >> Thanks!
> >>
> >> On Mon, Mar 2, 2015 at 7:14 AM, Milinda Pathirage <
> mpathira@umail.iu.edu>
> >> wrote:
> >>
> >>> @Chris
> >>> Yes, I was referring to that mail. Actually I was wrong about the ‘Now’
> >>> window, it should be a ‘Unbounded’ window for most the default
> scenarios
> >>> (Section 6.4 of https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf).
> >>> Because
> >>> applying a ‘Now’ window with size of 1 will double the number of events
> >>> generated if we consider insert/delete streams. But ‘Unbounded’ will
> only
> >>> generate insert events.
> >>>
> >>> @Yi
> >>> 1. You are correct about Calcite.There is no stream-to-relation
> >> conversion
> >>> happening. But as I understand we don’t need Calcite to support this.
> We
> >>> can add it to our query planner as a rule or re-write. What I am not
> sure
> >>> is whether to use a rule or a re-write.
> >>> 2. There is a rule in Calcite which extract the Window out from the
> >>> Project. But I am not sure why that didn’t happen in my test. This rule
> >> is
> >>> added to the planner by default. I’ll ask about this in Calcite mailing
> >>> list.
> >>>
> >>> I think we can figure out a way to move the window to the input stream
> if
> >>> Calcite can move the window out from Project. I’ll see how we can do
> >> this.
> >>>
> >>> Also I’ll go ahead and implement default windows. We can change it
> later
> >> if
> >>> Julian or someone from Calcite comes up with a better suggestion.
> >>>
> >>> Thanks
> >>> Milinda
> >>>
> >>> On Sun, Mar 1, 2015 at 8:23 PM, Yi Pan <ni...@gmail.com> wrote:
> >>>
> >>>> Hi, Milinda,
> >>>>
> >>>> Sorry to reply late on this. Here are some of my comments:
> >>>> 1) In Calcite's model, it seems that there is no stream-to-relation
> >>>> conversion step. In the first example where the window specification
> is
> >>>> missing, I like your solution to add the default LogicalNowWindow
> >>> operator
> >>>> s.t. it makes the physical operator matches the query plan. However,
> if
> >>>> Calcite community does not agree to add the default LogicalNowWindow,
> >> it
> >>>> would be fine for us if we always insert a default "now" window on a
> >>> stream
> >>>> when we generate the Samza configuration.
> >>>> 2) I am more concerned on the other cases, where window operator is
> >> used
> >>> in
> >>>> aggregation and join. In your example of windowed aggregation in
> >> Calcite,
> >>>> window spec seems to be a decoration to the LogicalProject operator,
> >>>> instead of defining a data source to the LogicalProject operator. In
> >> the
> >>>> CQL model we followed, the window operator is considered as a query
> >>>> primitive that generate a data source for other relation operators to
> >>>> consume. How exactly is window operator used in Calcite planner? Isn't
> >> it
> >>>> much clear if the following is used?
> >>>> LogicalProject(EXPR$0=[CASE(>(COUNT($2), 0), CAST($SUM0($2)):INTEGER,
> >>>> null)])
> >>>>   LogicalWindow(ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
> >>>>
> >>>> On Thu, Feb 26, 2015 at 12:18 PM, Milinda Pathirage <
> >>> mpathira@umail.iu.edu
> >>>>>
> >>>> wrote:
> >>>>
> >>>>> Hi devs,
> >>>>>
> >>>>> I ask about $subject in calcite-dev. You can find the archived
> >>> discussion
> >>>>> at [1]. I think your thoughts are also valuable in this discussion in
> >>>>> calcite list.
> >>>>>
> >>>>> I discovered the requirement for a default window operator when I
> >> tried
> >>>> to
> >>>>> integrate streamscan (I was using tablescan prevously) into the
> >>> physical
> >>>>> plan generation logic. Because of the way we have written the
> >>>>> OperatorRouter API, we always need a stream-to-relation operator at
> >> the
> >>>>> input. But Calcite generates a query plan like following:
> >>>>>
> >>>>> LogicalDelta
> >>>>>  LogicalProject(id=[$0], product=[$1], quantity=[$2])
> >>>>>    LogicalFilter(condition=[>($2, 5)])
> >>>>>
> >>>>>      StreamScan(table=[[KAFKA, ORDERS]], fields=[[0, 1, 2]])
> >>>>>
> >>>>> If we consider LogicalFilter as a relation operator, we need
> >> something
> >>> to
> >>>>> convert input stream to a relation before sending the tuples
> >>> downstream.
> >>>>> In addition to this, there is a optimization where we consider filter
> >>>>> operator as a tuple operator and have it between StreamScan and
> >>>>> stream-to-relation operator as a way of reducing the amount of
> >> messages
> >>>>> going downstream.
> >>>>>
> >>>>> Other scenario is windowed aggregates. Currently window spec is
> >>> attached
> >>>> to
> >>>>> the LogicalProject in query plan like following:
> >>>>>
> >>>>> LogicalProject(EXPR$0=[CASE(>(COUNT($2) OVER (ROWS BETWEEN 2
> >> PRECEDING
> >>>> AND
> >>>>> 2 FOLLOWING), 0), CAST($SUM0($2) OVER (ROWS BETWEEN 2 PRECEDING AND 2
> >>>>> FOLLOWING)):INTEGER, null)])
> >>>>>
> >>>>> I wanted to know from them whether it is possible to move window
> >>>> operation
> >>>>> just after the stream scan, so that it is compatible with our
> >> operator
> >>>>> layer.
> >>>>> May be there are better or easier ways to do this. So your comments
> >> are
> >>>>> always welcome.
> >>>>>
> >>>>> Thanks
> >>>>> Milinda
> >>>>>
> >>>>>
> >>>>> [1]
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> http://mail-archives.apache.org/mod_mbox/incubator-calcite-dev/201502.mbox/browser
> >>>>>
> >>>>> --
> >>>>> Milinda Pathirage
> >>>>>
> >>>>> PhD Student | Research Assistant
> >>>>> School of Informatics and Computing | Data to Insight Center
> >>>>> Indiana University
> >>>>>
> >>>>> twitter: milindalakmal
> >>>>> skype: milinda.pathirage
> >>>>> blog: http://milinda.pathirage.org
> >>>>>
> >>>>
> >>>
> >>>
> >>>
> >>> --
> >>> Milinda Pathirage
> >>>
> >>> PhD Student | Research Assistant
> >>> School of Informatics and Computing | Data to Insight Center
> >>> Indiana University
> >>>
> >>> twitter: milindalakmal
> >>> skype: milinda.pathirage
> >>> blog: http://milinda.pathirage.org
> >>>
> >>
> >
> >
> >
> > --
> > Milinda Pathirage
> >
> > PhD Student | Research Assistant
> > School of Informatics and Computing | Data to Insight Center
> > Indiana University
> >
> > twitter: milindalakmal
> > skype: milinda.pathirage
> > blog: http://milinda.pathirage.org
>
>


-- 
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org

Re: Handling defaults and windowed aggregates in stream queries

Posted by Julian Hyde <ju...@hydromatic.net>.
Sorry to show up late to this party. I've had my head down writing a description of streaming SQL which I hoped would answer questions like this. Here is the latest draft: https://github.com/julianhyde/incubator-calcite/blob/chi/doc/STREAM.md

I've been avoiding windows for now. They are not needed for simple queries (project, filter, windowed aggregate) and I wanted to write the specification of more complex queries before I introduce them.

Let's look at a simple query, filter. According to CQL, to evaluate

  select stream *
  from orders
  where productId = 10    (query 1)

you need to convert orders to a relation over a particular window, apply the filter, then convert back to a stream. We could write

  select stream *
  from orders over (order by rowtime range between unbounded preceding and current row)
  where productId = 10    (query 2)

or we could write

  select stream *
  from orders over (order by rowtime range between current row and current row)
  where productId = 10      (query 3)

Very different windows, but they produce the same result, because of the stateless nature of Filter. So, let's suppose that the default window is the one I gave first, "(order by rowtime range between unbounded preceding and current row)", and so query 1 is just short-hand for query 2.

I currently translate query 1 to

Delta
  Filter($1 = 10)
    Scan(orders)

but I should really be translating to

Delta
  Filter($1 = 10)
    Chi(order by $0 range between unbounded preceding and current row)
      Scan(orders)

Delta is the "differentiation" operator and Chi is the "integration" operator. After we apply rules to push the Delta through the Filter, the Delta and Chi will collide and cancel each other out.
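
A toy sketch of that cancellation (made-up node classes, not Calcite's actual
RelNode/RelOptRule API; shown only to illustrate the rewrite described above):

  // Delta(Filter(x)) => Filter(Delta(x)), then Delta(Chi(x)) => x.
  abstract class Node {}

  class Scan extends Node { final String table; Scan(String t) { table = t; } }
  class Filter extends Node {
    final String cond; final Node input;
    Filter(String c, Node i) { cond = c; input = i; }
  }
  class Chi extends Node {
    final String window; final Node input;
    Chi(String w, Node i) { window = w; input = i; }
  }
  class Delta extends Node { final Node input; Delta(Node i) { input = i; } }

  class StreamRewriter {
    static Node rewrite(Node n) {
      if (n instanceof Delta && ((Delta) n).input instanceof Filter) {
        Filter f = (Filter) ((Delta) n).input;
        // Filters are stateless, so Delta can be pushed below them.
        return rewrite(new Filter(f.cond, new Delta(f.input)));
      }
      if (n instanceof Delta && ((Delta) n).input instanceof Chi) {
        // Differentiation meets integration: the pair cancels out.
        return rewrite(((Chi) ((Delta) n).input).input);
      }
      if (n instanceof Filter) {
        Filter f = (Filter) n;
        return new Filter(f.cond, rewrite(f.input));
      }
      return n;
    }

    public static void main(String[] args) {
      Node plan = new Delta(new Filter("$1 = 10",
          new Chi("unbounded preceding to current row", new Scan("orders"))));
      Node rewritten = rewrite(plan);
      // Result is Filter($1 = 10, Scan(orders)): both Delta and Chi are gone.
      System.out.println(rewritten instanceof Filter);
    }
  }

In the real planner this would be expressed as rules over relational
operators, but the shape of the transformation is the same.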

Why have I not yet introduced the Chi operator? Because I have not yet dealt with a query where it makes any difference.

Where it will make a difference is joins. But even for joins, I hold out hope that we can avoid explicit windows, most of the time. One could write

  select stream *
  from orders over (order by rowtime range between current row and interval '1' hour following)
  join shipments
  on orders.orderId = shipments.orderId    (query 4)

but I think most people would find the following clearer:

  select stream *
  from orders
  join shipments
  on orders.orderId = shipments.orderId          (query 5)
  and shipments.rowtime between orders.rowtime and orders.rowtime + interval '1' hour

Under the covers there are still the implicit windows:

  select stream *
  from orders over (order by rowtime range between unbounded preceding and current row)
  join shipments over (order by rowtime range between unbounded preceding and current row)
  on orders.orderId = shipments.orderId          (query 6)
  and shipments.rowtime between orders.rowtime and orders.rowtime + interval '1' hour

Query 6 is equivalent to query 5. But the system can notice the join condition involving the two streams' rowtimes and trim down the windows (one window to an hour, another window to just the current row) without changing semantics:

  select stream *
  from orders over (order by rowtime range between interval '1' hour preceding and current row)
  join shipments over (order by rowtime range between current row and current row)
  on orders.orderId = shipments.orderId          (query 7)
  and shipments.rowtime between orders.rowtime and orders.rowtime + interval '1' hour

So, my hope is that end-users will rarely need to use an explicit window.

In the algebra, we will start introducing Chi. It will evaporate for simple queries such as Filter. It will remain for more complex queries such as stream-to-stream join, because you are joining the current row of one stream to a time-varying relation based on the other, and Chi represents that "recent history of a stream" relation.
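
For anyone who wants to picture what the runtime has to keep around for query
7, here is a rough, self-contained sketch (made-up event shapes and class
names; it also assumes both streams are processed in rowtime order) in which
a one-hour buffer of orders plays the role of Chi's "recent history of a
stream" relation and each shipment row probes it:

  import java.util.ArrayDeque;
  import java.util.Deque;

  // Illustrative stream-to-stream join: keep one hour of recent orders
  // and join each arriving shipment row against that buffer.
  public class OrderShipmentJoin {
    static final long HOUR_MS = 3_600_000L;

    static class Order {
      final long rowtime; final int orderId;
      Order(long t, int id) { rowtime = t; orderId = id; }
    }

    private final Deque<Order> recentOrders = new ArrayDeque<>();  // ordered by rowtime

    // Called for each row of the orders stream.
    public void onOrder(Order o) {
      recentOrders.addLast(o);
    }

    // Called for each row of the shipments stream; emits joined rows.
    public void onShipment(long rowtime, int orderId) {
      // Drop orders more than one hour old: they can no longer join.
      while (!recentOrders.isEmpty()
          && recentOrders.peekFirst().rowtime < rowtime - HOUR_MS) {
        recentOrders.removeFirst();
      }
      for (Order o : recentOrders) {
        if (o.orderId == orderId && rowtime >= o.rowtime) {
          System.out.println("order " + o.orderId + " shipped at " + rowtime);
        }
      }
    }
  }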

Julian


> On Mar 2, 2015, at 11:42 AM, Milinda Pathirage <mp...@umail.iu.edu> wrote:
> 
> Hi Yi,
> 
> As I understand rules and re-writes basically do the same thing
> (changing/re-writing the operator tree). But in case of rules this happens
> during planning based on the query planner configuration. And re-writing is
> done on the planner output, after the query goes through the planner. In
> Calcite re-write is happening inside the interpreter and in our case it
> will be inside the query plan to operator router conversion phase.
> 
> Thanks
> Milinda
> 
> On Mon, Mar 2, 2015 at 2:31 PM, Yi Pan <ni...@gmail.com> wrote:
> 
>> Hi, Milinda,
>> 
>> +1 on your default window idea. One question: what's the difference between
>> a rule and a re-write?
>> 
>> Thanks!
>> 
>> On Mon, Mar 2, 2015 at 7:14 AM, Milinda Pathirage <mp...@umail.iu.edu>
>> wrote:
>> 
>>> @Chris
>>> Yes, I was referring to that mail. Actually I was wrong about the ‘Now’
>>> window, it should be a ‘Unbounded’ window for most the default scenarios
>>> (Section 6.4 of https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf).
>>> Because
>>> applying a ‘Now’ window with size of 1 will double the number of events
>>> generated if we consider insert/delete streams. But ‘Unbounded’ will only
>>> generate insert events.
>>> 
>>> @Yi
>>> 1. You are correct about Calcite.There is no stream-to-relation
>> conversion
>>> happening. But as I understand we don’t need Calcite to support this. We
>>> can add it to our query planner as a rule or re-write. What I am not sure
>>> is whether to use a rule or a re-write.
>>> 2. There is a rule in Calcite which extract the Window out from the
>>> Project. But I am not sure why that didn’t happen in my test. This rule
>> is
>>> added to the planner by default. I’ll ask about this in Calcite mailing
>>> list.
>>> 
>>> I think we can figure out a way to move the window to the input stream if
>>> Calcite can move the window out from Project. I’ll see how we can do
>> this.
>>> 
>>> Also I’ll go ahead and implement default windows. We can change it later
>> if
>>> Julian or someone from Calcite comes up with a better suggestion.
>>> 
>>> Thanks
>>> Milinda
>>> 
>>> On Sun, Mar 1, 2015 at 8:23 PM, Yi Pan <ni...@gmail.com> wrote:
>>> 
>>>> Hi, Milinda,
>>>> 
>>>> Sorry to reply late on this. Here are some of my comments:
>>>> 1) In Calcite's model, it seems that there is no stream-to-relation
>>>> conversion step. In the first example where the window specification is
>>>> missing, I like your solution to add the default LogicalNowWindow
>>> operator
>>>> s.t. it makes the physical operator matches the query plan. However, if
>>>> Calcite community does not agree to add the default LogicalNowWindow,
>> it
>>>> would be fine for us if we always insert a default "now" window on a
>>> stream
>>>> when we generate the Samza configuration.
>>>> 2) I am more concerned on the other cases, where window operator is
>> used
>>> in
>>>> aggregation and join. In your example of windowed aggregation in
>> Calcite,
>>>> window spec seems to be a decoration to the LogicalProject operator,
>>>> instead of defining a data source to the LogicalProject operator. In
>> the
>>>> CQL model we followed, the window operator is considered as a query
>>>> primitive that generate a data source for other relation operators to
>>>> consume. How exactly is window operator used in Calcite planner? Isn't
>> it
>>>> much clear if the following is used?
>>>> LogicalProject(EXPR$0=[CASE(>(COUNT($2), 0), CAST($SUM0($2)):INTEGER,
>>>> null)])
>>>>   LogicalWindow(ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
>>>> 
>>>> On Thu, Feb 26, 2015 at 12:18 PM, Milinda Pathirage <
>>> mpathira@umail.iu.edu
>>>>> 
>>>> wrote:
>>>> 
>>>>> Hi devs,
>>>>> 
>>>>> I ask about $subject in calcite-dev. You can find the archived
>>> discussion
>>>>> at [1]. I think your thoughts are also valuable in this discussion in
>>>>> calcite list.
>>>>> 
>>>>> I discovered the requirement for a default window operator when I
>> tried
>>>> to
>>>>> integrate streamscan (I was using tablescan prevously) into the
>>> physical
>>>>> plan generation logic. Because of the way we have written the
>>>>> OperatorRouter API, we always need a stream-to-relation operator at
>> the
>>>>> input. But Calcite generates a query plan like following:
>>>>> 
>>>>> LogicalDelta
>>>>>  LogicalProject(id=[$0], product=[$1], quantity=[$2])
>>>>>    LogicalFilter(condition=[>($2, 5)])
>>>>> 
>>>>>      StreamScan(table=[[KAFKA, ORDERS]], fields=[[0, 1, 2]])
>>>>> 
>>>>> If we consider LogicalFilter as a relation operator, we need
>> something
>>> to
>>>>> convert input stream to a relation before sending the tuples
>>> downstream.
>>>>> In addition to this, there is a optimization where we consider filter
>>>>> operator as a tuple operator and have it between StreamScan and
>>>>> stream-to-relation operator as a way of reducing the amount of
>> messages
>>>>> going downstream.
>>>>> 
>>>>> Other scenario is windowed aggregates. Currently window spec is
>>> attached
>>>> to
>>>>> the LogicalProject in query plan like following:
>>>>> 
>>>>> LogicalProject(EXPR$0=[CASE(>(COUNT($2) OVER (ROWS BETWEEN 2
>> PRECEDING
>>>> AND
>>>>> 2 FOLLOWING), 0), CAST($SUM0($2) OVER (ROWS BETWEEN 2 PRECEDING AND 2
>>>>> FOLLOWING)):INTEGER, null)])
>>>>> 
>>>>> I wanted to know from them whether it is possible to move window
>>>> operation
>>>>> just after the stream scan, so that it is compatible with our
>> operator
>>>>> layer.
>>>>> May be there are better or easier ways to do this. So your comments
>> are
>>>>> always welcome.
>>>>> 
>>>>> Thanks
>>>>> Milinda
>>>>> 
>>>>> 
>>>>> [1]
>>>>> 
>>>>> 
>>>> 
>>> 
>> http://mail-archives.apache.org/mod_mbox/incubator-calcite-dev/201502.mbox/browser
>>>>> 
>>>>> --
>>>>> Milinda Pathirage
>>>>> 
>>>>> PhD Student | Research Assistant
>>>>> School of Informatics and Computing | Data to Insight Center
>>>>> Indiana University
>>>>> 
>>>>> twitter: milindalakmal
>>>>> skype: milinda.pathirage
>>>>> blog: http://milinda.pathirage.org
>>>>> 
>>>> 
>>> 
>>> 
>>> 
>>> --
>>> Milinda Pathirage
>>> 
>>> PhD Student | Research Assistant
>>> School of Informatics and Computing | Data to Insight Center
>>> Indiana University
>>> 
>>> twitter: milindalakmal
>>> skype: milinda.pathirage
>>> blog: http://milinda.pathirage.org
>>> 
>> 
> 
> 
> 
> -- 
> Milinda Pathirage
> 
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
> 
> twitter: milindalakmal
> skype: milinda.pathirage
> blog: http://milinda.pathirage.org


Re: Handling defaults and windowed aggregates in stream queries

Posted by Milinda Pathirage <mp...@umail.iu.edu>.
Hi Yi,

As I understand it, rules and re-writes basically do the same thing
(changing/re-writing the operator tree). But in the case of rules this
happens during planning, driven by the query planner configuration, whereas a
re-write is done on the planner output, after the query has gone through the
planner. In Calcite the re-write happens inside the interpreter; in our case
it will be inside the query-plan-to-operator-router conversion phase.
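
To show what the re-write option could look like in that conversion phase,
here is a rough sketch (the PlanNode shape and the DefaultWindowRewrite class
are made up; this is not the real OperatorRouter or Calcite API) that walks
the planner output and puts a default window between a StreamScan and
whatever relational operator reads it:

  import java.util.ArrayList;
  import java.util.List;

  // Illustrative plan shape: a node kind plus its inputs.
  class PlanNode {
    final String kind;              // e.g. "StreamScan", "LogicalFilter"
    final List<PlanNode> inputs;
    PlanNode(String kind, List<PlanNode> inputs) { this.kind = kind; this.inputs = inputs; }
  }

  class DefaultWindowRewrite {
    static PlanNode rewrite(PlanNode node) {
      List<PlanNode> newInputs = new ArrayList<>();
      for (PlanNode input : node.inputs) {
        PlanNode rewritten = rewrite(input);
        if (rewritten.kind.equals("StreamScan") && !node.kind.startsWith("Window")) {
          // A relational operator is reading a raw stream:
          // insert the default (unbounded) stream-to-relation operator.
          rewritten = new PlanNode("Window[unbounded]", List.of(rewritten));
        }
        newInputs.add(rewritten);
      }
      return new PlanNode(node.kind, newInputs);
    }
  }

The rule variant would express the same transformation inside the planner
itself; the re-write variant keeps it entirely on our side.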

Thanks
Milinda

On Mon, Mar 2, 2015 at 2:31 PM, Yi Pan <ni...@gmail.com> wrote:

> Hi, Milinda,
>
> +1 on your default window idea. One question: what's the difference between
> a rule and a re-write?
>
> Thanks!
>
> On Mon, Mar 2, 2015 at 7:14 AM, Milinda Pathirage <mp...@umail.iu.edu>
> wrote:
>
> > @Chris
> > Yes, I was referring to that mail. Actually I was wrong about the ‘Now’
> > window, it should be a ‘Unbounded’ window for most the default scenarios
> > (Section 6.4 of https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf).
> > Because
> > applying a ‘Now’ window with size of 1 will double the number of events
> > generated if we consider insert/delete streams. But ‘Unbounded’ will only
> > generate insert events.
> >
> > @Yi
> > 1. You are correct about Calcite.There is no stream-to-relation
> conversion
> > happening. But as I understand we don’t need Calcite to support this. We
> > can add it to our query planner as a rule or re-write. What I am not sure
> > is whether to use a rule or a re-write.
> > 2. There is a rule in Calcite which extract the Window out from the
> > Project. But I am not sure why that didn’t happen in my test. This rule
> is
> > added to the planner by default. I’ll ask about this in Calcite mailing
> > list.
> >
> > I think we can figure out a way to move the window to the input stream if
> > Calcite can move the window out from Project. I’ll see how we can do
> this.
> >
> > Also I’ll go ahead and implement default windows. We can change it later
> if
> > Julian or someone from Calcite comes up with a better suggestion.
> >
> > Thanks
> > Milinda
> >
> > On Sun, Mar 1, 2015 at 8:23 PM, Yi Pan <ni...@gmail.com> wrote:
> >
> > > Hi, Milinda,
> > >
> > > Sorry to reply late on this. Here are some of my comments:
> > > 1) In Calcite's model, it seems that there is no stream-to-relation
> > > conversion step. In the first example where the window specification is
> > > missing, I like your solution to add the default LogicalNowWindow
> > operator
> > > s.t. it makes the physical operator matches the query plan. However, if
> > > Calcite community does not agree to add the default LogicalNowWindow,
> it
> > > would be fine for us if we always insert a default "now" window on a
> > stream
> > > when we generate the Samza configuration.
> > > 2) I am more concerned on the other cases, where window operator is
> used
> > in
> > > aggregation and join. In your example of windowed aggregation in
> Calcite,
> > > window spec seems to be a decoration to the LogicalProject operator,
> > > instead of defining a data source to the LogicalProject operator. In
> the
> > > CQL model we followed, the window operator is considered as a query
> > > primitive that generate a data source for other relation operators to
> > > consume. How exactly is window operator used in Calcite planner? Isn't
> it
> > > much clear if the following is used?
> > > LogicalProject(EXPR$0=[CASE(>(COUNT($2), 0), CAST($SUM0($2)):INTEGER,
> > > null)])
> > >    LogicalWindow(ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
> > >
> > > On Thu, Feb 26, 2015 at 12:18 PM, Milinda Pathirage <
> > mpathira@umail.iu.edu
> > > >
> > > wrote:
> > >
> > > > Hi devs,
> > > >
> > > > I ask about $subject in calcite-dev. You can find the archived
> > discussion
> > > > at [1]. I think your thoughts are also valuable in this discussion in
> > > > calcite list.
> > > >
> > > > I discovered the requirement for a default window operator when I
> tried
> > > to
> > > > integrate streamscan (I was using tablescan prevously) into the
> > physical
> > > > plan generation logic. Because of the way we have written the
> > > > OperatorRouter API, we always need a stream-to-relation operator at
> the
> > > > input. But Calcite generates a query plan like following:
> > > >
> > > > LogicalDelta
> > > >   LogicalProject(id=[$0], product=[$1], quantity=[$2])
> > > >     LogicalFilter(condition=[>($2, 5)])
> > > >
> > > >       StreamScan(table=[[KAFKA, ORDERS]], fields=[[0, 1, 2]])
> > > >
> > > > If we consider LogicalFilter as a relation operator, we need
> something
> > to
> > > > convert input stream to a relation before sending the tuples
> > downstream.
> > > > In addition to this, there is a optimization where we consider filter
> > > > operator as a tuple operator and have it between StreamScan and
> > > > stream-to-relation operator as a way of reducing the amount of
> messages
> > > > going downstream.
> > > >
> > > > Other scenario is windowed aggregates. Currently window spec is
> > attached
> > > to
> > > > the LogicalProject in query plan like following:
> > > >
> > > > LogicalProject(EXPR$0=[CASE(>(COUNT($2) OVER (ROWS BETWEEN 2
> PRECEDING
> > > AND
> > > > 2 FOLLOWING), 0), CAST($SUM0($2) OVER (ROWS BETWEEN 2 PRECEDING AND 2
> > > > FOLLOWING)):INTEGER, null)])
> > > >
> > > > I wanted to know from them whether it is possible to move window
> > > operation
> > > > just after the stream scan, so that it is compatible with our
> operator
> > > > layer.
> > > > May be there are better or easier ways to do this. So your comments
> are
> > > > always welcome.
> > > >
> > > > Thanks
> > > > Milinda
> > > >
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> http://mail-archives.apache.org/mod_mbox/incubator-calcite-dev/201502.mbox/browser
> > > >
> > > > --
> > > > Milinda Pathirage
> > > >
> > > > PhD Student | Research Assistant
> > > > School of Informatics and Computing | Data to Insight Center
> > > > Indiana University
> > > >
> > > > twitter: milindalakmal
> > > > skype: milinda.pathirage
> > > > blog: http://milinda.pathirage.org
> > > >
> > >
> >
> >
> >
> > --
> > Milinda Pathirage
> >
> > PhD Student | Research Assistant
> > School of Informatics and Computing | Data to Insight Center
> > Indiana University
> >
> > twitter: milindalakmal
> > skype: milinda.pathirage
> > blog: http://milinda.pathirage.org
> >
>



-- 
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org

Re: Handling defaults and windowed aggregates in stream queries

Posted by Yi Pan <ni...@gmail.com>.
Hi, Milinda,

+1 on your default window idea. One question: what's the difference between
a rule and a re-write?

Thanks!

On Mon, Mar 2, 2015 at 7:14 AM, Milinda Pathirage <mp...@umail.iu.edu>
wrote:

> @Chris
> Yes, I was referring to that mail. Actually I was wrong about the ‘Now’
> window, it should be a ‘Unbounded’ window for most the default scenarios
> (Section 6.4 of https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf).
> Because
> applying a ‘Now’ window with size of 1 will double the number of events
> generated if we consider insert/delete streams. But ‘Unbounded’ will only
> generate insert events.
>
> @Yi
> 1. You are correct about Calcite.There is no stream-to-relation conversion
> happening. But as I understand we don’t need Calcite to support this. We
> can add it to our query planner as a rule or re-write. What I am not sure
> is whether to use a rule or a re-write.
> 2. There is a rule in Calcite which extract the Window out from the
> Project. But I am not sure why that didn’t happen in my test. This rule is
> added to the planner by default. I’ll ask about this in Calcite mailing
> list.
>
> I think we can figure out a way to move the window to the input stream if
> Calcite can move the window out from Project. I’ll see how we can do this.
>
> Also I’ll go ahead and implement default windows. We can change it later if
> Julian or someone from Calcite comes up with a better suggestion.
>
> Thanks
> Milinda
>
> On Sun, Mar 1, 2015 at 8:23 PM, Yi Pan <ni...@gmail.com> wrote:
>
> > Hi, Milinda,
> >
> > Sorry to reply late on this. Here are some of my comments:
> > 1) In Calcite's model, it seems that there is no stream-to-relation
> > conversion step. In the first example where the window specification is
> > missing, I like your solution to add the default LogicalNowWindow
> operator
> > s.t. it makes the physical operator matches the query plan. However, if
> > Calcite community does not agree to add the default LogicalNowWindow, it
> > would be fine for us if we always insert a default "now" window on a
> stream
> > when we generate the Samza configuration.
> > 2) I am more concerned on the other cases, where window operator is used
> in
> > aggregation and join. In your example of windowed aggregation in Calcite,
> > window spec seems to be a decoration to the LogicalProject operator,
> > instead of defining a data source to the LogicalProject operator. In the
> > CQL model we followed, the window operator is considered as a query
> > primitive that generate a data source for other relation operators to
> > consume. How exactly is window operator used in Calcite planner? Isn't it
> > much clear if the following is used?
> > LogicalProject(EXPR$0=[CASE(>(COUNT($2), 0), CAST($SUM0($2)):INTEGER,
> > null)])
> >    LogicalWindow(ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
> >
> > On Thu, Feb 26, 2015 at 12:18 PM, Milinda Pathirage <
> mpathira@umail.iu.edu
> > >
> > wrote:
> >
> > > Hi devs,
> > >
> > > I ask about $subject in calcite-dev. You can find the archived
> discussion
> > > at [1]. I think your thoughts are also valuable in this discussion in
> > > calcite list.
> > >
> > > I discovered the requirement for a default window operator when I tried
> > to
> > > integrate streamscan (I was using tablescan prevously) into the
> physical
> > > plan generation logic. Because of the way we have written the
> > > OperatorRouter API, we always need a stream-to-relation operator at the
> > > input. But Calcite generates a query plan like following:
> > >
> > > LogicalDelta
> > >   LogicalProject(id=[$0], product=[$1], quantity=[$2])
> > >     LogicalFilter(condition=[>($2, 5)])
> > >
> > >       StreamScan(table=[[KAFKA, ORDERS]], fields=[[0, 1, 2]])
> > >
> > > If we consider LogicalFilter as a relation operator, we need something
> to
> > > convert input stream to a relation before sending the tuples
> downstream.
> > > In addition to this, there is a optimization where we consider filter
> > > operator as a tuple operator and have it between StreamScan and
> > > stream-to-relation operator as a way of reducing the amount of messages
> > > going downstream.
> > >
> > > Other scenario is windowed aggregates. Currently window spec is
> attached
> > to
> > > the LogicalProject in query plan like following:
> > >
> > > LogicalProject(EXPR$0=[CASE(>(COUNT($2) OVER (ROWS BETWEEN 2 PRECEDING
> > AND
> > > 2 FOLLOWING), 0), CAST($SUM0($2) OVER (ROWS BETWEEN 2 PRECEDING AND 2
> > > FOLLOWING)):INTEGER, null)])
> > >
> > > I wanted to know from them whether it is possible to move window
> > operation
> > > just after the stream scan, so that it is compatible with our operator
> > > layer.
> > > May be there are better or easier ways to do this. So your comments are
> > > always welcome.
> > >
> > > Thanks
> > > Milinda
> > >
> > >
> > > [1]
> > >
> > >
> >
> http://mail-archives.apache.org/mod_mbox/incubator-calcite-dev/201502.mbox/browser
> > >
> > > --
> > > Milinda Pathirage
> > >
> > > PhD Student | Research Assistant
> > > School of Informatics and Computing | Data to Insight Center
> > > Indiana University
> > >
> > > twitter: milindalakmal
> > > skype: milinda.pathirage
> > > blog: http://milinda.pathirage.org
> > >
> >
>
>
>
> --
> Milinda Pathirage
>
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
>
> twitter: milindalakmal
> skype: milinda.pathirage
> blog: http://milinda.pathirage.org
>

Re: Handling defaults and windowed aggregates in stream queries

Posted by Milinda Pathirage <mp...@umail.iu.edu>.
@Chris
Yes, I was referring to that mail. Actually, I was wrong about the ‘Now’
window; it should be an ‘Unbounded’ window for most of the default scenarios
(Section 6.4 of https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf), because
applying a ‘Now’ window of size 1 will double the number of events generated
if we consider insert/delete streams, whereas ‘Unbounded’ will only generate
insert events.
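
A tiny sketch of that difference on an insert/delete representation (made-up
names, just to illustrate the counting argument; deleting each tuple when the
next one arrives stands in for the Now window advancing):

  import java.util.ArrayList;
  import java.util.List;

  public class WindowKinds {

    // Unbounded window: every arriving tuple stays in the relation forever,
    // so each tuple yields exactly one "+" (insert) event downstream.
    static List<String> unbounded(List<String> tuples) {
      List<String> out = new ArrayList<>();
      for (String t : tuples) {
        out.add("+" + t);
      }
      return out;
    }

    // Now window (size 1): each tuple is inserted and then deleted as soon as
    // the next tuple replaces it, roughly doubling the downstream traffic.
    static List<String> now(List<String> tuples) {
      List<String> out = new ArrayList<>();
      String previous = null;
      for (String t : tuples) {
        if (previous != null) {
          out.add("-" + previous);   // previous tuple leaves the relation
        }
        out.add("+" + t);            // current tuple enters the relation
        previous = t;
      }
      return out;
    }

    public static void main(String[] args) {
      List<String> tuples = List.of("o1", "o2", "o3");
      System.out.println(unbounded(tuples));  // [+o1, +o2, +o3]
      System.out.println(now(tuples));        // [+o1, -o1, +o2, -o2, +o3]
    }
  }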

@Yi
1. You are correct about Calcite. There is no stream-to-relation conversion
happening. But as I understand it, we don’t need Calcite to support this; we
can add it to our query planner as a rule or a re-write. What I am not sure
about is whether to use a rule or a re-write.
2. There is a rule in Calcite which extracts the Window out of the Project.
But I am not sure why that didn’t happen in my test. This rule is added to
the planner by default. I’ll ask about this on the Calcite mailing list.

I think we can figure out a way to move the window to the input stream if
Calcite can move the window out of the Project. I’ll see how we can do this.

Also, I’ll go ahead and implement default windows. We can change it later if
Julian or someone from Calcite comes up with a better suggestion.

Thanks
Milinda

On Sun, Mar 1, 2015 at 8:23 PM, Yi Pan <ni...@gmail.com> wrote:

> Hi, Milinda,
>
> Sorry to reply late on this. Here are some of my comments:
> 1) In Calcite's model, it seems that there is no stream-to-relation
> conversion step. In the first example where the window specification is
> missing, I like your solution to add the default LogicalNowWindow operator
> s.t. it makes the physical operator matches the query plan. However, if
> Calcite community does not agree to add the default LogicalNowWindow, it
> would be fine for us if we always insert a default "now" window on a stream
> when we generate the Samza configuration.
> 2) I am more concerned on the other cases, where window operator is used in
> aggregation and join. In your example of windowed aggregation in Calcite,
> window spec seems to be a decoration to the LogicalProject operator,
> instead of defining a data source to the LogicalProject operator. In the
> CQL model we followed, the window operator is considered as a query
> primitive that generate a data source for other relation operators to
> consume. How exactly is window operator used in Calcite planner? Isn't it
> much clear if the following is used?
> LogicalProject(EXPR$0=[CASE(>(COUNT($2), 0), CAST($SUM0($2)):INTEGER,
> null)])
>    LogicalWindow(ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
>
> On Thu, Feb 26, 2015 at 12:18 PM, Milinda Pathirage <mpathira@umail.iu.edu
> >
> wrote:
>
> > Hi devs,
> >
> > I ask about $subject in calcite-dev. You can find the archived discussion
> > at [1]. I think your thoughts are also valuable in this discussion in
> > calcite list.
> >
> > I discovered the requirement for a default window operator when I tried
> to
> > integrate streamscan (I was using tablescan prevously) into the physical
> > plan generation logic. Because of the way we have written the
> > OperatorRouter API, we always need a stream-to-relation operator at the
> > input. But Calcite generates a query plan like following:
> >
> > LogicalDelta
> >   LogicalProject(id=[$0], product=[$1], quantity=[$2])
> >     LogicalFilter(condition=[>($2, 5)])
> >
> >       StreamScan(table=[[KAFKA, ORDERS]], fields=[[0, 1, 2]])
> >
> > If we consider LogicalFilter as a relation operator, we need something to
> > convert input stream to a relation before sending the tuples downstream.
> > In addition to this, there is a optimization where we consider filter
> > operator as a tuple operator and have it between StreamScan and
> > stream-to-relation operator as a way of reducing the amount of messages
> > going downstream.
> >
> > Other scenario is windowed aggregates. Currently window spec is attached
> to
> > the LogicalProject in query plan like following:
> >
> > LogicalProject(EXPR$0=[CASE(>(COUNT($2) OVER (ROWS BETWEEN 2 PRECEDING
> AND
> > 2 FOLLOWING), 0), CAST($SUM0($2) OVER (ROWS BETWEEN 2 PRECEDING AND 2
> > FOLLOWING)):INTEGER, null)])
> >
> > I wanted to know from them whether it is possible to move window
> operation
> > just after the stream scan, so that it is compatible with our operator
> > layer.
> > May be there are better or easier ways to do this. So your comments are
> > always welcome.
> >
> > Thanks
> > Milinda
> >
> >
> > [1]
> >
> >
> http://mail-archives.apache.org/mod_mbox/incubator-calcite-dev/201502.mbox/browser
> >
> > --
> > Milinda Pathirage
> >
> > PhD Student | Research Assistant
> > School of Informatics and Computing | Data to Insight Center
> > Indiana University
> >
> > twitter: milindalakmal
> > skype: milinda.pathirage
> > blog: http://milinda.pathirage.org
> >
>



-- 
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org

Re: Handling defaults and windowed aggregates in stream queries

Posted by Yi Pan <ni...@gmail.com>.
Hi, Milinda,

Sorry to reply late on this. Here are some of my comments:
1) In Calcite's model, it seems that there is no stream-to-relation
conversion step. In the first example, where the window specification is
missing, I like your solution of adding a default LogicalNowWindow operator
so that the physical operators match the query plan. However, if the Calcite
community does not agree to add the default LogicalNowWindow, it would be
fine for us to always insert a default "now" window on a stream when we
generate the Samza configuration.
2) I am more concerned about the other cases, where the window operator is
used in aggregation and join. In your example of windowed aggregation in
Calcite, the window spec seems to be a decoration on the LogicalProject
operator, instead of defining a data source for it. In the CQL model we
followed, the window operator is considered a query primitive that generates
a data source for other relation operators to consume. How exactly is the
window operator used in the Calcite planner? Isn't it much clearer if the
following is used?
LogicalProject(EXPR$0=[CASE(>(COUNT($2), 0), CAST($SUM0($2)):INTEGER,
null)])
   LogicalWindow(ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)

On Thu, Feb 26, 2015 at 12:18 PM, Milinda Pathirage <mp...@umail.iu.edu>
wrote:

> Hi devs,
>
> I ask about $subject in calcite-dev. You can find the archived discussion
> at [1]. I think your thoughts are also valuable in this discussion in
> calcite list.
>
> I discovered the requirement for a default window operator when I tried to
> integrate streamscan (I was using tablescan prevously) into the physical
> plan generation logic. Because of the way we have written the
> OperatorRouter API, we always need a stream-to-relation operator at the
> input. But Calcite generates a query plan like following:
>
> LogicalDelta
>   LogicalProject(id=[$0], product=[$1], quantity=[$2])
>     LogicalFilter(condition=[>($2, 5)])
>
>       StreamScan(table=[[KAFKA, ORDERS]], fields=[[0, 1, 2]])
>
> If we consider LogicalFilter as a relation operator, we need something to
> convert input stream to a relation before sending the tuples downstream.
> In addition to this, there is a optimization where we consider filter
> operator as a tuple operator and have it between StreamScan and
> stream-to-relation operator as a way of reducing the amount of messages
> going downstream.
>
> Other scenario is windowed aggregates. Currently window spec is attached to
> the LogicalProject in query plan like following:
>
> LogicalProject(EXPR$0=[CASE(>(COUNT($2) OVER (ROWS BETWEEN 2 PRECEDING AND
> 2 FOLLOWING), 0), CAST($SUM0($2) OVER (ROWS BETWEEN 2 PRECEDING AND 2
> FOLLOWING)):INTEGER, null)])
>
> I wanted to know from them whether it is possible to move window operation
> just after the stream scan, so that it is compatible with our operator
> layer.
> May be there are better or easier ways to do this. So your comments are
> always welcome.
>
> Thanks
> Milinda
>
>
> [1]
>
> http://mail-archives.apache.org/mod_mbox/incubator-calcite-dev/201502.mbox/browser
>
> --
> Milinda Pathirage
>
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
>
> twitter: milindalakmal
> skype: milinda.pathirage
> blog: http://milinda.pathirage.org
>