Posted to user@flink.apache.org by Guowei Ma <gu...@gmail.com> on 2020/06/01 06:01:29 UTC

Re: Tumbling windows - increasing checkpoint size over time

Hi,
1. I am not a RocksDB expert. However, I think state garbage
collection depends on RocksDB compaction, especially when the checkpoint
interval is 2s. This is because the window elements are still in the SST
files even after the window has been triggered.
2. Have you tried a checkpoint interval of 15s? I guess it might reduce the
state size.
3. Would you mind sharing your RocksDB configuration? I think this could
help others who work on state to tell whether it is related to RocksDB or not.
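
To put the intervals from points 1 and 2 side by side, here is a small sketch in plain Java (no Flink dependency; the class and method names are made up for illustration). It only does the arithmetic: how many checkpoints are taken while a single 15 s window is still open, and how many are taken per day. It says nothing about when RocksDB actually compacts.

```java
import java.util.concurrent.TimeUnit;

// Illustrative arithmetic only; names are hypothetical and not part of any Flink API.
final class CheckpointCadence {

    /** Average number of checkpoints triggered during the lifetime of one window. */
    static double checkpointsPerWindow(long windowMillis, long checkpointIntervalMillis) {
        return (double) windowMillis / checkpointIntervalMillis;
    }

    /** Number of checkpoints triggered per day at a fixed interval. */
    static long checkpointsPerDay(long checkpointIntervalMillis) {
        return TimeUnit.DAYS.toMillis(1) / checkpointIntervalMillis;
    }

    public static void main(String[] args) {
        long windowMillis = 15_000;                         // 15 s tumbling window from the thread
        long[] intervalsMillis = {2_000, 15_000, 300_000};  // 2 s, 15 s, 5 min
        for (long interval : intervalsMillis) {
            System.out.printf("%d ms interval: %.2f checkpoints per window, %d per day%n",
                    interval,
                    checkpointsPerWindow(windowMillis, interval),
                    checkpointsPerDay(interval));
        }
    }
}
```

With a 2 s interval, each window is covered by several checkpoints before it fires. With an interval of 15 s or more, a window is checkpointed at most about once.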
Best,
Guowei


Wissman, Matt <ma...@here.com> wrote on Fri, May 29, 2020 at 10:30 PM:

> Till,
>
>
>
> I’ll have to calculate the theoretical upper bound for our window state.
> Our data distribution and rate have a predictable pattern, but the data rate
> pattern didn’t match the checkpoint size growth.
>
>
> [image: image.png]
>
>
>
> Here is a screenshot of the checkpoint size for the pipeline. The yellow
> section is when we had the checkpoint interval at 2 secs – the size seems
> to grow linearly and indefinitely. The blue, red and orange lines are in
> line with what I’d expect in terms of checkpoint size (100KB-2 MB).
>
>
>
> The incoming stream data for the whole time period is consistent (follows
> the same pattern).
>
>
>
> Changing the checkpoint interval seemed to fix the problem of the large
> and growing checkpoint size but I’m not sure why.
>
>
>
> Thanks!
>
>
>
> -Matt
>
>
>
> *From: *Till Rohrmann <tr...@apache.org>
> *Date: *Thursday, May 28, 2020 at 10:48 AM
> *To: *"Wissman, Matt" <ma...@here.com>
> *Cc: *Guowei Ma <gu...@gmail.com>, "user@flink.apache.org" <
> user@flink.apache.org>
> *Subject: *Re: Tumbling windows - increasing checkpoint size over time
>
>
>
> Hi Matt,
>
>
>
> With tumbling windows, the checkpoint size depends not only
> on the number of keys (which is equivalent to the number of open
> windows) but also on how many events arrive for each open window, because
> each window stores every one of its events in state. Hence, it can be the
> case that you see different checkpoint sizes depending on the actual data
> distribution, which can change over time. Have you checked whether the data
> distribution and rate are constant over time?
>
>
>
> What is the expected number of keys, size of events and number of events
> per key per second? Based on this information one could try to estimate an
> upper state size bound.
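
A minimal sketch of such an estimate, assuming every event is buffered until its window fires, as described above. The class name, method name and all input numbers are hypothetical placeholders, not values from this thread.

```java
// Rough upper bound for buffered window state: keys * events/key/s * window length * event size.
// Hypothetical helper for illustration; it ignores serialization and RocksDB overhead.
final class WindowStateBound {

    static long upperBoundBytes(long keys, double eventsPerKeyPerSecond,
                                long windowSeconds, long bytesPerEvent) {
        // Events that can be held across all open windows at the moment just before they fire.
        double bufferedEvents = keys * eventsPerKeyPerSecond * windowSeconds;
        return (long) Math.ceil(bufferedEvents * bytesPerEvent);
    }

    public static void main(String[] args) {
        // Placeholder inputs: 1,000 keys, 2 events per key per second, 15 s window, 200-byte events.
        long bytes = upperBoundBytes(1_000, 2.0, 15, 200);
        System.out.println(bytes + " bytes"); // prints "6000000 bytes"
    }
}
```

If the observed checkpoint size keeps growing well past a bound like this while the input stays stable, the growth is unlikely to come from the live window contents alone.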
>
>
>
> Cheers,
>
> Till
>
>
>
> On Wed, May 27, 2020 at 8:19 PM Wissman, Matt <ma...@here.com>
> wrote:
>
> Hello Till & Guowei,
>
>
>
> Thanks for the replies! Here is a snippet of the window function:
>
>
>
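>   // Key by ID, then process each key in 15-second processing-time tumbling windows.
>   SingleOutputStreamOperator<DataLayer> aggregatedStream = dataStream
>           .keyBy(idKeySelector())
>           .window(TumblingProcessingTimeWindows.of(seconds(15)))
>           .apply(new Aggregator())   // window function: invoked when a window fires
>           .name("Aggregator")
>           .setParallelism(3);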
>
>
>
> Checkpoint interval: 2 secs when the checkpoint size grew from 100KB to
> 100MB (we’ve since changed it to 5 minutes, which has slowed the checkpoint
> size growth)
>
> Lateness allowed: 0
>
> Watermarks: nothing is set in terms of watermarks – do they apply to
> processing time?
>
> The set of keys processed in the stream is stable over time
>
>
>
> The checkpoint size actually looks pretty stable now that the interval was
> increased. Is it possible that the short checkpoint interval prevented
> compaction?
>
>
>
> Thanks!
>
>
>
> -Matt
>
>
>
>
>
> *From: *Till Rohrmann <tr...@apache.org>
> *Date: *Wednesday, May 27, 2020 at 9:00 AM
> *To: *Guowei Ma <gu...@gmail.com>
> *Cc: *"Wissman, Matt" <ma...@here.com>, "user@flink.apache.org" <
> user@flink.apache.org>
> *Subject: *Re: Tumbling windows - increasing checkpoint size over time
>
>
>
> Hi Matt,
>
>
>
> could you give us a bit more information about the windows you are using?
> They are tumbling windows. What's the size of the windows? Do you allow
> lateness of events? What's your checkpoint interval?
>
>
>
> Are you using event time? If yes, how is the watermark generated?
>
>
>
> You said that the number of events per window is more or less constant.
> Does this also apply to the size of the individual events?
>
>
>
> Cheers,
>
> Till
>
>
>
> On Wed, May 27, 2020 at 1:21 AM Guowei Ma <gu...@gmail.com> wrote:
>
> Hi, Matt
> The total size of the state of the window operator is related to the
> number of windows. For example, if you use keyBy + a tumbling window, there
> would be as many open windows as there are keys.
> Hope this helps.
> Best,
> Guowei
>
> Wissman, Matt <ma...@here.com> wrote on Wed, May 27, 2020 at 3:35 AM:
> >
> > Hello Flink Community,
> >
> >
> >
> > I’m running a Flink pipeline that uses a tumbling window and incremental
> > checkpoints with RocksDB backed by S3. The number of objects in the window
> > is stable, but over time the checkpoint size grows, seemingly unbounded.
> > Within the first few hours after bringing the Flink pipeline up, the
> > checkpoint size is around 100KB, but after a week of operation it grows to
> > around 100MB. The pipeline isn’t using any other Flink state besides the
> > state that the window uses. I think this has something to do with RocksDB’s
> > compaction, but shouldn’t the tumbling window state expire and be purged
> > from the checkpoint?
> >
> >
> >
> > Flink Version 1.7.1
> >
> >
> >
> > Thanks!
> >
> >
> >
> > -Matt
>
>

Re: Tumbling windows - increasing checkpoint size over time

Posted by "Wissman, Matt" <ma...@here.com>.
Guowei,

I had a different Flink app that was using 10 or 15s intervals – it showed similar behavior, but not nearly as bad as the 2s interval pipeline. Both have much longer checkpoint intervals now.

Here is the state config:

    state.backend: rocksdb
    state.checkpoints.dir: {{ .Values.flink.checkpointUrl }}/checkpoints
    state.savepoints.dir: {{ .Values.flink.checkpointUrl }}/savepoints
    state.backend.incremental: true
    state.backend.rocksdb.localdir: /tmp/taskmanager

Thanks!

-Matt
