Posted to dev@flink.apache.org by Jark Wu <im...@gmail.com> on 2021/01/25 06:02:43 UTC

[DISCUSS] FLINK-21109: Introduce "retractAccumulators" interface for AggregateFunction in Table/SQL API

Hi all,

I would like to propose introducing a new method "retractAccumulators()" to
the `AggregateFunction` in Table/SQL.

*Motivation*

The motivation is to improve the performance of hopping (sliding) windows.
Currently, Table/SQL applies a paned (also called sliced) optimization to
hopping windows: each element is accumulated into exactly one pane, and
once a window fires, we merge the panes it covers to get the window result.

For example, with HOP(size=10s, slide=2s), the window [0, 10) consists of
5 panes: [0, 2), [2, 4), [4, 6), [6, 8), [8, 10).
Each element falls into exactly one pane, e.g. an element with timestamp 3
falls into pane [2, 4).
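
The pane arithmetic in this example can be sketched as follows. This is
only an illustration, not Flink code: the class and method names are
hypothetical, and it assumes panes start at 0 and that the window size is
a multiple of the slide, so that each pane is exactly one slide long.

```java
// Hypothetical helper for illustration; not part of Flink's API.
final class PaneAssigner {

    /** Start of the pane containing the timestamp (panes are slide-sized, aligned to 0). */
    static long paneStart(long timestamp, long slide) {
        // floorDiv keeps the alignment correct for negative timestamps too.
        return Math.floorDiv(timestamp, slide) * slide;
    }

    /** Number of panes one window covers; assumes size is a multiple of slide. */
    static int panesPerWindow(long size, long slide) {
        if (size % slide != 0) {
            throw new IllegalArgumentException("size must be a multiple of slide");
        }
        return (int) (size / slide);
    }

    public static void main(String[] args) {
        System.out.println(paneStart(3, 2));            // 2, i.e. pane [2, 4)
        System.out.println(panesPerWindow(10, 2));      // 5
        System.out.println(panesPerWindow(86_400, 10)); // 8640 for HOP(1day, 10s)
    }
}
```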

However, the panes are currently merged on the JVM heap. For example, when
window [0, 10) is about to fire, we retrieve the accumulators of its 5
panes and merge them into an in-memory accumulator.
This performs poorly, because the number of panes per window can be very
large when the slide is small, e.g. 8640 panes for HOP(1day, 10s).
The job may also run out of memory (OOM) when the accumulator is very
large, e.g. for count distinct.

Thus, I would like to introduce a "retractAccumulators()" method which is
the inverse of "merge()".
With "retractAccumulators()", we can reduce the cost of firing a window
from O(N) to O(1) accumulator operations, where N is the number of panes
per window.
For example, when window [10, 20) is about to fire, we only need to
retract the accumulator of pane [8, 10) from the state of the previous
window [8, 18) and merge the accumulator of pane [18, 20) into it.
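
To make the O(N) versus O(1) difference concrete, here is a minimal
sketch using a plain SUM whose accumulator is a single long. The class
name, method names, and pane values are made up for illustration; a real
implementation would keep the window accumulator in Flink state rather
than in local variables.

```java
// Illustration only: a SUM aggregate over 2s panes with a 10s window.
final class SlidingSum {

    /** Current approach: merge every pane of the window, O(N) per firing. */
    static long fullMerge(long[] paneSums, int firstPane, int panesPerWindow) {
        long acc = 0;
        for (int i = firstPane; i < firstPane + panesPerWindow; i++) {
            acc += paneSums[i];
        }
        return acc;
    }

    /** Proposed approach: retract the expired pane, merge the new one, O(1) per firing. */
    static long slide(long previousWindowAcc, long expiredPane, long newPane) {
        return previousWindowAcc - expiredPane + newPane;
    }

    public static void main(String[] args) {
        // paneSums[i] is the pre-aggregated sum of pane [2i, 2i + 2).
        long[] paneSums = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

        long window8to18 = fullMerge(paneSums, 4, 5);   // panes [8, 10) .. [16, 18)
        // Retract pane [8, 10) (index 4) and merge pane [18, 20) (index 9).
        long window10to20 = slide(window8to18, paneSums[4], paneSums[9]);

        // Both ways of computing window [10, 20) agree.
        System.out.println(window10to20 == fullMerge(paneSums, 5, 5));
    }
}
```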

This will be a great performance improvement: the hopping window gets
performance similar to the tumbling window, no matter how small the slide
is.
It also avoids the OOM, because the merged accumulator is kept in state
instead of in memory.

*Public Interface*

We will introduce a contract method "retractAccumulators", which is similar
to the "merge" method.

Retracts a group of accumulator instances from one accumulator instance.
This method is optional, but implementing it can greatly improve the
performance of hopping window aggregates.
Therefore, it is recommended to implement this method when the function
is used with hopping windows.

param: accumulator the accumulator which will keep the retracted aggregate
                   results. It should be noted that the accumulator may
                   contain the previous aggregated results. Therefore users
                   should not replace or clean this instance in the custom
                   retractAccumulators method.
param: retractAccs a java.lang.Iterable pointing to a group of accumulators
                   that will be retracted.

public void retractAccumulators(ACC accumulator, java.lang.Iterable<ACC> retractAccs)
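
As a rough sketch of what a user implementation might look like, here is
an average aggregate with the proposed method next to "merge". To stay
self-contained it does not extend Flink's AggregateFunction; the class
name and the accumulator layout are assumptions for illustration, and only
the retractAccumulators signature follows the proposal above.

```java
import java.util.Arrays;
import java.util.Collections;

// Hypothetical stand-alone sketch; a real function would extend
// org.apache.flink.table.functions.AggregateFunction<Double, Acc>.
final class AvgSketch {

    /** Accumulator: running sum and element count. */
    static final class Acc {
        long sum;
        long count;
    }

    Acc createAccumulator() {
        return new Acc();
    }

    void accumulate(Acc acc, long value) {
        acc.sum += value;
        acc.count++;
    }

    /** Merges the given accumulators into acc. */
    void merge(Acc acc, Iterable<Acc> others) {
        for (Acc other : others) {
            acc.sum += other.sum;
            acc.count += other.count;
        }
    }

    /** Inverse of merge: updates accumulator in place, never replaces or clears it. */
    void retractAccumulators(Acc accumulator, Iterable<Acc> retractAccs) {
        for (Acc retracted : retractAccs) {
            accumulator.sum -= retracted.sum;
            accumulator.count -= retracted.count;
        }
    }

    Double getValue(Acc acc) {
        if (acc.count == 0) {
            return null;
        }
        return (double) acc.sum / acc.count;
    }

    public static void main(String[] args) {
        AvgSketch f = new AvgSketch();
        Acc pane1 = f.createAccumulator();
        f.accumulate(pane1, 10);
        Acc pane2 = f.createAccumulator();
        f.accumulate(pane2, 20);
        f.accumulate(pane2, 30);

        Acc window = f.createAccumulator();
        f.merge(window, Arrays.asList(pane1, pane2));
        f.retractAccumulators(window, Collections.singletonList(pane1));
        System.out.println(f.getValue(window)); // average of pane2 only
    }
}
```

Sum and count can be inverted by simple subtraction, which is what makes
this accumulator easy to retract.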


What do you think?

Best,
Jark

Re: [DISCUSS] FLINK-21109: Introduce "retractAccumulators" interface for AggregateFunction in Table/SQL API

Posted by "DONG, Weike" <ky...@connect.hku.hk>.
Hi Jark,

+1 for this proposal. Glad to see more details : )

Sincerely,
Weike
