Posted to dev@calcite.apache.org by Milinda Pathirage <mp...@umail.iu.edu> on 2015/06/26 19:49:50 UTC

Triggering emits for streaming window aggregates

Hi Julian,

Although this is a general question that applies to all streaming aggregates
that use a GROUP BY clause and a monotonic timestamp field to specify the
window, I am going to stick to the most basic example (which is from the
Calcite Streaming document).

SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime,
  productId,
  COUNT(*) AS c,
  SUM(units) AS units
FROM Orders
GROUP BY FLOOR(rowtime TO HOUR), productId;

I was trying to implement an aggregate operator that handles tumbling
windows via the monotonic field in the GROUP BY clause, in addition to the
general aggregations. I took this path because I thought integrating the
windowing aspects (at least for tumbling and hopping windows) into the
aggregate operator would be easier than trying to extract the window spec
from the query plan for a query like the one above. But I hit a wall when
trying to figure out the trigger condition for emitting aggregate results. I
was initially planning to detect new values of FLOOR(rowtime TO HOUR) and
emit the current aggregate results for the previous groups (I was thinking of
keeping old groups around until we clean them up after a timeout). But when
trying to implement this, I realized that I don’t know how to check which
GROUP BY field is monotonic, so that I detect new values only for the
monotonic field or fields and not for all the other fields. I think this is
not a problem for tables, because we have the whole input before the
computation and we wait until we are done with the input before emitting the
results.
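
To make the trigger idea concrete, here is a minimal sketch in Python (not
Calcite's Java). It assumes the operator is simply told that the hour key is
the monotonic one, which is exactly the information the question asks Calcite
to expose. The class name TumblingAggregate and its methods are hypothetical
and are not part of Calcite or Samza.

```python
from collections import OrderedDict

HOUR_MS = 60 * 60 * 1000


def floor_to_hour(rowtime_ms):
    """Rough equivalent of FLOOR(rowtime TO HOUR) for epoch milliseconds."""
    return rowtime_ms - rowtime_ms % HOUR_MS


class TumblingAggregate:
    """Hypothetical operator for GROUP BY (hour, productId) with COUNT(*), SUM(units).

    Assumption: the caller knows that `hour` is the monotonic group key.
    """

    def __init__(self):
        self.current_hour = None
        self.groups = OrderedDict()  # productId -> [count, sum_units]

    def process(self, rowtime_ms, product_id, units):
        """Consume one row and return the rows of any window that this row closes."""
        hour = floor_to_hour(rowtime_ms)
        if self.current_hour is not None and hour < self.current_hour:
            raise ValueError("rowtime went backwards; input is not monotonic")
        emitted = []
        # Only a new value of the monotonic key triggers an emit.
        # A new productId merely opens another group in the same window.
        if self.current_hour is not None and hour > self.current_hour:
            emitted = self.flush()
        self.current_hour = hour
        acc = self.groups.setdefault(product_id, [0, 0])
        acc[0] += 1
        acc[1] += units
        return emitted

    def flush(self):
        """Emit (hour, productId, count, sum) for every group of the open window."""
        rows = [(self.current_hour, pid, c, s) for pid, (c, s) in self.groups.items()]
        self.groups.clear()
        return rows
```

The sketch raises an error on out-of-order rows instead of keeping old groups
around; handling late rows would need the timeout-based cleanup mentioned
above.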

With regard to the above, can you please clarify the following:

- Does the method I described above for handling streaming aggregates make
sense at all?
- Is there a way to figure out which fields/expressions in
LogicalAggregate are monotonic?
- Or can we write a rule to annotate LogicalAggregate, or add extra metadata
to it, so that we can get the monotonic fields in the GROUP BY clause?

Thanks in advance
Milinda


-- 
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org

Re: Triggering emits for streaming window aggregates

Posted by Milinda Pathirage <mp...@umail.iu.edu>.
Hi Julian,

After debugging I found two possible causes for the missing collation trait
in LogicalAggregate.

- RelMdCollation#project only handles projects of type *RexInputRef*
and doesn't handle *RexCall*. Because of this we lose the ordering
information related to function expressions.
- When creating LogicalAggregate (lines 93 to 95), we don't take the traits of
the input into account.
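
The first cause can be illustrated with a toy model in Python. This is only a
sketch of the idea and does not use Calcite's classes: InputRef and Call are
hypothetical stand-ins for RexInputRef and RexCall, and the set of
order-preserving functions is an assumption made for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class InputRef:
    """Stand-in for a plain column reference."""
    index: int


@dataclass(frozen=True)
class Call:
    """Stand-in for a function call over other expressions."""
    op: str
    operands: tuple


# Assumption: these functions never reorder the values of their first operand,
# so a non-decreasing input gives a non-decreasing output.
ORDER_PRESERVING = {"FLOOR", "CEIL"}


def is_increasing(expr, increasing_inputs):
    """Return True if expr is known to be non-decreasing in stream order."""
    if isinstance(expr, InputRef):
        return expr.index in increasing_inputs
    if isinstance(expr, Call) and expr.op in ORDER_PRESERVING:
        return is_increasing(expr.operands[0], increasing_inputs)
    return False  # unknown expressions are conservatively treated as unordered


def project_collation(projects, increasing_inputs):
    """Return the output field indexes that are known to be non-decreasing."""
    return [i for i, e in enumerate(projects) if is_increasing(e, increasing_inputs)]
```

With input column 0 declared monotonic, a projection of FLOOR over column 0
keeps its ordering in this model, whereas a version that only looked at
InputRef would drop it.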

I created two JIRA tickets to track the progress.

- https://issues.apache.org/jira/browse/CALCITE-784
- https://issues.apache.org/jira/browse/CALCITE-783

Thanks
Milinda

On Mon, Jun 29, 2015 at 4:37 PM, Ted Dunning <te...@gmail.com> wrote:

> Another complicating issue is the practical requirement that often comes up
> that aggregates for late arriving data be kept separate for the same window
> that arrived on time.  This allows late arriving aggregates to be reported
> separately.  This is a fundamental change in the meaning of windowed
> aggregates, of course, but it is also a common requirement.

Re: Triggering emits for streaming window aggregates

Posted by Ted Dunning <te...@gmail.com>.
Another complicating issue is the practical requirement, which often comes up,
that aggregates for late-arriving data be kept separate from the aggregates
for the same window computed from data that arrived on time.  This allows
late-arriving aggregates to be reported separately.  This is a fundamental
change in the meaning of windowed aggregates, of course, but it is also a
common requirement.
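
A minimal Python sketch of this requirement follows. It assumes a record is
"late" when its window is older than the newest window seen so far; that
definition, and the name SplitAggregate, are assumptions made for the example
rather than anything specified in this thread.

```python
from collections import defaultdict


class SplitAggregate:
    """Keeps on-time and late sums for the same window in separate buckets."""

    def __init__(self, window_size):
        self.window_size = window_size
        self.open_window = None          # start of the newest window seen
        self.on_time = defaultdict(int)  # window start -> sum of on-time values
        self.late = defaultdict(int)     # window start -> sum of late values

    def add(self, event_time, value):
        window = event_time - event_time % self.window_size
        if self.open_window is None or window > self.open_window:
            self.open_window = window
        if window < self.open_window:
            # The window was already superseded, so report this value separately.
            self.late[window] += value
        else:
            self.on_time[window] += value
```

Reporting the two buckets separately lets a consumer decide whether to add the
late sum to the on-time sum or to treat it as a correction.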



On Mon, Jun 29, 2015 at 7:25 AM, Milinda Pathirage <mp...@umail.iu.edu>
wrote:

> Hi Ted,
>
> We have discussed most of the complexities related to window handling in a
> different thread [1]. My bad that I didn't provide those additional details
> when I started this thread. We have a window store (implemented on top of
> Samza's local storage) to keep track of old windows to trigger new results
> for late arrivals. Document [2]  discusses most of the things related to
> window store's design.
>
> Thanks
> Milinda
>
> [1] https://issues.apache.org/jira/browse/SAMZA-552
> [2]
>
> https://issues.apache.org/jira/secure/attachment/12708934/DESIGN-SAMZA-552-7.pdf

Re: Triggering emits for streaming window aggregates

Posted by Milinda Pathirage <mp...@umail.iu.edu>.
Hi Ted,

We have discussed most of the complexities related to window handling in a
different thread [1]. My bad that I didn't provide those additional details
when I started this thread. We have a window store (implemented on top of
Samza's local storage) that keeps track of old windows so that we can emit
new results for late arrivals. Document [2] discusses most of the window
store's design.
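
The general idea of such a store can be sketched in Python as follows. This
is not the design from document [2] and does not use Samza's storage API; the
class WindowStore, the in-memory dict and the retention rule are assumptions
made purely for illustration.

```python
class WindowStore:
    """Retains closed windows for a while so late rows can trigger revised results."""

    def __init__(self, window_size, retention):
        self.window_size = window_size
        self.retention = retention
        self.totals = {}     # window start -> running total
        self.closed = set()  # windows whose first result was already emitted
        self.newest = None   # start of the most recent window seen

    def add(self, event_time, value):
        """Return a list of (window_start, total, is_revision) results to emit."""
        window = event_time - event_time % self.window_size
        if self.newest is not None and window < self.newest - self.retention:
            return []  # too old: state for this window has been purged
        self.totals[window] = self.totals.get(window, 0) + value
        if window in self.closed:
            # Late arrival for a retained window: emit a revised total.
            return [(window, self.totals[window], True)]
        if self.newest is None or window > self.newest:
            self.newest = window
        out = []
        for w in sorted(self.totals):
            if w < self.newest and w not in self.closed:
                self.closed.add(w)
                out.append((w, self.totals[w], False))
        # Purge windows that fell out of the retention period.
        for w in [w for w in self.totals if w < self.newest - self.retention]:
            del self.totals[w]
            self.closed.discard(w)
        return out
```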

Thanks
Milinda

[1] https://issues.apache.org/jira/browse/SAMZA-552
[2]
https://issues.apache.org/jira/secure/attachment/12708934/DESIGN-SAMZA-552-7.pdf

On Sun, Jun 28, 2015 at 2:21 AM, Ted Dunning <te...@gmail.com> wrote:

> Here is the biggest recent thread on this.  You might also ask directly
> what they think about the algebraic issue as you see it.
>
>
>
> https://mail-archives.apache.org/mod_mbox/flink-dev/201506.mbox/%3CCANMXwW3bOgaJhG_syH2%3D0x5BcdukyTOF0dU3dM4_3yQK2UHoyw%40mail.gmail.com%3E
>
> Here are some thoughts that mostly deal with implementation, but also
> discuss a few theoretical aspects.  These then link into concepts such as
> data types (Flink recognized sortedness in type information, for instance),
> the snapshot algorithms (because window triggers are very similar to the
> Chandy/Lamport algorithms used for snapshots and state handling), the
> optimizer (only a side comment in this regard) and other aspects.
>
>
> https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit#heading=h.faju7vv5ilgm

Re: Triggering emits for streaming window aggregates

Posted by Ted Dunning <te...@gmail.com>.
Here is the biggest recent thread on this.  You might also ask directly
what they think about the algebraic issue as you see it.


https://mail-archives.apache.org/mod_mbox/flink-dev/201506.mbox/%3CCANMXwW3bOgaJhG_syH2%3D0x5BcdukyTOF0dU3dM4_3yQK2UHoyw%40mail.gmail.com%3E

Here are some thoughts that mostly deal with implementation, but also
discuss a few theoretical aspects.  These then link into concepts such as
data types (Flink recognized sortedness in type information, for instance),
the snapshot algorithms (because window triggers are very similar to the
Chandy/Lamport algorithms used for snapshots and state handling), the
optimizer (only a side comment in this regard) and other aspects.

https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit#heading=h.faju7vv5ilgm

On Sun, Jun 28, 2015 at 12:48 AM, Julian Hyde <jh...@apache.org> wrote:

> Ted,
>
> Do you have a link to a pertinent email thread from the Flink list?
>
> I can see how shifting from monotonic to k-sorted or punctuation could
> make a big impact to the runtime of a streaming system like Flink. But I
> don’t think the impact on the algebra is as big, and that’s what we’re
> concerned with in Calcite.
>
> Julian

Re: Triggering emits for streaming window aggregates

Posted by Julian Hyde <jh...@apache.org>.
Ted,

Do you have a link to a pertinent email thread from the Flink list?

I can see how shifting from monotonic to k-sorted or punctuation could
have a big impact on the runtime of a streaming system like Flink. But I
don’t think the impact on the algebra is as big, and that’s what we’re
concerned with in Calcite.

Julian
 

> On Jun 26, 2015, at 11:18 PM, Ted Dunning <te...@gmail.com> wrote:
> 
> On Sat, Jun 27, 2015 at 1:13 AM, Julian Hyde <jh...@apache.org> wrote:
> 
>> Algebraic reasoning based on monotonicity can be extended to the other
>> models. If we start with the more complex models we'd soon be up to
>> our hubcaps in theoretical mud.
>> 
> 
> As you like.  Flink has just had to rip up and repair a bunch of stuff
> precisely because they started with an assumption of monotonicity and had
> to move to a looser model.  The practical impact was pretty substantial and
> substantially larger than the comments here would imply.


Re: Triggering emits for streaming window aggregates

Posted by Ted Dunning <te...@gmail.com>.
On Sat, Jun 27, 2015 at 1:13 AM, Julian Hyde <jh...@apache.org> wrote:

> Algebraic reasoning based on monotonicity can be extended to the other
> models. If we start with the more complex models we'd soon be up to
> our hubcaps in theoretical mud.
>

As you like.  Flink has just had to rip up and repair a bunch of stuff
precisely because they started with an assumption of monotonicity and had
to move to a looser model.  The practical impact was pretty substantial and
substantially larger than the comments here would imply.

Re: Triggering emits for streaming window aggregates

Posted by Julian Hyde <jh...@apache.org>.
Yes, but in my opinion, the monotonicity model gets us close enough,
or at least gets us headed in the right direction.

Why? Because although we can never be absolutely sure that we have
received all of the 09:00 - 10:00 records needed to emit an hourly total,
someone makes a business rule that says that any record arriving more
than 5 minutes late will be ignored or will count towards the next
hour. Or that the server creating the sub-total should wait until it
has received a "punctuation" signal from all upstream servers.
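
As an illustration of such a business rule, here is a minimal Python sketch.
The function name assign_window, its parameters, and the choice to measure
lateness from the end of the record's own window are all assumptions made for
the example; the rule itself would come from the business, not from the code.

```python
def assign_window(event_time, arrival_time, window, grace, late_policy="drop"):
    """Return the start of the window a record counts towards, or None if ignored.

    All times are in the same unit (minutes in the usage below).
    """
    start = event_time - event_time % window
    deadline = start + window + grace
    if arrival_time <= deadline:
        return start  # on time, or within the grace period
    if late_policy == "drop":
        return None   # too late: the record is ignored
    # Otherwise count the record towards the window that is open when it arrives.
    return arrival_time - arrival_time % window
```

With times in minutes since midnight, an hourly window and a 5-minute grace
period, a 09:59 record arriving at 10:03 still counts towards 09:00, while one
arriving at 10:07 is either ignored or counted towards 10:00.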

Those business rules exist, because businesses need to make progress.
(My company's finance department is already telling me that the
deadline for Q2 expenses is EOB on July 1st.)

The policy can be codified in terms of monotonicity, k-sortedness [1],
and punctuation [2] (also known as "rowtime bounds" and "current time
increment" events). All three of these give a way to say "OK, it's
time to emit the 9:00-10:00 sub-total", which is how the system makes
progress. The latter two are looser than monotonicity, but tend
towards monotonicity in the limit as we reduce k to zero, or increase
the frequency of punctuation events.
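
The three policies can be compared with a small Python sketch that computes,
after each event, the time bound up to which sub-totals may be emitted. Two
simplifications are assumed here and are not taken from the cited papers: k is
treated as a slack in time units rather than a count of rows, and punctuation
is modelled as an explicit ("punct", t) event. The function name safe_bounds
is hypothetical.

```python
def safe_bounds(events, policy, k=0):
    """Return the emit bound after each event.

    events: list of ("row", rowtime) or ("punct", time) pairs in arrival order.
    policy: "monotonic", "k-sorted" or "punctuation".
    A window [start, end) may be emitted once the bound is >= end.
    """
    bound = float("-inf")
    max_seen = float("-inf")
    bounds = []
    for kind, t in events:
        if kind == "row":
            max_seen = max(max_seen, t)
            if policy == "monotonic":
                bound = max_seen  # trust that no earlier row will follow
            elif policy == "k-sorted":
                bound = max(bound, max_seen - k)  # allow rows up to k behind
        elif kind == "punct" and policy == "punctuation":
            bound = max(bound, t)  # only explicit signals move the bound
        bounds.append(bound)
    return bounds
```

Setting k to zero makes the k-sorted bound identical to the monotonic one,
which matches the point above about the looser models tending towards
monotonicity in the limit.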

Algebraic reasoning based on monotonicity can be extended to the other
models. If we start with the more complex models we'd soon be up to
our hubcaps in theoretical mud.

Julian

[1] http://ilpubs.stanford.edu:8090/560/1/2002-52.pdf

[2] http://db.cs.berkeley.edu/cs286/papers/punctuations-tkde2003.pdf

On Fri, Jun 26, 2015 at 5:54 PM, Ted Dunning <te...@gmail.com> wrote:
> I think that you are headed for trouble with this approach due to the fact
> that in real life data, the "monotonic" field often isn't.  This happens
> because it is important to assign time stamps in a distributed fashion but
> records are then subjected to variable transmission delays.

Re: Triggering emits for streaming window aggregates

Posted by Ted Dunning <te...@gmail.com>.
I think that you are headed for trouble with this approach because, in
real-life data, the "monotonic" field often isn't.  This happens because it
is important to assign timestamps in a distributed fashion, but records are
then subjected to variable transmission delays.
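
A small Python sketch shows the effect. The records and delays are made-up
illustrative values, and the helper names arrival_order and max_lateness are
hypothetical.

```python
def arrival_order(records):
    """records: (timestamp, delay) pairs. Return timestamps in order of arrival."""
    return [ts for ts, delay in sorted(records, key=lambda r: r[0] + r[1])]


def max_lateness(timestamps):
    """Largest amount by which a timestamp trails the highest one seen before it."""
    worst = 0
    high = None
    for t in timestamps:
        if high is None or t > high:
            high = t
        else:
            worst = max(worst, high - t)
    return worst
```

For example, records stamped 100, 110 and 120 with delays of 30, 5 and 2
arrive in the order 110, 120, 100, so the "monotonic" field goes backwards by
20 at the consumer even though every source stamped its records correctly.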



On Fri, Jun 26, 2015 at 6:53 PM, Milinda Pathirage <mp...@umail.iu.edu>
wrote:

> Thanks Julian. I'll give it a try.
>
> Milinda

Re: Triggering emits for streaming window aggregates

Posted by Milinda Pathirage <mp...@umail.iu.edu>.
Thanks Julian. I'll give it a try.

Milinda

On Fri, Jun 26, 2015 at 6:14 PM, Julian Hyde <jh...@apache.org> wrote:

> RelNodes know their sort order: call
> rel.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE). That said,
> in non-streaming planning we start with unsorted relational
> expressions and deduce sort order later if it is beneficial (e.g. if
> it allows a sort-merge join) and it seems that we're currently doing
> that for streaming planning.
>
> To see what I'm talking about, run StreamTest.testStreamGroupByHaving
> and put a breakpoint in LogicalAggregate's constructor. Its traitSet
> contains only the empty collation. We need to fix that.
>
> Julian
>
>
> On Fri, Jun 26, 2015 at 11:40 AM, Milinda Pathirage
> <mp...@umail.iu.edu> wrote:
> > Hi Yi,
> >
> > In this specific case ordering is declared in the schema. Quoting from
> > Calcite documentation
> >
> > ~~~~~~~~~~~~~~~~~~~~
> > Monotonic columns need to be declared in the schema. The monotonicity is
> > enforced when records enter the stream and assumed by queries that read
> > from that stream. We recommend that you give each stream a timestamp
> > column called rowtime, but you can declare others, orderId, for example.
> > ~~~~~~~~~~~~~~~~~~~~
> >
> >
> >
> > If we can propagate this ordering information to LogicalAggregate then we
> > can easily handle this. As I understand required information is
> accessible
> > to Calcite query planner. But in our case we need this information after
> we
> > get the query plan from Calcite. AFAIK, current API doesn't provide a way
> > to get this information in scenarios like above where ORDER BY is not
> > specified in the query (I am not 100% sure about ORDER BY case too. I
> need
> > to have a look at a query plan generated for a query with ORDER BY).
> >
> > Thanks
> > Milinda
> >
> > On Fri, Jun 26, 2015 at 2:30 PM, Yi Pan <ni...@gmail.com> wrote:
> >
> >> Hi, Milinda,
> >>
> >> I thought that in your example, the ordering field is given in GROUP BY.
> >> Are we missing a way to pass the ordering field(s) to the
> LogicalAggregate?
> >>
> >> -Yi
> >>
> >> On Fri, Jun 26, 2015 at 10:49 AM, Milinda Pathirage <
> mpathira@umail.iu.edu
> >> >
> >> wrote:
> >>
> >> > Hi Julian,
> >> >
> >> > Even though this is a general question across all the streaming
> >> aggregates
> >> > which utilize GROUP BY clause and a monotonic timestamp field for
> >> > specifying the window, but I am going to stick to most basic example
> >> (which
> >> > is from Calcite Streaming document).
> >> >
> >> > SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime,
> >> >   productId,
> >> >   COUNT(*) AS c,
> >> >   SUM(units) AS units
> >> > FROM Orders
> >> > GROUP BY FLOOR(rowtime TO HOUR), productId;
> >> >
> >> > I was trying to implement an aggregate operator which handles tumbling
> >> > windows via the monotonic field in GROUP By clause in addition to the
> >> > general aggregations. I went in this path because I thought
> integrating
> >> > windowing aspects (at least for tumbling and hopping) into aggregate
> >> > operator will be easier than trying to extract the window spec from
> the
> >> > query plan for a query like above. But I hit a wall when trying to
> figure
> >> > out trigger condition for emitting aggregate results. I was initially
> >> > planning to detect new values for FLOOR(rowtime TO HOUR) and emit
> current
> >> > aggregate results for previous groups (I was thinking to keep old
> groups
> >> > around until we clean them up after a timeout). But when trying to
> >> > implement this I figured out that I don’t know how to check which
> GROUP
> >> BY
> >> > field is monotonic so that I only detect new values for the monotonic
> >> > field/fields, not for the all the other fields. I think this is not a
> >> > problem for tables because we have the whole input before computation
> and
> >> > we wait till we are done with the input before emitting the results.
> >> >
> >> > With regards to above can you please clarify following things:
> >> >
> >> > - Is the method I described above for handling streaming aggregates
> make
> >> > sense at all?
> >> > - Is there a way that I can figure out which fields/expressions in
> >> > LogicalAggregate are monotonic?
> >> > - Or can we write a rule to annotate or add extra metadata to
> >> > LogicalAggregate so that we can get monotonic fields in the GROUP By
> >> clause
> >> >
> >> > Thanks in advance
> >> > Milinda
> >> >
> >> >
> >> > --
> >> > Milinda Pathirage
> >> >
> >> > PhD Student | Research Assistant
> >> > School of Informatics and Computing | Data to Insight Center
> >> > Indiana University
> >> >
> >> > twitter: milindalakmal
> >> > skype: milinda.pathirage
> >> > blog: http://milinda.pathirage.org
> >> >
> >>
> >
> >
> >
> > --
> > Milinda Pathirage
> >
> > PhD Student | Research Assistant
> > School of Informatics and Computing | Data to Insight Center
> > Indiana University
> >
> > twitter: milindalakmal
> > skype: milinda.pathirage
> > blog: http://milinda.pathirage.org
>



-- 
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org

Re: Triggering emits for streaming window aggregates

Posted by Julian Hyde <jh...@apache.org>.
RelNodes know their sort order: call
rel.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE). That said,
in non-streaming planning we start with unsorted relational
expressions and deduce sort order later if it is beneficial (e.g. if
it allows a sort-merge join) and it seems that we're currently doing
that for streaming planning.

To see what I'm talking about, run StreamTest.testStreamGroupByHaving
and put a breakpoint in LogicalAggregate's constructor. Its traitSet
contains only the empty collation. We need to fix that.

Julian


On Fri, Jun 26, 2015 at 11:40 AM, Milinda Pathirage
<mp...@umail.iu.edu> wrote:
> Hi Yi,
>
> In this specific case the ordering is declared in the schema. Quoting from
> the Calcite documentation:
>
> ~~~~~~~~~~~~~~~~~~~~
> Monotonic columns need to be declared in the schema. The monotonicity is
> enforced when records enter the stream and assumed by queries that read
> from that stream. We recommend that you give each stream a timestamp column
> called rowtime, but you can declare others, orderId, for example.
> ~~~~~~~~~~~~~~~~~~~~
>
>
>
> If we can propagate this ordering information to LogicalAggregate, then we
> can easily handle this. As I understand it, the required information is
> accessible to the Calcite query planner, but in our case we need it after
> we get the query plan from Calcite. AFAIK, the current API doesn't provide
> a way to get this information in scenarios like the one above, where ORDER
> BY is not specified in the query. (I am not 100% sure about the ORDER BY
> case either; I need to look at a query plan generated for a query with
> ORDER BY.)
>
> Thanks
> Milinda

Re: Triggering emits for streaming window aggregates

Posted by Milinda Pathirage <mp...@umail.iu.edu>.
Hi Yi,

In this specific case the ordering is declared in the schema. Quoting from
the Calcite documentation:

~~~~~~~~~~~~~~~~~~~~
Monotonic columns need to be declared in the schema. The monotonicity is
enforced when records enter the stream and assumed by queries that read
from that stream. We recommend that you give each stream a timestamp column
called rowtime, but you can declare others, orderId, for example.
~~~~~~~~~~~~~~~~~~~~



If we can propagate this ordering information to LogicalAggregate, then we
can easily handle this. As I understand it, the required information is
accessible to the Calcite query planner, but in our case we need it after
we get the query plan from Calcite. AFAIK, the current API doesn't provide
a way to get this information in scenarios like the one above, where ORDER
BY is not specified in the query. (I am not 100% sure about the ORDER BY
case either; I need to look at a query plan generated for a query with
ORDER BY.)

Thanks
Milinda
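
For illustration, the emit trigger discussed in this thread could look
roughly like the sketch below. It assumes the operator has already been told
that FLOOR(rowtime TO HOUR) is the monotonic group key, which is exactly the
information the thread says LogicalAggregate does not yet expose. The class
TumblingAggregate, its methods, and the use of epoch milliseconds for rowtime
are all hypothetical; none of them are part of Calcite. The sketch also
assumes strictly in-order input and leaves out the timeout-based cleanup of
old groups mentioned in the original question.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical operator for illustration only; it is not a Calcite class.
class TumblingAggregate {
  static final long HOUR_MS = 3_600_000L;

  // productId -> {COUNT(*), SUM(units)} for the window that is still open.
  private final Map<Long, long[]> groups = new LinkedHashMap<>();
  private long currentWindow = Long.MIN_VALUE;

  // Feeds one Orders row. Returns the rows of the window that just closed,
  // each as {windowStart, productId, count, sumUnits}, or an empty list.
  List<long[]> accept(long rowtime, long productId, long units) {
    // Equivalent of FLOOR(rowtime TO HOUR) on an epoch-millisecond value.
    long window = Math.floorDiv(rowtime, HOUR_MS) * HOUR_MS;
    List<long[]> emitted = new ArrayList<>();
    // Only a new value of the monotonic key triggers an emit; a new
    // productId merely opens another group inside the current window.
    if (window > currentWindow) {
      emitted = flush();
      currentWindow = window;
    }
    long[] acc = groups.computeIfAbsent(productId, k -> new long[2]);
    acc[0] += 1;
    acc[1] += units;
    return emitted;
  }

  // Emits and clears every group of the current window.
  List<long[]> flush() {
    List<long[]> rows = new ArrayList<>();
    for (Map.Entry<Long, long[]> e : groups.entrySet()) {
      long[] acc = e.getValue();
      rows.add(new long[] {currentWindow, e.getKey(), acc[0], acc[1]});
    }
    groups.clear();
    return rows;
  }
}
```

Because only the monotonic key is compared against the previous value, rows
for a new productId never cause an emit on their own. A real operator would
also need a policy for late rows, which this sketch simply folds into the
open window.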

On Fri, Jun 26, 2015 at 2:30 PM, Yi Pan <ni...@gmail.com> wrote:

> Hi, Milinda,
>
> I thought that in your example, the ordering field is given in GROUP BY.
> Are we missing a way to pass the ordering field(s) to the LogicalAggregate?
>
> -Yi




Re: Triggering emits for streaming window aggregates

Posted by Yi Pan <ni...@gmail.com>.
Hi, Milinda,

I thought that in your example, the ordering field is given in GROUP BY.
Are we missing a way to pass the ordering field(s) to the LogicalAggregate?

-Yi

On Fri, Jun 26, 2015 at 10:49 AM, Milinda Pathirage <mp...@umail.iu.edu>
wrote:

> Hi Julian,
>
> This is a general question that applies to all streaming aggregates that
> use a GROUP BY clause and a monotonic timestamp field to specify the
> window, but I am going to stick to the most basic example (which is from
> the Calcite Streaming document).
>
> SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime,
>   productId,
>   COUNT(*) AS c,
>   SUM(units) AS units
> FROM Orders
> GROUP BY FLOOR(rowtime TO HOUR), productId;
>
> I was trying to implement an aggregate operator that handles tumbling
> windows via the monotonic field in the GROUP BY clause, in addition to
> general aggregations. I went down this path because I thought integrating
> the windowing aspects (at least for tumbling and hopping windows) into the
> aggregate operator would be easier than trying to extract the window spec
> from the query plan for a query like the one above. But I hit a wall when
> trying to figure out the trigger condition for emitting aggregate results.
> I was initially planning to detect new values of FLOOR(rowtime TO HOUR)
> and emit the current aggregate results for previous groups (keeping old
> groups around until we clean them up after a timeout). But when trying to
> implement this, I realized that I don't know how to check which GROUP BY
> field is monotonic, so that I detect new values only for the monotonic
> field or fields and not for all the other fields. I think this is not a
> problem for tables, because there we have the whole input before
> computation and wait until the input is exhausted before emitting results.
>
> With regard to the above, can you please clarify the following:
>
> - Does the method I described above for handling streaming aggregates
> make sense at all?
> - Is there a way to figure out which fields/expressions in
> LogicalAggregate are monotonic?
> - Or can we write a rule to annotate LogicalAggregate with extra metadata
> so that we can get the monotonic fields in the GROUP BY clause?
>
> Thanks in advance
> Milinda
