Posted to issues@flink.apache.org by "Flink Jira Bot (Jira)" <ji...@apache.org> on 2022/01/04 22:39:00 UTC

[jira] [Updated] (FLINK-6082) Support window definition for SQL Queries based on WHERE clause with time condition

     [ https://issues.apache.org/jira/browse/FLINK-6082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Flink Jira Bot updated FLINK-6082:
----------------------------------
      Labels: auto-deprioritized-major auto-deprioritized-minor  (was: auto-deprioritized-major stale-minor)
    Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Minor, please raise the priority and ask a committer to assign you the issue or revive the public discussion.


> Support window definition for SQL Queries based on WHERE clause with time condition
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-6082
>                 URL: https://issues.apache.org/jira/browse/FLINK-6082
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>            Reporter: radu
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> Time target: Proc Time
> Calcite documentation refers to query examples where the (time)
> boundaries are defined as conditions within the WHERE clause. As the Flink
> community targets compatibility with Calcite, it makes sense to support
> defining windows this way, as well as the corresponding
> aggregations on top of them.
> SQL targeted query examples:
> ----------------------------
> ```SELECT productId, count(*) FROM stream1 WHERE proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp```
> General comments:
> 1)  Window boundaries are defined as conditions in the WHERE clause.
> 2)  To indicate which stream time is used, rowtime and
>     proctime can be used.
> 3)  The boundaries are defined based on special constructs provided by
>     Calcite: current_timestamp and time operations.
> Description:
> ------------
> The logic of this operator is strictly related to supporting aggregates
> over sliding windows defined with OVER
> ([FLINK-5653](https://issues.apache.org/jira/browse/FLINK-5653),
> [FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654),
> [FLINK-5655](https://issues.apache.org/jira/browse/FLINK-5655),
> [FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658),
> [FLINK-5656](https://issues.apache.org/jira/browse/FLINK-5656)). In those
> issues the design considered queries where the window is defined with the
> syntax of the OVER clause and aggregates are applied over this period. This
> issue is similar in behavior, with the only exception that the window
> boundaries are defined by the WHERE conditions. Besides
> this, the logic and the types of aggregates to be supported should be the
> same (sum, count, avg, min, max). Supporting these types of queries is
> related to the pie chart problem tackled by Calcite.
> As with the OVER windows, the construct should build rolling
> windows (i.e., windows that are triggered and move with every incoming
> event).
> Functionality example
> ---------------------
> We exemplify below the functionality of a window defined in the WHERE
> clause when working with streams.
> `SELECT a, count( * ) FROM stream1 WHERE proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp;`
> ||IngestionTime(Event)||	Stream1||	Output||
> |10:00:01	|Id1,10	|Id1,1|
> |10:02:00	|Id2,2	|Id2,2|
> |11:25:00	|Id3,2	|Id3,1|
> |12:03:00	|Id4,15	|Id4,2|
> |12:05:00	|Id5,11	|Id5,3|
> |12:56:00	|Id6,20	|Id6,3|
> |...|
> Implementation option
> ---------------------
> Considering that the query has the same functionality as the
> aggregates over windows, the implementation should follow the same
> approach as for the OVER clause. Considering that the WHERE
> conditions are typically related to timing, this means that in the case of
> one unbounded boundary the
> [FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658) design should be
> used, while for bounded time windows the
> [FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654) design
> should be used.
> The window boundaries will be extracted from the WHERE condition.
> The rule will no longer be mapped to a LogicalWindow, which means that
> the conversion would need to happen in the current
> DataStreamCalc rule. To this end, a dedicated condition will be added
> such that, when the WHERE clause has time conditions, the operator
> implementation of the OVER clause (used in the previous issues) is
> used.
> ```
> class DataStreamCalcRule {
>
>   def convert(rel: RelNode): RelNode = {
>     val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
>     val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
>     val convInput: RelNode = RelOptRule.convert(calc.getInput, DataStreamConvention.INSTANCE)
>
>     // Pseudocode: the two conditions below still have to be derived
>     // from the WHERE clause of the Calc program.
>     if (WHERE contains time limits) {
>       if (bounded) {
>         new DataStreamProcTimeTimeAggregate
>       } else {
>         new DataStreamSlideEventTimeRowAgg
>       }
>     } else {
>       new DataStreamCalc(
>         rel.getCluster,
>         traitSet,
>         convInput,
>         rel.getRowType,
>         calc.getProgram,
>         description)
>     }
>   }
> }
> ```
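
A minimal sketch of the rolling count shown in the Functionality example table of the quoted description, assuming events arrive in time order within a single day and that BETWEEN is inclusive at both ends. The names `RollingCountSketch`, `Event`, `at` and `rollingCounts` are hypothetical and not part of Flink; the code only simulates the expected output and does not use the Table API.

```scala
import java.time.LocalTime

object RollingCountSketch {
  // One input row: arrival time in seconds since midnight plus the row id.
  // Hypothetical type, used only for this simulation.
  final case class Event(timeSec: Int, id: String)

  // Helper to build an event from an HH:mm:ss string.
  def at(time: String, id: String): Event =
    Event(LocalTime.parse(time).toSecondOfDay, id)

  // For every event, count the events seen so far whose time lies in
  // [current time - windowSec, current time]. The window is re-evaluated
  // on each incoming event, i.e. it "rolls" with the stream.
  def rollingCounts(events: Seq[Event], windowSec: Int): Seq[(String, Int)] =
    events.zipWithIndex.map { case (current, idx) =>
      val lowerBound = current.timeSec - windowSec
      val inWindow = events.take(idx + 1).count(_.timeSec >= lowerBound)
      (current.id, inWindow)
    }

  // The rows of the Functionality example table.
  val sample: Seq[Event] = Seq(
    at("10:00:01", "Id1"),
    at("10:02:00", "Id2"),
    at("11:25:00", "Id3"),
    at("12:03:00", "Id4"),
    at("12:05:00", "Id5"),
    at("12:56:00", "Id6")
  )
}
```

With the six sample rows and a 3600 second window, `RollingCountSketch.rollingCounts(RollingCountSketch.sample, 3600)` yields the counts 1, 2, 1, 2, 3, 3, matching the Output column of the table.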

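The choice between the bounded and unbounded designs described under "Implementation option" can be sketched independently of Calcite. This assumes the time condition has already been extracted from the WHERE clause into a small model; every type and name below (`TimeBoundSketch`, `Bound`, `TimeCondition`, `Plan`, `choosePlan`) is hypothetical and only illustrates the decision, not the actual rule.

```scala
object TimeBoundSketch {
  // Hypothetical model of one side of a time condition.
  sealed trait Bound
  case object CurrentTimestamp extends Bound                       // current_timestamp
  final case class CurrentMinus(intervalMillis: Long) extends Bound // current_timestamp - INTERVAL

  // Hypothetical model of a time condition found in the WHERE clause.
  sealed trait TimeCondition
  // proctime BETWEEN lower AND upper: both boundaries are given.
  final case class Between(lower: Bound, upper: Bound) extends TimeCondition
  // proctime <= upper: the lower boundary is unbounded.
  final case class AtMost(upper: Bound) extends TimeCondition

  // Which operator design the rule would pick.
  sealed trait Plan
  case object BoundedOverAggregate extends Plan   // design of FLINK-5654
  case object UnboundedOverAggregate extends Plan // design of FLINK-5658
  case object PlainCalc extends Plan              // no time condition: regular Calc

  // Mirror of the pseudocode branches: bounded, one unbounded side, or no time limits.
  def choosePlan(condition: Option[TimeCondition]): Plan = condition match {
    case Some(Between(_, _)) => BoundedOverAggregate
    case Some(AtMost(_))     => UnboundedOverAggregate
    case None                => PlainCalc
  }
}
```

For the example query, the condition would be modeled as `Between(CurrentMinus(3600000L), CurrentTimestamp)`, for which `choosePlan` selects the bounded design.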


--
This message was sent by Atlassian Jira
(v8.20.1#820001)