Posted to issues@flink.apache.org by "radu (JIRA)" <ji...@apache.org> on 2017/02/02 16:43:51 UTC

[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

    [ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15850152#comment-15850152 ] 

radu commented on FLINK-5654:
-----------------------------

There is another decision to be made. When the SQL query is parsed, two options are possible:

1) The entire expression is translated to a LogicalCalc. For example:
rel#8:LogicalCalc.NONE(input=rel#2:Subset#0.NONE,expr#0..5={inputs},expr#6=17:34:58,expr#7=3600000,expr#8=COUNT($t5) OVER (PARTITION BY $t2 ORDER BY $t6 RANGE BETWEEN $t7 PRECEDING AND CURRENT ROW),expr#9=0,expr#10=>($t8, $t9),expr#11=$SUM0($t5) OVER (PARTITION BY $t2 ORDER BY $t6 RANGE BETWEEN $t7 PRECEDING AND CURRENT ROW),expr#12=CAST($t11):DOUBLE,expr#13=null,expr#14=CASE($t10, $t12, $t13),timeevent=$t0,sumB=$t14)

Notice that the window-related logic (boundaries, aggregates, PARTITION BY clause, ...) is expressed as Rex objects inside the LogicalCalc (e.g., RexOver, RexWindow, ...).
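Reading expr#8 to expr#14 of the plan above, SUM(b) is not kept as a single call. Instead, COUNT and $SUM0 (a sum that yields 0 instead of NULL on empty input) are computed over the same window, and sumB becomes CASE(COUNT > 0, CAST($SUM0 AS DOUBLE), NULL). The minimal Java sketch below only mirrors that expansion for the contents of one window. The class and method names are hypothetical, it is not Flink or Calcite code, and it assumes the window holds no NULL values of b, so COUNT equals the number of rows.

```java
import java.util.List;

// Hypothetical helper mirroring the plan's expansion of SUM(b) OVER w:
//   sumB = CASE(COUNT(b) OVER w > 0, CAST($SUM0(b) OVER w AS DOUBLE), NULL)
// Assumes no NULL b values, so COUNT(b) is simply the number of rows.
public class SumExpansion {

    // COUNT(b) over the window contents (expr#8).
    static long count(List<Long> window) {
        return window.size();
    }

    // $SUM0(b): like SUM, but returns 0 for an empty window (expr#11).
    static long sum0(List<Long> window) {
        long sum = 0;
        for (long v : window) {
            sum += v;
        }
        return sum;
    }

    // SUM(b) rebuilt from COUNT and $SUM0 (expr#10, expr#12, expr#14).
    static Double sum(List<Long> window) {
        return count(window) > 0 ? Double.valueOf(sum0(window)) : null;
    }

    public static void main(String[] args) {
        System.out.println(sum(List.of(3L, 4L, 5L)));
        System.out.println(sum(List.of()));
    }
}
```

The detour through COUNT reproduces SQL semantics: SUM over an empty window is NULL, while $SUM0 alone would return 0.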

2) The whole logic of the OVER clause is mapped directly to a LogicalWindow operator:
rel#14:LogicalWindow.NONE(input=rel#13:Subset#2.NONE,window#0=window(partition {1} order by [] range between $4 PRECEDING and CURRENT ROW aggs [COUNT($2), $SUM0($2)]))

Notice that all the information specific to window processing is now available in the LogicalWindow object, while the information related to projections or calculations is handled by a dedicated LogicalCalc node.

In my view, the second option is more elegant and makes the code more robust, so it is the one I propose (and the one I selected).
However, the two Calcite rules that translate a query into the second form are not among the default Calcite rules that Flink enables.
That is why I added these two rules to the FlinkRuleSet:

ProjectWindowTransposeRule.INSTANCE,
ProjectToWindowRule.INSTANCE,

I will push these modifications together with the rest of the code. However, if anyone has a strong preference for one option over the other, or if there is a specific reason I am missing why these two rules were not enabled in the first place, please let me know.

> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> ---------------------------------------------------------------------
>
>                 Key: FLINK-5654
>                 URL: https://issues.apache.org/jira/browse/FLINK-5654
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some of the restrictions are trivial to address, we can add the functionality in this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER RANGE aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with RexOver expression).
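The core of such an operator can be sketched independently of the DataStream API. For each partition key (c in the example query), keep the rows that arrived within the last RANGE interval, evict older rows when a new row arrives, and emit the aggregates for the new row. The Java sketch below is a simplified model under several assumptions. Processing time is passed in explicitly as epoch milliseconds instead of being read from a clock. The range is treated as inclusive, [now - x, now]. All OVER clauses are identical, and only SUM(b) and MIN(b) from the example query are computed. The class, record, and method names are hypothetical and do not reflect Flink's actual operator.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a processing-time
//   OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN x PRECEDING AND CURRENT ROW)
// operator computing SUM(b) and MIN(b). Not Flink's actual implementation.
public class ProcTimeRangeOver {

    // One buffered input row: its arrival (processing) time and its b value.
    record Entry(long procTime, long b) {}

    // Output for one input row: a, sumB, minB.
    record Result(String a, long sumB, long minB) {}

    private final long rangeMillis;
    // Per-partition buffer of rows, ordered by arrival time.
    private final Map<String, Deque<Entry>> buffers = new HashMap<>();

    ProcTimeRangeOver(long rangeMillis) {
        this.rangeMillis = rangeMillis;
    }

    // Processes one row (a, b, c) that arrived at processing time `now`.
    Result process(String a, long b, String c, long now) {
        Deque<Entry> buf = buffers.computeIfAbsent(c, k -> new ArrayDeque<>());
        buf.addLast(new Entry(now, b));
        // Evict rows older than now - range. Processing time only grows, so the
        // oldest rows sit at the head; the current row itself is never evicted.
        while (buf.peekFirst().procTime() < now - rangeMillis) {
            buf.pollFirst();
        }
        // Rescan the remaining rows (kept simple; an efficient operator would
        // avoid recomputing the aggregates from scratch for every row).
        long sum = 0;
        long min = Long.MAX_VALUE;
        for (Entry e : buf) {
            sum += e.b();
            min = Math.min(min, e.b());
        }
        return new Result(a, sum, min);
    }

    public static void main(String[] args) {
        ProcTimeRangeOver op = new ProcTimeRangeOver(3_600_000L); // INTERVAL '1' HOUR
        System.out.println(op.process("x", 5, "k", 0L));
        System.out.println(op.process("y", 2, "k", 1_800_000L));
        System.out.println(op.process("z", 7, "k", 5_400_001L));
    }
}
```

In this sketch, a partition's buffer is only cleaned up when a new row for that partition arrives, so idle partitions keep their rows. A real DataStream operator would presumably rely on keyed state and timers for that instead.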



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)