Posted to issues@flink.apache.org by "Flink Jira Bot (Jira)" <ji...@apache.org> on 2022/01/04 22:39:00 UTC

[jira] [Updated] (FLINK-6081) Offset/Fetch support for SQL Streaming

     [ https://issues.apache.org/jira/browse/FLINK-6081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Flink Jira Bot updated FLINK-6081:
----------------------------------
      Labels: auto-deprioritized-major auto-deprioritized-minor auto-unassigned  (was: auto-deprioritized-major auto-unassigned stale-minor)
    Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Minor, please raise the priority and ask a committer to assign you the issue or revive the public discussion.


> Offset/Fetch support for SQL Streaming
> --------------------------------------
>
>                 Key: FLINK-6081
>                 URL: https://issues.apache.org/jira/browse/FLINK-6081
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>            Reporter: radu
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor, auto-unassigned
>         Attachments: offset.png
>
>
> Time target: Proc Time
> The main scope of Offset/Fetch is pagination support. In the context
> of streaming, Offset and Fetch make sense within the scope of
> certain window constructs, as they refer to buffered data from the stream
> (with the main usage being to restrict the output that is shown at a given
> moment). Therefore they should be applied to the output of the types of
> windows supported by the ORDER BY clause. Moreover, in accordance with
> SQL best practices, they can only be used with an ORDER BY clause.
> SQL targeted query examples:
> ----------------------------
> Window defined based on group by clause
> Q1: 
> {code}
> SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '3' HOUR) ORDER BY b OFFSET n ROWS
> {code}
> Window defined based on where clause time boundaries
> Q2: 
> {code}
> SELECT a FROM stream1 WHERE procTime() BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp ORDER BY b OFFSET n ROWS
> {code}
> ~~Window defined as sliding windows (aggregates) ~~
> Q3: 
> {code}
> SELECT SUM(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING b OFFSET n ROWS) FROM stream1
> {code}
> Comment: Supporting OFFSET over sliding windows (within the window) does
> not make sense because the main scope of OFFSET/FETCH is pagination
> support. Therefore this functionality should only be supported in relation to the
> output of a query. Hence, Q3 will not be supported.
> The general grammar (Calcite version) for OFFSET/FETCH with available
> parameters is shown below:
> {code}
> Select […]
> [ ORDER BY orderItem [, orderItem ]* ]
> [ OFFSET start { ROW | ROWS } ]
> [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY ]
> {code}
> Description
> -----------
> Offset and Fetch are primarily used for pagination support (i.e., to restrict
> the output that is shown at some point). They were mainly designed to
> support web page display of query results. Building on this scenario, we
> can imagine a similar role for OFFSET and FETCH for streams that
> display their contents via a web page. In such a scenario the number of
> outputs to be displayed would be limited using these operators (probably
> for pagination and aesthetic reasons). However, because any stream
> application evolves naturally over time, the operators' output
> should evolve with the update rate of the application. The fact that
> there is an update rate and a collection of events related to a stream
> points to window constructs. Therefore the OFFSET/FETCH functionality
> would be tied to the window mechanisms/boundaries defined by the
> query. Hence, whenever the window construct is re-triggered, the output
> is filtered again by cardinality according to the
> logic of the OFFSET/FETCH.
> Because the primary purpose is supporting pagination (and controlling
> the number of outputs), we limit the usage of OFFSET/FETCH to window
> constructs that relate to the output. For this reason,
> supporting them on sliding windows with query aggregates (e.g., the Q3 query
> example) would not make sense. Additionally, there is an implicit need
> for an ordering clause because OFFSET and FETCH refer to
> ordering positions. That is why these clauses would be supported only
> if an ORDER BY clause is present.
> Functionality example
> ---------------------
> We exemplify the usage of OFFSET below using the following query. Event
> schema is in the form (a,b).
> {code}
> SELECT a FROM stream1 GROUP BY CEIL(proctime TO HOUR) ORDER BY b OFFSET 2 ROWS
> {code}
> ||Proctime||IngestionTime(Event)||Stream1||Output||
> | |10:00:01|(a1,7)| |
> | |10:05:00|(c1,2)| |
> | |10:12:00|(b1,5)| |
> | |10:50:00|(d1,2)| |
> |10-11| | |b1,a1|
> | |11:03:00|(a2,10)| |
> |11-12| | |nil|
> |...|
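> The table above can be reproduced with a small plain-Java sketch (no Flink
> dependency). The class HourlyOffsetExample, the Event type and the method
> offsetPerHour are hypothetical names introduced only for illustration, and
> bucketing by hour of arrival is a simplification of CEIL(proctime TO HOUR).
```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class HourlyOffsetExample {

    /** One stream event (a, b) plus the time it arrived. Hypothetical helper type. */
    static final class Event {
        final LocalTime time;
        final String a;
        final int b;

        Event(LocalTime time, String a, int b) {
            this.time = time;
            this.a = a;
            this.b = b;
        }
    }

    /**
     * Buckets events by hour of arrival (a simplification of CEIL(proctime TO HOUR)),
     * then applies ORDER BY b, OFFSET offset ROWS and SELECT a to each bucket.
     */
    static Map<Integer, List<String>> offsetPerHour(List<Event> events, int offset) {
        Map<Integer, List<Event>> windows = new TreeMap<>();
        for (Event e : events) {
            windows.computeIfAbsent(e.time.getHour(), h -> new ArrayList<>()).add(e);
        }
        Map<Integer, List<String>> output = new TreeMap<>();
        for (Map.Entry<Integer, List<Event>> w : windows.entrySet()) {
            output.put(w.getKey(), w.getValue().stream()
                    .sorted(Comparator.comparingInt((Event e) -> e.b)) // ORDER BY b
                    .skip(offset)                                      // OFFSET n ROWS
                    .map(e -> e.a)                                     // SELECT a
                    .collect(Collectors.toList()));
        }
        return output;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(LocalTime.of(10, 0, 1), "a1", 7),
                new Event(LocalTime.of(10, 5, 0), "c1", 2),
                new Event(LocalTime.of(10, 12, 0), "b1", 5),
                new Event(LocalTime.of(10, 50, 0), "d1", 2),
                new Event(LocalTime.of(11, 3, 0), "a2", 10));
        System.out.println(offsetPerHour(events, 2)); // {10=[b1, a1], 11=[]}
    }
}
```
> The empty list for the 11-12 window corresponds to the "nil" row: the window
> holds a single event, so skipping two rows leaves nothing to emit.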
> Implementation option
> ---------------------
> There are 2 options to implement the logic of OFFSET/FETCH:
> 1)  Within the logic of the window (i.e. sorting window)
> As with sorting support (the ORDER BY clause), and considering that the
> SQL operators will be associated with window boundaries, the
> functionality will be implemented within the logic of the window as
> follows. We extract the window boundaries and window type from the query
> logic. These will be used to define the type of the window and its triggering
> policy. The logic of the query (i.e., the sorting of the events) will in
> turn be implemented within the window function. In addition to this, the
> logic for filtering the output based on the cardinality logic of
> OFFSET/FETCH will be added. With this implementation the logic of
> OFFSET and FETCH is combined with that of the ORDER BY clause. As ORDER
> BY is always required, this does not impose any implementation
> restrictions.
> 2)  Within the logic of a filter/flatMap function (with a state counter for
>     outputs)
> Instead of adding the logic within the window functions, the filtering
> can be done within a standalone operator that only counts outputs and
> emits the ones that fall within the logic of the OFFSET/FETCH. To
> provide this functionality we need to use a flatMap function in which we
> count the results. The OFFSET/FETCH condition would be transposed into
> the condition of an IF, applied based on the order of the output, to
> emit the output. However, the counter would need to be reset in
> accordance with the triggering of the window, which makes the
> implementation tedious. This is despite the fact that this
> implementation option would directly translate the output filtering
> logic of the operators from relational SQL.
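> A minimal plain-Java sketch of option 2, written without the Flink API. The
> class CountingOffsetFetchFilter and its methods flatMap and
> resetForNextWindow are hypothetical names, and a negative fetch is an assumed
> convention for "no FETCH clause". It shows why the reset is the awkward part:
> something outside the operator has to signal each window firing.
```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class CountingOffsetFetchFilter<T> {
    private final long offset;
    private final long fetch; // negative means "no FETCH clause" (assumed convention)
    private long seen = 0;    // state counter: rows seen since the last window firing

    CountingOffsetFetchFilter(long offset, long fetch) {
        this.offset = offset;
        this.fetch = fetch;
    }

    /** flatMap-style call: forwards the row to out or drops it. Rows must arrive already ordered. */
    void flatMap(T row, Consumer<T> out) {
        long index = seen++;
        if (index >= offset && (fetch < 0 || index < offset + fetch)) {
            out.accept(row);
        }
    }

    /** Has to be invoked whenever the upstream window fires again. */
    void resetForNextWindow() {
        seen = 0;
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        CountingOffsetFetchFilter<String> filter = new CountingOffsetFetchFilter<>(2, -1);
        // Rows of the 10-11 window, already sorted by b.
        for (String row : Arrays.asList("c1", "d1", "b1", "a1")) {
            filter.flatMap(row, emitted::add);
        }
        filter.resetForNextWindow();
        filter.flatMap("a2", emitted::add); // first row of the next window: skipped by OFFSET 2
        System.out.println(emitted);        // [b1, a1]
    }
}
```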
> We recommend option 1 for implementation.
> For option 1 we entirely reuse the ORDER BY implementation and
> just add:
> 1)  A counter for indexing the outputs
> 2)  An if condition to emit the output only if the corresponding index
>     counter falls within the scope defined by the OFFSET/FETCH
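> The two additions can be sketched in plain Java as the body of a window
> function. SortAndCountFilterSketch and its apply method are hypothetical
> names, the sort stands in for the reused ORDER BY step, and a negative fetch
> is an assumed convention for "no FETCH clause".
```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class SortAndCountFilterSketch {

    /**
     * Sorts the rows buffered for one window firing, then emits only the rows whose
     * index falls in [offset, offset + fetch). A negative fetch means "no FETCH clause".
     */
    static <T> List<T> apply(List<T> windowRows, Comparator<? super T> orderBy,
                             long offset, long fetch) {
        List<T> sorted = new ArrayList<>(windowRows);
        sorted.sort(orderBy);                       // stand-in for the reused ORDER BY step
        List<T> emitted = new ArrayList<>();
        long index = 0;                             // 1) counter indexing the outputs
        for (T row : sorted) {
            boolean pastOffset = index >= offset;
            boolean withinFetch = fetch < 0 || index < offset + fetch;
            if (pastOffset && withinFetch) {        // 2) emit only inside the OFFSET/FETCH range
                emitted.add(row);
            }
            index++;
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> bValues = Arrays.asList(7, 2, 5, 2);
        // ORDER BY b OFFSET 2 ROWS
        System.out.println(apply(bValues, Comparator.<Integer>naturalOrder(), 2, -1)); // [5, 7]
        // ORDER BY b OFFSET 1 ROWS FETCH NEXT 2 ROWS ONLY
        System.out.println(apply(bValues, Comparator.<Integer>naturalOrder(), 1, 2));  // [2, 5]
    }
}
```
> Because the counter is a local variable of the window function call, it
> starts from zero on every firing and needs no explicit reset.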
> !offset.png!
> General logic of OFFSET/FETCH
> -----------------------------
> inputDataStream.window(new [Slide/Tumble][Time/Count]Window())
>     //.trigger(new [Time/Count]Trigger()) - use default
>     //.evictor(new [Time/Count]Evictor()) - use default
>     .apply(SortAndCountFilter());



--
This message was sent by Atlassian Jira
(v8.20.1#820001)