Posted to user-zh@flink.apache.org by LakeShen <sh...@gmail.com> on 2020/03/17 07:30:26 UTC

Question about RocksDBStateBackend Compaction Filter state cleanup

Hi community ,

I am looking at Flink's RocksDBStateBackend state cleanup; currently the code looks like this:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build();



> The default background cleanup for RocksDB backend queries the current
> timestamp each time 1000 entries have been processed.


What is the meaning of 1000 entries? 1000 different keys?

Thanks in advance for your reply.

Best regards,
LakeShen

Re: Question about RocksDBStateBackend Compaction Filter state cleanup

Posted by Andrey Zagrebin <az...@apache.org>.
Hi Lake,

When the Flink documentation mentions a state entry in RocksDB, it means one
key/value pair stored by user code over any keyed state API (the keyed
context in keyed operators, obtained e.g. from a keyBy() transformation).
In the case of map or list state, an entry means a map key/value pair or a
list element, respectively:

- value/aggregating/folding/reducing state: key -> value
- map state: key -> map key -> value
- list state: key -> list -> element in some position
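To make this counting concrete, here is a small illustrative sketch in Python (not Flink code; the function and key names are invented for the example) of how entries are tallied per state type. Note that a map state with several map keys under one Flink key contributes one entry per map key, not one per Flink key:

```python
def count_entries(state_type, state):
    """Count TTL 'entries' the way the Flink docs define them.

    state_type: 'value' | 'map' | 'list'
    state: dict mapping Flink key -> stored state for that key
    """
    if state_type == "value":
        # One key/value pair per Flink key.
        return len(state)
    if state_type == "map":
        # One entry per (Flink key, map key) pair.
        return sum(len(per_key_map) for per_key_map in state.values())
    if state_type == "list":
        # One entry per list element.
        return sum(len(per_key_list) for per_key_list in state.values())
    raise ValueError(f"unknown state type: {state_type}")

# Two Flink keys holding a 3-entry and a 2-entry map: 5 entries total,
# even though there are only 2 distinct Flink keys.
print(count_entries("map", {"user-1": {"a": 1, "b": 2, "c": 3},
                            "user-2": {"x": 9, "y": 8}}))  # -> 5
```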

Best,
Andrey

On Tue, Mar 17, 2020 at 11:04 AM Yun Tang <my...@live.com> wrote:

> Hi Lake
>
> Flink leverages RocksDB's background compaction mechanism to filter out
> expired (out-of-TTL) entries, by comparing them with the current timestamp
> provided by RocksDB's time_provider, so that they do not stay in the newly
> compacted data.
>
> Compaction iterates over data entries with FlinkCompactionFilter::FilterV2
> [1], and the parameter 'queryTimeAfterNumEntries' in Flink corresponds to
> the threshold 'query_time_after_num_entries_' in FRocksDB [2]. Once RocksDB
> has iterated over more than that number of entries, e.g. 1000, it calls
> time_provider to refresh the current timestamp, which makes the cleanup
> process more eager and accurate.
>
> [1]
> https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/utilities/flink/flink_compaction_filter.cc#L107
> [2]
> https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/utilities/flink/flink_compaction_filter.h#L140
>
> Best
> Yun Tang
>
> ------------------------------
> *From:* LakeShen <sh...@gmail.com>
> *Sent:* Tuesday, March 17, 2020 15:30
> *To:* dev <de...@flink.apache.org>; user-zh <us...@flink.apache.org>;
> user <us...@flink.apache.org>
> *Subject:* Question about RocksDBStateBackend Compaction Filter state
> cleanup
>
> Hi community ,
>
> I am looking at Flink's RocksDBStateBackend state cleanup; currently the code looks like this:
>
> StateTtlConfig ttlConfig = StateTtlConfig
>     .newBuilder(Time.seconds(1))
>     .cleanupInRocksdbCompactFilter(1000)
>     .build();
>
>
>
> The default background cleanup for RocksDB backend queries the current
> timestamp each time 1000 entries have been processed.
>
>
> What is the meaning of 1000 entries? 1000 different keys?
>
> Thanks in advance for your reply.
>
> Best regards,
> LakeShen
>

Re: Question about RocksDBStateBackend Compaction Filter state cleanup

Posted by Yun Tang <my...@live.com>.
Hi Lake

Flink leverages RocksDB's background compaction mechanism to filter out expired (out-of-TTL) entries, by comparing them with the current timestamp provided by RocksDB's time_provider, so that they do not stay in the newly compacted data.

Compaction iterates over data entries with FlinkCompactionFilter::FilterV2 [1], and the parameter 'queryTimeAfterNumEntries' in Flink corresponds to the threshold 'query_time_after_num_entries_' in FRocksDB [2]. Once RocksDB has iterated over more than that number of entries, e.g. 1000, it calls time_provider to refresh the current timestamp, which makes the cleanup process more eager and accurate.
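The timestamp-refresh behaviour can be sketched as follows. This is a simplified Python model of the logic, not the actual FRocksDB C++ code; the class and attribute names are invented for illustration:

```python
import time

class CompactionFilterSketch:
    """Models a TTL filter that re-queries the clock only every N entries."""

    def __init__(self, ttl_seconds, query_time_after_num_entries,
                 time_provider=time.time):
        self.ttl = ttl_seconds
        self.threshold = query_time_after_num_entries
        self.time_provider = time_provider
        self.entries_since_query = 0
        self.current_time = time_provider()  # initial clock query
        self.clock_queries = 1

    def is_expired(self, entry_timestamp):
        self.entries_since_query += 1
        if self.entries_since_query >= self.threshold:
            # Refresh the cached timestamp only after N entries, trading
            # cleanup accuracy for fewer clock calls during compaction.
            self.current_time = self.time_provider()
            self.clock_queries += 1
            self.entries_since_query = 0
        return entry_timestamp + self.ttl <= self.current_time

# With a threshold of 1000, processing 3000 entries queries the clock
# only 3 extra times beyond the initial query.
f = CompactionFilterSketch(ttl_seconds=1, query_time_after_num_entries=1000)
for _ in range(3000):
    f.is_expired(0)
print(f.clock_queries)  # -> 4
```

The point of the threshold is this trade-off: a smaller value keeps the cached timestamp fresher, so entries are dropped closer to their exact expiration, while a larger value reduces the per-entry overhead of calling the time provider during compaction.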

[1] https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/utilities/flink/flink_compaction_filter.cc#L107
[2] https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/utilities/flink/flink_compaction_filter.h#L140

Best
Yun Tang

________________________________
From: LakeShen <sh...@gmail.com>
Sent: Tuesday, March 17, 2020 15:30
To: dev <de...@flink.apache.org>; user-zh <us...@flink.apache.org>; user <us...@flink.apache.org>
Subject: Question about RocksDBStateBackend Compaction Filter state cleanup

Hi community ,

I am looking at Flink's RocksDBStateBackend state cleanup; currently the code looks like this:


StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build();


The default background cleanup for RocksDB backend queries the current timestamp each time 1000 entries have been processed.

What is the meaning of 1000 entries? 1000 different keys?

Thanks in advance for your reply.

Best regards,
LakeShen
