Posted to dev@calcite.apache.org by Milinda Pathirage <mp...@umail.iu.edu> on 2015/02/26 20:34:45 UTC

Handling defaults and window aggregates in stream query plans

Hi devs,

A query like the following:

'select stream * from orders where quantity > 5'

has the following query plan in a streaming context:

LogicalDelta
  LogicalProject(id=[$0], product=[$1], quantity=[$2])
    LogicalFilter(condition=[>($2, 5)])
      StreamScan(table=[[KAFKA, ORDERS]], fields=[[0, 1, 2]])


In Samza, we have a partial physical operator layer based on CQL
(https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf). It requires us to
convert streams to time-varying relations using a window operator before
executing the business logic (aggregates, projects). So for queries like
the one above, where there is no window declaration, we should apply a
'now' window operator (a window that always contains the latest tuple) to
the stream.
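To make the CQL pipeline concrete, here is a minimal Java sketch (Java 16+). It shows stream-to-relation via a [Now] window, the business logic as a relation-to-relation filter, and relation-to-stream via Istream. The `Order` record and the `CqlNow` class are hypothetical illustrations, not Samza or Calcite APIs. The [Now] window follows CQL's definition: R(t) holds exactly the elements timestamped t.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical element type for illustration: an order tagged with its timestamp.
// Not a Samza or Calcite class.
record Order(long ts, int id, String product, int quantity) {}

final class CqlNow {
  // Stream-to-relation: under a [Now] window, the relation R(t) holds
  // exactly the stream elements whose timestamp is t.
  static List<Order> nowWindow(List<Order> stream, long t) {
    List<Order> rel = new ArrayList<>();
    for (Order o : stream) {
      if (o.ts() == t) {
        rel.add(o);
      }
    }
    return rel;
  }

  // Relation-to-relation: the business logic, here the filter 'quantity > 5'.
  static List<Order> filter(List<Order> rel) {
    List<Order> out = new ArrayList<>();
    for (Order o : rel) {
      if (o.quantity() > 5) {
        out.add(o);
      }
    }
    return out;
  }

  // Relation-to-stream (Istream, roughly the role LogicalDelta plays in the plan):
  // emit the tuples of R(t) that were not in R(t-1). Set semantics are used
  // here for simplicity; CQL defines Istream over bags.
  static List<Order> istream(List<Order> stream, long maxTs) {
    List<Order> out = new ArrayList<>();
    List<Order> prev = List.of();
    for (long t = 0; t <= maxTs; t++) {
      List<Order> cur = filter(nowWindow(stream, t));
      for (Order o : cur) {
        if (!prev.contains(o)) {
          out.add(o);
        }
      }
      prev = cur;
    }
    return out;
  }
}
```

Under a [Now] window, every tuple in R(t) is new relative to R(t-1). The Istream output is therefore just the filtered stream, which is what 'select stream * ... where quantity > 5' should produce.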

One way I thought of doing this is to rewrite the query plan to introduce a
LogicalNowWindow between the filter and the stream scan, like the following:

LogicalDelta
  LogicalProject(id=[$0], product=[$1], quantity=[$2])
    LogicalFilter(condition=[>($2, 5)])
      LogicalNowWindow
        StreamScan(table=[[KAFKA, ORDERS]], fields=[[0, 1, 2]])
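The rewrite above can be sketched on a toy plan tree. This is a minimal illustration assuming the query declares no window of its own. The `Node`, `Scan`, `Filter`, `Project`, `NowWindow` and `Delta` records and the `DefaultWindowRewrite` class are hypothetical stand-ins, not Calcite's `RelNode` classes or planner rules.

```java
// Hypothetical, minimal plan model for illustration only; these records are
// not Calcite's RelNode classes.
interface Node {}
record Scan(String table) implements Node {}
record Filter(String condition, Node input) implements Node {}
record Project(String exprs, Node input) implements Node {}
record NowWindow(Node input) implements Node {}
record Delta(Node input) implements Node {}

final class DefaultWindowRewrite {
  // Wraps every Scan that is not already under a window in a NowWindow;
  // all other nodes are copied with rewritten inputs.
  static Node rewrite(Node n) {
    if (n instanceof NowWindow) {
      return n; // a window is already present; leave this subtree alone
    }
    if (n instanceof Scan) {
      return new NowWindow(n);
    }
    if (n instanceof Filter f) {
      return new Filter(f.condition(), rewrite(f.input()));
    }
    if (n instanceof Project p) {
      return new Project(p.exprs(), rewrite(p.input()));
    }
    if (n instanceof Delta d) {
      return new Delta(rewrite(d.input()));
    }
    throw new IllegalArgumentException("unknown node: " + n);
  }

  // Renders the tree in the indented style used in the plans in this mail.
  static String explain(Node n, int depth) {
    String pad = "  ".repeat(depth);
    if (n instanceof Scan s) {
      return pad + "StreamScan(" + s.table() + ")\n";
    }
    if (n instanceof Filter f) {
      return pad + "LogicalFilter(" + f.condition() + ")\n" + explain(f.input(), depth + 1);
    }
    if (n instanceof Project p) {
      return pad + "LogicalProject(" + p.exprs() + ")\n" + explain(p.input(), depth + 1);
    }
    if (n instanceof NowWindow w) {
      return pad + "LogicalNowWindow\n" + explain(w.input(), depth + 1);
    }
    if (n instanceof Delta d) {
      return pad + "LogicalDelta\n" + explain(d.input(), depth + 1);
    }
    throw new IllegalArgumentException("unknown node: " + n);
  }
}
```

The guard on `NowWindow` makes the rewrite idempotent, so running it twice does not stack windows. A real implementation would also have to recognize window specifications attached elsewhere, such as on a project.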


Alternatively, we have the opportunity to push the filter down below the
window operator, like the following:

LogicalDelta
  LogicalProject(id=[$0], product=[$1], quantity=[$2])
    LogicalNowWindow
      LogicalFilter(condition=[>($2, 5)])
        StreamScan(table=[[KAFKA, ORDERS]], fields=[[0, 1, 2]])
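Why this pushdown is safe for a 'now' window can be checked with a small Java sketch. Both a [Now] window and a filter select tuple by tuple, so the filter commutes with the window. A count-based window such as CQL's [Rows n] does not commute, because filtering first changes which n rows are kept. `Order` and `FilterPushdown` are hypothetical, and the rows window is simplified to "the last n elements of the whole input".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical element type: an order tagged with its timestamp.
record Order(long ts, int id, int quantity) {}

final class FilterPushdown {
  // [Now] window: the relation at time t is the elements timestamped t.
  static List<Order> nowWindow(List<Order> in, long t) {
    List<Order> out = new ArrayList<>();
    for (Order o : in) {
      if (o.ts() == t) {
        out.add(o);
      }
    }
    return out;
  }

  // Count-based [Rows n] window, for contrast: the last n elements seen.
  static List<Order> rowsWindow(List<Order> in, int n) {
    return new ArrayList<>(in.subList(Math.max(0, in.size() - n), in.size()));
  }

  static List<Order> filter(List<Order> in, Predicate<Order> p) {
    List<Order> out = new ArrayList<>();
    for (Order o : in) {
      if (p.test(o)) {
        out.add(o);
      }
    }
    return out;
  }

  // True if Filter-over-NowWindow and NowWindow-over-Filter produce the
  // same relation at every instant 0..maxTs for this input.
  static boolean nowCommutes(List<Order> stream, Predicate<Order> p, long maxTs) {
    for (long t = 0; t <= maxTs; t++) {
      if (!filter(nowWindow(stream, t), p).equals(nowWindow(filter(stream, p), t))) {
        return false;
      }
    }
    return true;
  }

  // Same check for a [Rows n] window evaluated over the whole input.
  static boolean rowsCommutes(List<Order> stream, Predicate<Order> p, int n) {
    return filter(rowsWindow(stream, n), p).equals(rowsWindow(filter(stream, p), n));
  }
}
```

For example, with orders of quantity 9 then 3 and the predicate `quantity > 5`, the [Now] check holds at every instant. With [Rows 1], the results differ: the window keeps the quantity-3 order, which the filter then drops. Filtering first instead keeps the quantity-9 order.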



But I'm not sure whether this is the best solution. Another thing I found
is that when I tried a windowed aggregation, the window specification was
attached to the LogicalProject, so moving the window down may be hard. As
far as I currently understand, moving the window operation close to the
scan makes it easy to implement aggregations in a streaming environment.
Also, in the case of multiple windowed queries over the same stream, we
could in theory share the window operator.
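One possible way to share window operators is sketched below in Java. A registry keyed by (stream, window specification) hands the same operator instance to every query that asks for an identical window. `WindowKey`, `WindowOperator` and `WindowRegistry` are hypothetical names, not Samza or Calcite classes. The window specification is assumed to be comparable as a plain string.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical key: the input stream plus the window specification text.
record WindowKey(String stream, String spec) {}

// Hypothetical placeholder for a physical window operator; a real one
// would hold the window's state and push tuples to downstream operators.
final class WindowOperator {
  final WindowKey key;
  int consumers;

  WindowOperator(WindowKey key) {
    this.key = key;
  }
}

final class WindowRegistry {
  private final Map<WindowKey, WindowOperator> operators = new HashMap<>();

  // Returns the operator already built for (stream, spec), creating it on
  // first use, so queries with identical windows over one stream share it.
  WindowOperator acquire(String stream, String spec) {
    WindowOperator op =
        operators.computeIfAbsent(new WindowKey(stream, spec), WindowOperator::new);
    op.consumers++;
    return op;
  }

  int size() {
    return operators.size();
  }
}
```

Sharing like this assumes the window sits directly above the scan. If a filter is pushed below the window, as in the second plan, two queries with different filters no longer see the same window input. The key would then also have to include the pushed-down predicate.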

Given the above, I would like to know what you think about this. I would
also like to know whether default operator introduction can happen at the
query planning stage, instead of in a rewrite phase that runs after query
planning (as in the Calcite Interpreter).



Thanks
Milinda


-- 
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org