Posted to issues@flink.apache.org by "Flink Jira Bot (Jira)" <ji...@apache.org> on 2021/10/29 10:42:01 UTC
[jira] [Updated] (FLINK-21109) Introduce "retractAccumulators" interface for AggregateFunction in Table/SQL API
[ https://issues.apache.org/jira/browse/FLINK-21109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-21109:
-----------------------------------
Labels: auto-deprioritized-major stale-minor (was: auto-deprioritized-major)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Minor but is unassigned and neither itself nor its Sub-Tasks have been updated for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is still Minor, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized.
> Introduce "retractAccumulators" interface for AggregateFunction in Table/SQL API
> --------------------------------------------------------------------------------
>
> Key: FLINK-21109
> URL: https://issues.apache.org/jira/browse/FLINK-21109
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API
> Reporter: Jark Wu
> Priority: Minor
> Labels: auto-deprioritized-major, stale-minor
>
> *Motivation*
> The motivation is to improve the performance of hopping (sliding) windows.
> Currently, Table/SQL applies a pane (also called slice) optimization to hopping windows:
> each element is accumulated into a single pane, and once a window fires,
> the accumulators of its panes are merged to produce the window result.
> For example, with HOP(size=10s, slide=2s), the window [0, 10) consists of 5 panes [0, 2), [2, 4), [4, 6), [6, 8), [8, 10).
> Each element falls into exactly one pane, e.g. an element with timestamp 3 falls into pane [2, 4).
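A minimal sketch of the pane arithmetic described above, assuming (as in the example) that the window size is an exact multiple of the slide and that panes are aligned to multiples of the slide. The class `HopPanes` and its methods are hypothetical helpers for illustration, not Flink's internal slicing code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper illustrating pane (slice) assignment for HOP(size, slide) windows. */
final class HopPanes {
    private HopPanes() {}

    /** Start of the pane containing the timestamp; floorMod keeps negative timestamps correct. */
    static long paneStart(long timestamp, long slide) {
        return timestamp - Math.floorMod(timestamp, slide);
    }

    /** Number of panes per window, assuming size is an exact multiple of slide. */
    static long panesPerWindow(long size, long slide) {
        if (size % slide != 0) {
            throw new IllegalArgumentException("size must be a multiple of slide");
        }
        return size / slide;
    }

    /** Start offsets of the panes that make up the window [windowStart, windowStart + size). */
    static List<Long> panesOfWindow(long windowStart, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        for (long p = windowStart; p < windowStart + size; p += slide) {
            starts.add(p);
        }
        return starts;
    }
}
```

For HOP(size=10s, slide=2s), `paneStart(3, 2)` is 2, i.e. pane [2, 4), and `panesOfWindow(0, 10, 2)` yields the 5 pane starts 0, 2, 4, 6, 8. With millisecond units, `panesPerWindow(86_400_000L, 10_000L)` gives the 8640 panes per window that HOP(1 day, 10s) produces.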
> However, panes are currently merged on the JVM heap. For example, when window [0, 10) is about to fire,
> we retrieve the accumulators of its 5 panes and merge them into an in-memory accumulator.
> This performs poorly because the number of panes per window can be very large when the slide is small, e.g. 8640 panes for HOP(1 day, 10s).
> The job may also run out of memory (OOM) when the accumulator is very large, e.g. when it contains a count distinct.
> Thus, I would like to introduce a "retractAccumulators()" method, which is the inverse of "merge()".
> With "retractAccumulators()", the cost of firing a window drops from O(N) to O(1), where N is the number of panes per window.
> For example, when window [10, 20) is about to fire, we only need to retract the accumulator of pane [8, 10)
> from the state of the previous window [8, 18) and merge the accumulator of pane [18, 20) into it.
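The following is a minimal sketch of the retract-then-merge idea, assuming a plain `long` sum as the accumulator. The class `IncrementalHopSum` is hypothetical and only mimics what a window operator could do with `retractAccumulators()` and `merge()`. A sum works because subtraction exactly undoes addition; a non-invertible aggregate such as MAX cannot be retracted from its accumulator alone and would need extra state.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: firing HOP windows by "retract oldest pane + merge newest pane"
 * instead of re-merging every pane. Uses a plain long sum as the accumulator.
 */
final class IncrementalHopSum {
    private final long size;
    private final long slide;
    private final Map<Long, Long> paneSums = new HashMap<>(); // pane start -> partial sum
    private Long windowAcc;   // accumulator of the most recently fired window (null before first fire)
    private long windowStart; // start of the most recently fired window

    IncrementalHopSum(long size, long slide) {
        this.size = size;
        this.slide = slide;
    }

    /** Accumulates an element into the single pane that contains its timestamp. */
    void add(long timestamp, long value) {
        long pane = timestamp - Math.floorMod(timestamp, slide);
        paneSums.merge(pane, value, Long::sum);
    }

    /** Naive firing: merges all size/slide panes of the window, O(N) per window. */
    long fireByMerging(long start) {
        long acc = 0;
        for (long p = start; p < start + size; p += slide) {
            acc += paneSums.getOrDefault(p, 0L);
        }
        return acc;
    }

    /**
     * Incremental firing, O(1) per window. Only valid when start is exactly one slide after
     * the previously fired window; otherwise it falls back to a full merge.
     */
    long fireIncrementally(long start) {
        if (windowAcc == null || start != windowStart + slide) {
            windowAcc = fireByMerging(start);
        } else {
            long expired = windowStart;            // pane leaving the window, e.g. [8, 10)
            long entering = start + size - slide;  // pane entering the window, e.g. [18, 20)
            windowAcc = windowAcc
                    - paneSums.getOrDefault(expired, 0L)   // plays the role of retractAccumulators
                    + paneSums.getOrDefault(entering, 0L); // plays the role of merge
        }
        windowStart = start;
        return windowAcc;
    }
}
```

With `size = 10` and `slide = 2`, firing windows starting at 0, 2, 4, ... in order makes every call after the first touch only two panes, and each result equals `fireByMerging(start)`, which re-merges all five panes.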
> This would give hopping windows performance similar to that of tumbling windows, no matter how small the slide is.
> *Public Interface*
> We will introduce a contract method "retractAccumulators", analogous to the existing "merge" method.
> {code}
> /**
>  * Retracts a group of accumulator instances from one accumulator instance. This method is optional,
>  * but implementing it can greatly improve the performance of hopping window aggregates.
>  * Therefore, it is recommended to implement this method when the function is used with hopping windows.
>  *
>  * @param accumulator the accumulator which will keep the retracted aggregate results. Note that
>  *                    the accumulator may contain previously aggregated results, so users should
>  *                    not replace or clear this instance in a custom retractAccumulators method.
>  * @param retractAccs a java.lang.Iterable pointing to a group of accumulators that will be retracted
>  */
> public void retractAccumulators(ACC accumulator, java.lang.Iterable<ACC> retractAccs)
> {code}
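Below is a hedged sketch of how a user-defined count-distinct function might implement the proposed method next to the existing `merge(ACC, Iterable<ACC>)` contract method. To stay self-contained it does not extend `org.apache.flink.table.functions.AggregateFunction`; in a real job the class would extend `AggregateFunction<Long, CountDistinctAcc>` with public methods, and `retractAccumulators` would only be called if the runtime supports it as proposed in this issue. The names `CountDistinctFunction` and `CountDistinctAcc` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical count-distinct function written against the contract methods in this issue.
 * It does not extend Flink's AggregateFunction so that it compiles without Flink;
 * retractAccumulators is the proposed method, not an existing Flink API.
 */
final class CountDistinctFunction {

    /** Accumulator: value -> number of times it was seen. Keeping counts makes merge invertible. */
    static final class CountDistinctAcc {
        final Map<String, Long> counts = new HashMap<>();
    }

    CountDistinctAcc createAccumulator() {
        return new CountDistinctAcc();
    }

    void accumulate(CountDistinctAcc acc, String value) {
        acc.counts.merge(value, 1L, Long::sum);
    }

    /** Existing-style merge: adds the counts of the other accumulators into acc in place. */
    void merge(CountDistinctAcc acc, Iterable<CountDistinctAcc> others) {
        for (CountDistinctAcc other : others) {
            other.counts.forEach((k, c) -> acc.counts.merge(k, c, Long::sum));
        }
    }

    /** Proposed inverse of merge: subtracts counts in place and drops keys that reach zero. */
    void retractAccumulators(CountDistinctAcc acc, Iterable<CountDistinctAcc> retractAccs) {
        for (CountDistinctAcc r : retractAccs) {
            r.counts.forEach((k, c) -> acc.counts.computeIfPresent(k, (key, old) -> {
                long remaining = old - c;
                // Returning null removes the entry from the map.
                return remaining > 0 ? Long.valueOf(remaining) : null;
            }));
        }
    }

    long getValue(CountDistinctAcc acc) {
        return acc.counts.size();
    }
}
```

The accumulator keeps a count per value rather than a plain set: removing a pane's values from a set would wrongly drop values that also occur in other panes of the window, whereas subtracting counts exactly undoes merging them. As the Javadoc requires, `retractAccumulators` mutates `acc` in place instead of replacing it.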
--
This message was sent by Atlassian Jira
(v8.3.4#803005)