Posted to issues@flink.apache.org by "Flink Jira Bot (Jira)" <ji...@apache.org> on 2022/01/04 22:39:00 UTC
[jira] [Updated] (FLINK-6082) Support window definition for SQL Queries based on WHERE clause with time condition
[ https://issues.apache.org/jira/browse/FLINK-6082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-6082:
----------------------------------
Labels: auto-deprioritized-major auto-deprioritized-minor (was: auto-deprioritized-major stale-minor)
Priority: Not a Priority (was: Minor)
This issue was labeled "stale-minor" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Minor, please raise the priority and ask a committer to assign you the issue or revive the public discussion.
> Support window definition for SQL Queries based on WHERE clause with time condition
> -----------------------------------------------------------------------------------
>
> Key: FLINK-6082
> URL: https://issues.apache.org/jira/browse/FLINK-6082
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API
> Reporter: radu
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> Time target: Proc Time
> The Calcite documentation includes query examples in which the (time)
> boundaries are defined as conditions within the WHERE clause. As the Flink
> community targets compatibility with Calcite, it makes sense to support
> defining windows this way, along with the corresponding aggregations on
> top of them.
> SQL targeted query examples:
> ----------------------------
> ```SELECT productId, count(*) FROM stream1 WHERE proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp```
> General comments:
> 1) Window boundaries are defined as conditions in the WHERE clause.
> 2) To indicate which stream time is used, rowtime and
> proctime can be used.
> 3) The boundaries are defined using special constructs provided by
> Calcite: current_timestamp and time operations.
> Description:
> ------------
> The logic of this operator is strictly related to supporting aggregates
> over sliding windows defined with OVER
> ([FLINK-5653](https://issues.apache.org/jira/browse/FLINK-5653),
> [FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654),
> [FLINK-5655](https://issues.apache.org/jira/browse/FLINK-5655),
> [FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658),
> [FLINK-5656](https://issues.apache.org/jira/browse/FLINK-5656)). Those
> issues considered queries where the window is defined with the
> syntax of the OVER clause and aggregates are applied over that period. The
> behavior here is the same, with the only exception that the window
> boundaries are defined by the WHERE conditions. Apart from
> this, the logic and the types of aggregates to be supported should be the
> same (sum, count, avg, min, max). Supporting these types of query is
> related to the pie chart problem tackled by Calcite.
> As with the OVER windows, the construct should build rolling
> windows (i.e., windows that are triggered and move with every incoming
> event).
> Functionality example
> ---------------------
> The example below illustrates the behavior of the time-bounded WHERE
> clause when working with streams.
> `SELECT a, count(*) FROM stream1 WHERE proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp;`
> ||IngestionTime(Event)|| Stream1|| Output||
> |10:00:01 |Id1,10 |Id1,1|
> |10:02:00 |Id2,2 |Id2,2|
> |11:25:00 |Id3,2 |Id3,1|
> |12:03:00 |Id4,15 |Id4,2|
> |12:05:00 |Id5,11 |Id5,3|
> |12:56:00 |Id6,20 |Id6,3|
> |...|
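
The rolling-window behavior in the table above can be reproduced with a small standalone sketch. It does not use Flink; `RollingCountSketch` and `rollingCounts` are hypothetical names introduced only for illustration. It assumes the count is taken over all events of the stream (the table does not group by `a`) and that both window boundaries are inclusive.

```scala
import java.time.{Duration, LocalTime}

object RollingCountSketch {
  // For each arriving event, count the events (including itself) whose
  // arrival time lies in [arrival - window, arrival]. This mimics
  // "proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp"
  // evaluated once per incoming event. Assumes all times fall on the same day.
  def rollingCounts(arrivals: Seq[LocalTime], window: Duration): Seq[Int] =
    arrivals.map { now =>
      val lower = now.minus(window)
      arrivals.count(t => !t.isBefore(lower) && !t.isAfter(now))
    }

  def main(args: Array[String]): Unit = {
    // Arrival times taken from the table in the issue description.
    val arrivals = Seq("10:00:01", "10:02:00", "11:25:00", "12:03:00", "12:05:00", "12:56:00")
      .map(LocalTime.parse(_))
    // Yields the counts 1, 2, 1, 2, 3, 3, matching the Output column.
    println(rollingCounts(arrivals, Duration.ofHours(1)))
  }
}
```

The quadratic scan is only for readability; an actual operator would keep the window contents in state and evict expired events as new ones arrive.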
> Implementation option
> ---------------------
> Since the query provides the same functionality as the
> aggregates over windows, the implementation should follow that
> of the OVER clause. Because the WHERE
> conditions are typically time-related, a window with
> one unbounded boundary should use the
> [FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658) design,
> while bounded time windows should use the
> [FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654)
> design.
> The window boundaries will be extracted from the WHERE condition.
> The rule will no longer be mapped to a LogicalWindow, which means that
> the conversion would need to happen in the current
> DataStreamCalc rule. To this end, a dedicated condition will be added
> so that, when the WHERE clause has time conditions, the operator
> implementation of the OVER clause (used in the previous issues) is
> used.
> ```
> class DataStreamCalcRule {
>
>   def convert(rel: RelNode): RelNode = {
>     val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
>     val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
>     val convInput: RelNode = RelOptRule.convert(calc.getInput, DataStreamConvention.INSTANCE)
>
>     IF (WHERE contains TIME limits) {
>       IF (bounded)
>         new DataStreamProcTimeTimeAggregate
>       ELSE
>         new DataStreamSlideEventTimeRowAgg
>     } ELSE {
>       new DataStreamCalc(
>         rel.getCluster,
>         traitSet,
>         convInput,
>         rel.getRowType,
>         calc.getProgram,
>         description)
>     }
>   }
> }
> ```
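
The branching in the pseudocode above could be modeled roughly as follows. This is a minimal standalone sketch, not Flink or Calcite code: `Bound`, `TimeWindow`, and `chooseOperator` are hypothetical names, and the operators are represented only by the class names mentioned in the issue. It assumes the time conditions have already been extracted from the WHERE clause into a lower and an upper bound.

```scala
object WindowBoundsSketch {
  // Hypothetical model of one time bound extracted from a WHERE predicate.
  sealed trait Bound
  case object Unbounded extends Bound                     // no condition on this side
  case object CurrentTime extends Bound                   // current_timestamp
  final case class Preceding(millis: Long) extends Bound  // current_timestamp - INTERVAL

  final case class TimeWindow(lower: Bound, upper: Bound) {
    // A window is bounded only if neither side is open.
    def isBounded: Boolean = lower != Unbounded && upper != Unbounded
  }

  // Returns the name of the operator the rule would create:
  // no time condition -> plain calc, bounded -> FLINK-5654 style operator,
  // one unbounded side -> FLINK-5658 style operator.
  def chooseOperator(window: Option[TimeWindow]): String = window match {
    case None                   => "DataStreamCalc"
    case Some(w) if w.isBounded => "DataStreamProcTimeTimeAggregate"
    case Some(_)                => "DataStreamSlideEventTimeRowAgg"
  }
}
```

For the query in the description, the extracted window would be `TimeWindow(Preceding(3600000L), CurrentTime)`, which is bounded and therefore selects the bounded-window operator.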
--
This message was sent by Atlassian Jira
(v8.20.1#820001)