Posted to user@flink.apache.org by gil bl <gi...@yandex.com> on 2019/09/02 14:46:21 UTC

Window metadata removal

Hi,

I'm interested in why metadata like WindowOperator and InternalTimer is
kept for the windowSize + allowedLateness period of each pane.

  * What is the purpose of keeping this data if no new events are expected to enter the pane? 
  * Is there any way this metadata can be released earlier?


Re: Window metadata removal

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Oh, now I understand your problem.
I don't think that Flink is able to remove the metadata early. The
implementation is designed for the general case, which also has to support
windows whose data is not purged.
Something that might work is to not configure the window operator with
allowed lateness (hence dropping all late records).
Instead, you duplicate the stream before the window operator and add
another operator (based on a ProcessFunction) that drops all "in-time" data
and only forwards late data that is at most 7 days old.
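The routing decision for that side path can be sketched in plain Java. This sketch has no Flink dependency, so it runs on its own. It assumes:

* tumbling event-time windows with offset 0,
* non-negative epoch-millisecond timestamps,
* a main window operator with allowedLateness 0,
* both operators observing the same watermark.

`LateDataRouter` and its methods are hypothetical names. In a real ProcessFunction, the two inputs would come from `ctx.timestamp()` and `ctx.timerService().currentWatermark()`.

```java
import java.time.Duration;

// Plain-Java model (no Flink dependency) of the side-path routing decision.
// Assumptions: tumbling event-time windows with offset 0, non-negative epoch
// millisecond timestamps, main window operator with allowedLateness = 0.
class LateDataRouter {

    enum Route { MAIN_WINDOW_ONLY, FORWARD_LATE, DROP_TOO_LATE }

    private final long windowSizeMs;
    private final long lateHorizonMs;

    LateDataRouter(Duration windowSize, Duration lateHorizon) {
        this.windowSizeMs = windowSize.toMillis();
        this.lateHorizonMs = lateHorizon.toMillis();
    }

    // Last millisecond of the tumbling window containing eventTs (window end - 1).
    long windowMaxTimestamp(long eventTs) {
        long start = eventTs - (eventTs % windowSizeMs);
        return start + windowSizeMs - 1;
    }

    Route route(long eventTs, long currentWatermark) {
        long maxTs = windowMaxTimestamp(eventTs);
        if (maxTs > currentWatermark) {
            // Window has not fired yet: the main window operator still accepts it.
            return Route.MAIN_WINDOW_ONLY;
        }
        if (maxTs + lateHorizonMs > currentWatermark) {
            // Dropped by the main operator (lateness 0), but within the horizon.
            return Route.FORWARD_LATE;
        }
        // Older than the horizon: dropped on both paths.
        return Route.DROP_TOO_LATE;
    }

    public static void main(String[] args) {
        long hour = Duration.ofHours(1).toMillis();
        LateDataRouter router = new LateDataRouter(Duration.ofHours(1), Duration.ofDays(7));
        long watermark = 1000 * hour;
        System.out.println(router.route(watermark + 1, watermark));             // MAIN_WINDOW_ONLY
        System.out.println(router.route(watermark - 6 * 24 * hour, watermark)); // FORWARD_LATE
        System.out.println(router.route(watermark - 8 * 24 * hour, watermark)); // DROP_TOO_LATE
    }
}
```

The decision is keyed on the window's max timestamp rather than on the record's own timestamp. This keeps the two paths disjoint: a record that is behind the watermark but whose window has not fired yet still goes only to the main window.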

Alternatively, you can of course also scale out the program to more
machines to add more memory.

Best,
Fabian

On Wed, 18 Sept 2019 at 08:39, gil bl <gi...@yandex.com> wrote:

> Hi Fabian,
> Thank you for your reply.
>
> I'm not sure my question was clear enough so I'll try to explain our
> scenario:
>
>    1. We are working in “event time” mode.
>    2. We want to handle ‘late data’ up to the last X days (for example, the
>    last 7 days).
>    3. For each incoming event:
>       1. The event is aggregated using a window function.
>       2. When the window is “fired”, the accumulated data is forwarded to
>       a “sink” function and all data is purged from the window.
>    4. If late data arrives for the same windows, the same logic (as in
>    step 3) is applied. When a window is fired, the data is accumulated
>    from scratch, sent to a “sink” and purged from the window.
>    5. We are not using the default trigger.
>
> We expect the flow above to result in fragmented data, i.e. several
> outputs for the same <key, window> that aggregate different sets of
> events.
>
> We encounter the following problem:
> Since we have a huge number of different <key, window> pairs, the metadata
> (WindowOperator, InternalTimer) is kept in memory until the end of the
> ‘allowed lateness’ period. This causes our job to run out of memory.
> Here is an estimate of the memory needed for the window metadata alone:
> Metadata size for each <key, window> is at least 64 bytes.
> If we have 200,000,000 <key, window> pairs per day and the allowed lateness
> is set to 7 days:
> 200,000,000 * 64 * 7 = 89,600,000,000 bytes (~83 GiB)
>
> *For the scenario above the window metadata is useless*.
> Is there a possibility to *keep using the window API*, *set allowed lateness*
> and *not keep the window metadata* until the end of the allowed lateness
> period?
> (maybe as a new feature 😊?)
>
>
> 05.09.2019, 13:04, "Fabian Hueske" <fh...@gmail.com>:
>
> Hi,
>
> A window needs to keep the data as long as it expects new data.
> This is clearly the case before the end time of the window has been reached. If
> my window ends at 12:30, I want to wait (at least) until 12:30 before I
> remove any data, right?
>
> In case you expect some data to be late, you can configure
> allowedLateness.
> Let's say we configure an allowedLateness of 10 minutes. In that case, Flink
> would keep the metadata of the window that closes at 12:30 until 12:40.
> The data is kept so that the result of the window can be updated until
> allowedLateness has passed.
> If, for example, we receive a late record at 12:38, we can still update the
> result of the window because we kept all required data.
>
> If you don't need allowedLateness, don't configure it (the default is 0).
>
> Best, Fabian
>
> On Mon, 2 Sept 2019 at 16:46, gil bl <gi...@yandex.com> wrote:
>
> Hi,
>
> I'm interested in why metadata like WindowOperator and InternalTimer is
> kept for the windowSize + allowedLateness period of each pane.
>
>    - What is the purpose of keeping this data if no new events are
>    expected to enter the pane?
>    - Is there any way this metadata can be released earlier?
>
>

Re: Window metadata removal

Posted by gil bl <gi...@yandex.com>.
Hi Fabian,

Thank you for your reply.

I'm not sure my question was clear enough so I'll try to explain our scenario:

  1. We are working in “event time” mode.
  2. We want to handle ‘late data’ up to the last X days (for example, the last 7 days).
  3. For each incoming event:
    1. The event is aggregated using a window function.
    2. When the window is “fired”, the accumulated data is forwarded to a “sink” function and all data is purged from the window.
  4. If late data arrives for the same windows, the same logic (as in step 3) is applied. When a window is fired, the data is accumulated from scratch, sent to a “sink” and purged from the window.
  5. We are not using the default trigger.

We expect the flow above to result in fragmented data, i.e. several outputs
for the same <key, window> that aggregate different sets of events.
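A toy model of this flow, in plain Java rather than Flink code, may make the memory issue concrete. Each firing emits and purges the pane contents, yet one cleanup entry per <key, window> stays until the window's max timestamp plus the allowed lateness. This follows the retention rule Fabian describes in his reply. `FireAndPurgeModel` and its methods are hypothetical and do not reproduce Flink's WindowOperator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the scenario above (NOT Flink's WindowOperator): pane contents
// are purged on every firing, but a cleanup timer per <key, window> is kept
// until windowMaxTs + allowedLateness, like the metadata discussed here.
class FireAndPurgeModel {
    private final long allowedLatenessMs;
    private long watermark = Long.MIN_VALUE;
    private final Map<String, Long> paneSums = new HashMap<>();      // purged on fire
    private final Map<String, Long> cleanupTimers = new HashMap<>(); // outlives purges
    final List<String> emitted = new ArrayList<>();

    FireAndPurgeModel(long allowedLatenessMs) {
        this.allowedLatenessMs = allowedLatenessMs;
    }

    private static String paneId(String key, long windowMaxTs) {
        return key + "@" + windowMaxTs;
    }

    // Returns false if the element is dropped because its window is past cleanup.
    boolean add(String key, long windowMaxTs, long value) {
        long cleanupTime = windowMaxTs + allowedLatenessMs;
        if (cleanupTime <= watermark) {
            return false;
        }
        String id = paneId(key, windowMaxTs);
        paneSums.merge(id, value, Long::sum);
        cleanupTimers.putIfAbsent(id, cleanupTime);
        return true;
    }

    // What the (assumed) custom trigger does on firing: emit, then purge contents.
    void fireAndPurge(String key, long windowMaxTs) {
        String id = paneId(key, windowMaxTs);
        Long sum = paneSums.remove(id);
        if (sum != null) {
            emitted.add(id + "=" + sum);
        }
    }

    // Cleanup timers are removed only once the watermark reaches them.
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        cleanupTimers.values().removeIf(t -> t <= watermark);
    }

    int paneCount() { return paneSums.size(); }
    int metadataCount() { return cleanupTimers.size(); }

    public static void main(String[] args) {
        FireAndPurgeModel m = new FireAndPurgeModel(7 * 24 * 3_600_000L); // 7 days
        m.add("k", 999L, 5);
        m.fireAndPurge("k", 999L);      // first fragment
        m.advanceWatermark(10_000L);    // window end passed, lateness not yet
        m.add("k", 999L, 3);            // late element, still accepted
        m.fireAndPurge("k", 999L);      // second fragment for the same <key, window>
        System.out.println(m.emitted);  // [k@999=5, k@999=3]
        System.out.println(m.paneCount() + " pane(s), " + m.metadataCount() + " cleanup timer(s)");
    }
}
```

Running `main` prints two fragments for the same <key, window>, followed by "0 pane(s), 1 cleanup timer(s)". The purged contents are gone, but the per-window bookkeeping remains until the lateness period ends.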

We encounter the following problem:

Since we have a huge number of different <key, window> pairs, the metadata
(WindowOperator, InternalTimer) is kept in memory until the end of the
‘allowed lateness’ period. This causes our job to run out of memory.

Here is an estimate of the memory needed for the window metadata alone:

Metadata size for each <key, window> is at least 64 bytes.

If we have 200,000,000 <key, window> pairs per day and the allowed lateness is set
to 7 days:

200,000,000 * 64 * 7 = 89,600,000,000 bytes (~83 GiB)
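The arithmetic can be checked with a few lines of Java. The 64 bytes per entry and the 200,000,000 pairs per day are the figures above, not measured Flink values. The estimate counts only the 7 days of lateness and ignores the extra windowSize for which each pane is also retained. Formatting uses `Locale.ROOT` so that the decimal separator does not depend on the machine's locale.

```java
import java.util.Locale;

// Back-of-the-envelope estimate from the message above; the inputs are the
// quoted figures (64 bytes/entry, 200M <key, window> pairs/day, 7 days).
class WindowMetadataEstimate {

    static long retainedBytes(long keyWindowsPerDay, long bytesPerEntry, long retainedDays) {
        // multiplyExact throws instead of silently overflowing.
        return Math.multiplyExact(Math.multiplyExact(keyWindowsPerDay, bytesPerEntry), retainedDays);
    }

    public static void main(String[] args) {
        long bytes = retainedBytes(200_000_000L, 64L, 7L);
        double gb = bytes / 1e9;                   // decimal gigabytes
        double gib = bytes / (double) (1L << 30);  // binary gibibytes
        System.out.println(String.format(Locale.ROOT, "%d bytes = %.1f GB = %.1f GiB", bytes, gb, gib));
    }
}
```

This prints `89600000000 bytes = 89.6 GB = 83.4 GiB`. The "~83" figure corresponds to binary gibibytes.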

**For the scenario above the window metadata is useless**.

Is there a possibility to **keep using the window API**, **set allowed lateness**
and **not keep the window metadata** until the end of the allowed lateness period?

(maybe as a new feature 😊?)

Re: Window metadata removal

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

A window needs to keep the data as long as it expects new data.
This is clearly the case before the end time of the window has been reached. If
my window ends at 12:30, I want to wait (at least) until 12:30 before I
remove any data, right?

In case you expect some data to be late, you can configure allowedLateness.
Let's say we configure an allowedLateness of 10 minutes. In that case, Flink
would keep the metadata of the window that closes at 12:30 until 12:40.
The data is kept so that the result of the window can be updated until
allowedLateness has passed.
If, for example, we receive a late record at 12:38, we can still update the
result of the window because we kept all required data.
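The retention rule in this example can be written as a small plain-Java check. It follows two Flink conventions:

* A window's max timestamp is its end minus 1 ms.
* A window counts as late once its max timestamp plus allowedLateness is at or behind the watermark.

Reading "12:38" as the current watermark and using `LocalTime` instead of epoch milliseconds are simplifications for readability. `AllowedLatenessCheck` is a hypothetical name.

```java
import java.time.Duration;
import java.time.LocalTime;

// Sketch of the retention rule described above, modelled on Flink's
// convention: cleanup time = (window end - 1 ms) + allowedLateness.
// LocalTime on a single day is used only for readability (an assumption;
// Flink uses epoch-millisecond timestamps and watermarks).
class AllowedLatenessCheck {

    static LocalTime cleanupTime(LocalTime windowEnd, Duration allowedLateness) {
        LocalTime maxTimestamp = windowEnd.minusNanos(1_000_000); // end - 1 ms
        return maxTimestamp.plus(allowedLateness);
    }

    // True while the watermark is still before the cleanup time, i.e. the
    // window's state is still kept and a late record can update the result.
    static boolean canStillUpdate(LocalTime windowEnd, Duration allowedLateness, LocalTime watermark) {
        return watermark.isBefore(cleanupTime(windowEnd, allowedLateness));
    }

    public static void main(String[] args) {
        LocalTime end = LocalTime.of(12, 30);
        Duration lateness = Duration.ofMinutes(10);
        System.out.println(cleanupTime(end, lateness));                          // 12:39:59.999
        System.out.println(canStillUpdate(end, lateness, LocalTime.of(12, 38))); // true
        System.out.println(canStillUpdate(end, lateness, LocalTime.of(12, 40))); // false
    }
}
```

With the default lateness of 0, the same window stops accepting records once the watermark reaches 12:29:59.999.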

If you don't need allowedLateness, don't configure it (the default is 0).

Best, Fabian

On Mon, 2 Sept 2019 at 16:46, gil bl <gi...@yandex.com> wrote:

> Hi,
>
> I'm interested in why metadata like WindowOperator and InternalTimer is
> kept for the windowSize + allowedLateness period of each pane.
>
>    - What is the purpose of keeping this data if no new events are
>    expected to enter the pane?
>    - Is there any way this metadata can be released earlier?
>
>