Posted to issues@flink.apache.org by "Flink Jira Bot (Jira)" <ji...@apache.org> on 2021/10/29 10:42:01 UTC

[jira] [Updated] (FLINK-21109) Introduce "retractAccumulators" interface for AggregateFunction in Table/SQL API

     [ https://issues.apache.org/jira/browse/FLINK-21109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Flink Jira Bot updated FLINK-21109:
-----------------------------------
    Labels: auto-deprioritized-major stale-minor  (was: auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Minor but is unassigned and neither itself nor its Sub-Tasks have been updated for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is still Minor, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized.


> Introduce "retractAccumulators" interface for AggregateFunction in Table/SQL API
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-21109
>                 URL: https://issues.apache.org/jira/browse/FLINK-21109
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>            Reporter: Jark Wu
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-minor
>
> *Motivation*
> The motivation is to improve the performance of hopping (sliding) windows.
> Currently, Table/SQL applies a paned (also called sliced) optimization to hopping windows:
> each element is accumulated into exactly one pane, and once a window fires,
> the accumulators of its panes are merged to produce the window result.
> For example, with HOP(size=10s, slide=2s), the window [0, 10) consists of 5 panes: [0, 2), [2, 4), [4, 6), [6, 8), [8, 10).
> Each element falls into exactly one pane; e.g., an element with timestamp 3 falls into pane [2, 4).
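The pane arithmetic above can be sketched in Java as follows. This is a minimal illustration, not Flink's slicing code. It assumes epoch-aligned windows, timestamps and sizes in seconds, and a window size that is a multiple of the slide. The class `PaneMath` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper illustrating paned (sliced) hopping windows; not Flink's implementation. */
final class PaneMath {
    private PaneMath() {}

    /** Start of the pane (width = slide) that an element with this timestamp falls into. */
    static long paneStart(long timestamp, long slide) {
        // floorMod keeps the result correct for negative timestamps too
        return timestamp - Math.floorMod(timestamp, slide);
    }

    /** Number of panes per window; assumes size is a multiple of slide. */
    static long panesPerWindow(long size, long slide) {
        return size / slide;
    }

    /** Starts of all panes that together form the window [windowStart, windowStart + size). */
    static List<Long> panesOfWindow(long windowStart, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        for (long p = windowStart; p < windowStart + size; p += slide) {
            starts.add(p);
        }
        return starts;
    }

    public static void main(String[] args) {
        // HOP(size=10s, slide=2s): the element at t=3 lands in pane [2, 4)
        System.out.println(paneStart(3, 2));             // 2
        // window [0, 10) consists of the panes starting at 0, 2, 4, 6, 8
        System.out.println(panesOfWindow(0, 10, 2));     // [0, 2, 4, 6, 8]
        // HOP(1 day, 10s): 86400 / 10 panes per window
        System.out.println(panesPerWindow(86_400, 10));  // 8640
    }
}
```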
> However, panes are currently merged on the JVM heap. For example, when window [0, 10) is about to fire,
> we retrieve the accumulators of its 5 panes and merge them into an in-memory accumulator.
> This performs poorly because the number of panes per window (size / slide) becomes very large when the slide is small, e.g., 8640 panes for HOP(1 day, 10s).
> It can also cause an OutOfMemoryError when the accumulator is very large, e.g., when it holds the state of a count distinct.
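Below is a minimal sketch of the merge-all-panes approach just described. It assumes a plain `long` sum stands in for the accumulator and that pane accumulators are kept in a `HashMap`. The class `NaivePaneMerge` and its `mergeCalls` counter are hypothetical; they exist only to make the O(N) work per firing visible.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the current approach: on every firing, merge all panes of the
 * window into a fresh in-memory accumulator. A long sum stands in for a real accumulator.
 */
final class NaivePaneMerge {
    /** Pane accumulators keyed by pane start. */
    private final Map<Long, Long> paneSums = new HashMap<>();
    private final long size;
    private final long slide;
    /** Total number of pane merges performed, to expose the per-firing cost. */
    long mergeCalls = 0;

    NaivePaneMerge(long size, long slide) {
        this.size = size;
        this.slide = slide;
    }

    /** Accumulates a value into the single pane that contains the timestamp. */
    void add(long timestamp, long value) {
        long pane = timestamp - Math.floorMod(timestamp, slide);
        paneSums.merge(pane, value, Long::sum);
    }

    /** Fires window [start, start + size) by merging all of its size / slide panes. */
    long fire(long start) {
        long acc = 0; // fresh accumulator built on the heap for every firing
        for (long pane = start; pane < start + size; pane += slide) {
            acc += paneSums.getOrDefault(pane, 0L); // one merge per pane
            mergeCalls++;
        }
        return acc;
    }
}
```

With HOP(10s, 2s) each firing performs 5 merges. With HOP(1 day, 10s) each firing performs 8640 merges, regardless of how few elements actually arrived.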
> Thus, I would like to introduce a "retractAccumulators()" method, which is the inverse of "merge()".
> With "retractAccumulators()", the work per window firing drops from O(N) pane merges (N = size / slide) to O(1).
> For example, when window [10, 20) is about to fire, we only need to retract the accumulator of pane [8, 10)
> from the state of the previous window [8, 18) and merge the accumulator of pane [18, 20) into it.
> This would be a major performance improvement, giving hopping windows performance similar
> to that of tumbling windows, no matter how small the slide is.
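Here is a sketch of the retract-then-merge idea. It again uses a `long` sum as a stand-in accumulator, so retracting a pane is plain subtraction. The class `IncrementalHop` is hypothetical. The sketch assumes two things: windows fire in order, with each start equal to the previous start plus the slide, and no element reaches a pane after that pane has been merged into a fired window.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the proposed approach: keep the accumulator of the last fired window
 * and, on each firing, retract the pane that slid out and merge the pane that slid in.
 * A long sum stands in for the accumulator, so retracting is subtraction.
 */
final class IncrementalHop {
    private final Map<Long, Long> paneSums = new HashMap<>();
    private final long size;
    private final long slide;
    private long windowAcc = 0;     // accumulator of the last fired window
    private Long lastStart = null;  // start of the last fired window; null before the first firing
    long opsPerLastFire = 0;        // pane merges + retractions done by the most recent firing

    IncrementalHop(long size, long slide) {
        this.size = size;
        this.slide = slide;
    }

    void add(long timestamp, long value) {
        long pane = timestamp - Math.floorMod(timestamp, slide);
        paneSums.merge(pane, value, Long::sum);
    }

    /** Fires window [start, start + size); windows must be fired in slide order. */
    long fire(long start) {
        if (lastStart != null && start != lastStart + slide) {
            throw new IllegalArgumentException("windows must fire in order, one slide apart");
        }
        opsPerLastFire = 0;
        if (lastStart == null) {
            // First window: there is no previous state, so merge all of its panes once.
            for (long pane = start; pane < start + size; pane += slide) {
                windowAcc += paneSums.getOrDefault(pane, 0L);
                opsPerLastFire++;
            }
        } else {
            // Retract the pane that left, e.g. [8, 10) when moving from [8, 18) to [10, 20).
            windowAcc -= paneSums.getOrDefault(lastStart, 0L);
            // Merge the pane that entered, e.g. [18, 20).
            windowAcc += paneSums.getOrDefault(start + size - slide, 0L);
            opsPerLastFire += 2;
        }
        lastStart = start;
        return windowAcc;
    }
}
```

The first window is built from all its panes. After that, every firing performs exactly two pane operations (one retract, one merge), independent of size / slide.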
> *Public Interface*
> We will introduce a contract method "retractAccumulators", analogous to the "merge" method, with the following signature:
> {code}
> /**
>  * Retracts a group of accumulator instances from one accumulator instance. This method is
>  * optional, but implementing it can greatly improve the performance of hopping window
>  * aggregates. Therefore, it is recommended to implement this method when the function is
>  * used with hopping windows.
>  *
>  * @param accumulator the accumulator which will keep the retracted aggregate results. Note
>  *                    that the accumulator may contain previously aggregated results, so users
>  *                    should not replace or clean this instance in the custom
>  *                    retractAccumulators method.
>  * @param retractAccs a java.lang.Iterable pointing to a group of accumulators that will be
>  *                    retracted.
>  */
> public void retractAccumulators(ACC accumulator, java.lang.Iterable<ACC> retractAccs)
> {code}
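To illustrate the proposed contract, here is a standalone sketch of a count-distinct function with a `retractAccumulators` method. It follows the names of Flink's contract methods (`createAccumulator`, `accumulate`, `merge`, `getValue`). It deliberately does not extend Flink's `AggregateFunction`, so it runs without Flink on the classpath. `CountDistinctAcc` and `CountDistinctFunction` are hypothetical names. The accumulator stores a multiplicity per value rather than a plain set. This matters because, after a pane is retracted, a set cannot tell whether a value still occurs in the remaining panes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical COUNT(DISTINCT x) accumulator: multiplicity per value, so it can be retracted. */
final class CountDistinctAcc {
    final Map<String, Long> counts = new HashMap<>();
}

/**
 * Hypothetical standalone function mirroring Flink's contract-method names plus the proposed
 * retractAccumulators. It does not extend any Flink class.
 */
final class CountDistinctFunction {
    CountDistinctAcc createAccumulator() {
        return new CountDistinctAcc();
    }

    void accumulate(CountDistinctAcc acc, String value) {
        acc.counts.merge(value, 1L, Long::sum);
    }

    /** Adds every accumulator in {@code others} into {@code acc}. */
    void merge(CountDistinctAcc acc, Iterable<CountDistinctAcc> others) {
        for (CountDistinctAcc other : others) {
            other.counts.forEach((v, c) -> acc.counts.merge(v, c, Long::sum));
        }
    }

    /**
     * Inverse of merge: subtracts each accumulator in {@code retractAccs} from {@code acc}
     * in place (acc is mutated, never replaced). Assumes every retracted accumulator was
     * previously merged into acc; values absent from acc are ignored.
     */
    void retractAccumulators(CountDistinctAcc acc, Iterable<CountDistinctAcc> retractAccs) {
        for (CountDistinctAcc retracted : retractAccs) {
            retracted.counts.forEach((v, c) ->
                // Returning null removes the entry once its multiplicity reaches zero.
                acc.counts.computeIfPresent(v, (k, old) -> {
                    long remaining = old - c;
                    return remaining == 0 ? null : Long.valueOf(remaining);
                }));
        }
    }

    long getValue(CountDistinctAcc acc) {
        return acc.counts.size();
    }

    public static void main(String[] args) {
        CountDistinctFunction f = new CountDistinctFunction();
        CountDistinctAcc pane0 = f.createAccumulator();
        f.accumulate(pane0, "a");
        f.accumulate(pane0, "b");
        CountDistinctAcc pane1 = f.createAccumulator();
        f.accumulate(pane1, "a");

        CountDistinctAcc window = f.createAccumulator();
        f.merge(window, List.of(pane0, pane1));
        System.out.println(f.getValue(window)); // 2
        f.retractAccumulators(window, List.of(pane0));
        System.out.println(f.getValue(window)); // 1 ("a" is still contributed by pane1)
    }
}
```

Retracting `pane0` drops "b" because its multiplicity reaches zero. It keeps "a" because `pane1` still contributes one occurrence. A set-based accumulator would get this wrong.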



--
This message was sent by Atlassian Jira
(v8.3.4#803005)