Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2018/07/24 12:17:00 UTC

[jira] [Updated] (FLINK-6473) Add OVER window support for batch tables

     [ https://issues.apache.org/jira/browse/FLINK-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske updated FLINK-6473:
---------------------------------
    Description: 
Add support for OVER windows for batch tables. 

Since OVER windows are supported for streaming tables, this issue is not about the API (which is available) but about adding the execution strategies and translation for OVER windows on batch tables.

The feature could be implemented using the following plans:

*UNBOUNDED OVER*

{code}
DataSet[Row] input = ...
DataSet[Row] result = input
  .groupBy(partitionKeys)
  .sortGroup(orderByKeys)
  .reduceGroup(computeAggregates)
{code}

This implementation is quite straightforward because we don't need to retract rows.
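
As a rough illustration of what {{computeAggregates}} might do per group, here is a minimal plain-Java sketch with no Flink dependency. It assumes a SUM aggregate over {{Long}} values and a group that is already sorted by the ORDER BY keys; the class and method names are hypothetical and not part of the proposal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: per-group logic of an UNBOUNDED PRECEDING .. CURRENT ROW window.
class UnboundedOverSketch {

    // Emits one running sum per input row. The input is assumed to be a single
    // partition, already sorted by the ORDER BY keys (as after groupBy + sortGroup).
    static List<Long> runningSum(Iterable<Long> sortedGroup) {
        List<Long> out = new ArrayList<>();
        long acc = 0L;
        for (long value : sortedGroup) {
            acc += value;   // accumulate only, nothing is ever retracted
            out.add(acc);   // one output row per input row
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runningSum(Arrays.asList(1L, 2L, 3L))); // prints [1, 3, 6]
    }
}
```

Only a single accumulator is kept per group, so no rows have to be buffered.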

*BOUNDED OVER*

A bit more challenging are BOUNDED OVER windows, because we need to retract values from aggregates and we don't want to store rows temporarily on the heap.

{code}
DataSet[Row] input = ...
DataSet[Row] sorted = input
  .partitionByHash(partitionKeys)
  .sortPartition(partitionKeys, orderByKeys)
DataSet[Row] result = sorted.coGroup(sorted)
  .where(partitionKeys).equalTo(partitionKeys)
  .with(computeAggregates)
{code}

With this, the data set should be partitioned and sorted once. The sorted {{DataSet}} would be consumed twice (the optimizer should inject a temp barrier on one of the inputs to avoid a consumption deadlock). The {{CoGroupFunction}} would accumulate new rows into the aggregates from one input and retract them from the other. Since both input streams are properly sorted, this can happen in a zigzag fashion. We need to verify that the generated plan is what we want it to be.
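
The zigzag accumulate/retract idea can be sketched in plain Java without Flink. The sketch assumes a SUM aggregate and a row-based window (ROWS BETWEEN n PRECEDING AND CURRENT ROW); the two iterables stand in for the two sorted inputs that the {{CoGroupFunction}} would receive for one partition. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: per-partition logic of a bounded, row-based OVER window.
class BoundedOverSketch {

    // Both inputs are assumed to hold the same rows of one partition in the same
    // sorted order. The first is read to accumulate, the second to retract.
    static List<Long> boundedSum(Iterable<Long> accumulateSide,
                                 Iterable<Long> retractSide,
                                 int preceding) {
        Iterator<Long> head = accumulateSide.iterator();
        Iterator<Long> tail = retractSide.iterator();
        List<Long> out = new ArrayList<>();
        long acc = 0L;
        int rowsInWindow = 0;
        while (head.hasNext()) {
            acc += head.next();               // accumulate the current row
            rowsInWindow++;
            if (rowsInWindow > preceding + 1) {
                acc -= tail.next();           // retract the row that left the window
                rowsInWindow--;
            }
            out.add(acc);                     // one output row per input row
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> rows = Arrays.asList(1L, 2L, 3L, 4L, 5L);
        System.out.println(boundedSum(rows, rows, 1)); // prints [1, 3, 5, 7, 9]
    }
}
```

Because the retracting side simply trails the accumulating side, the function holds only the accumulator and a counter, not the rows of the window.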

  was:
Add support for OVER windows for batch tables. 

Since OVER windows are supported for streaming tables, this issue is not about the API (which is available) but about adding the execution strategies and translation for OVER windows on batch tables.


> Add OVER window support for batch tables
> ----------------------------------------
>
>                 Key: FLINK-6473
>                 URL: https://issues.apache.org/jira/browse/FLINK-6473
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Priority: Major
>
> Add support for OVER windows for batch tables. 
> Since OVER windows are supported for streaming tables, this issue is not about the API (which is available) but about adding the execution strategies and translation for OVER windows on batch tables.
> The feature could be implemented using the following plans:
> *UNBOUNDED OVER*
> {code}
> DataSet[Row] input = ...
> DataSet[Row] result = input
>   .groupBy(partitionKeys)
>   .sortGroup(orderByKeys)
>   .reduceGroup(computeAggregates)
> {code}
> This implementation is quite straightforward because we don't need to retract rows.
> *BOUNDED OVER*
> A bit more challenging are BOUNDED OVER windows, because we need to retract values from aggregates and we don't want to store rows temporarily on the heap.
> {code}
> DataSet[Row] input = ...
> DataSet[Row] sorted = input
>   .partitionByHash(partitionKeys)
>   .sortPartition(partitionKeys, orderByKeys)
> DataSet[Row] result = sorted.coGroup(sorted)
>   .where(partitionKeys).equalTo(partitionKeys)
>   .with(computeAggregates)
> {code}
> With this, the data set should be partitioned and sorted once. The sorted {{DataSet}} would be consumed twice (the optimizer should inject a temp barrier on one of the inputs to avoid a consumption deadlock). The {{CoGroupFunction}} would accumulate new rows into the aggregates from one input and retract them from the other. Since both input streams are properly sorted, this can happen in a zigzag fashion. We need to verify that the generated plan is what we want it to be.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)