You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by tao xiao <xi...@gmail.com> on 2021/09/13 13:17:45 UTC

RocksDB state not cleaned up

Hi team

We have a job that uses value state with RocksDB and TTL set to 1 day. The
TTL update type is OnCreateAndWrite. We set the value state when the value
state doesn't exist and we never update it again after the state is not
empty. The key of the value state is timestamp. My understanding of such
TTL settings is that the size of all SST files remains flat (let's
disregard the impact space amplification brings) after 1 day as the daily
data volume is more or less the same. However the RocksDB native metrics
show that the SST files continue to grow since I started the job. I check
the SST files in local storage and I can see SST files with age 1 months
ago (when I started the job). What is the possible reason for the SST files
not cleaned up?.

The Flink version is 1.12.1
State backend is RocksDB with incremental checkpoint
All default configuration for RocksDB
Per job mode in Yarn and checkpoint to S3


Here is the code to set value state

public void open(Configuration parameters) {
    StateTtlConfig ttlConfigClick = StateTtlConfig
            .newBuilder(Time.days(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupInRocksdbCompactFilter(300_000)
            .build();
    ValueStateDescriptor<Click> clickStateDescriptor = new
ValueStateDescriptor<>("click", Click.class);
    clickStateDescriptor.enableTimeToLive(ttlConfigClick);
    clickState = getRuntimeContext().getState(clickStateDescriptor);

    StateTtlConfig ttlConfigAds = StateTtlConfig
            .newBuilder(Time.days(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupInRocksdbCompactFilter(30_000_000)
            .build();
    ValueStateDescriptor<A> adsStateDescriptor = new
ValueStateDescriptor<>("ads", slimAdsClass);
    adsStateDescriptor.enableTimeToLive(ttlConfigAds);
    adsState = getRuntimeContext().getState(adsStateDescriptor);
}

@Override
public void processElement(Tuple3<String, Click, A> tuple, Context
ctx, Collector<A> collector) throws Exception {
    if (tuple.f1 != null) {
        Click click = tuple.f1;

        if (clickState.value() != null) {
            return;
        }

        clickState.update(click);

        A adsFromState = adsState.value();
        if (adsFromState != null) {
            collector.collect(adsFromState);
        }
    } else {
        A ads = tuple.f2;

        if (adsState.value() != null) {
            return;
        }

        adsState.update(ads);

        Click clickFromState = clickState.value();
        if (clickFromState != null) {
            collector.collect(ads);
        }
    }
}


Here is the snippet of sst files in local storage

[root@xxxx db]# ll | head -n10
total 76040068
-rw-r----- 1 hadoop yarn        0 Aug 16 08:46 000003.log
-rw-r----- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
-rw-r----- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
-rw-r----- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
-rw-r----- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
-rw-r----- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
-rw-r----- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
-rw-r----- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
-rw-r----- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst
-- 
Regards,
Tao

Re: RocksDB state not cleaned up

Posted by Yun Tang <my...@live.com>.
Hi Alexis,

RocksDB itself supports manual compaction API [1], and current Flink does not support to call these APIs to support periodic compactions.

If Flink supports such period compaction, from my understanding, this is somehow like major compaction in HBase. I am not sure whether this is really useful for Flink as this could push data to the last level, which leads to increase the read amplification.

[1] https://javadoc.io/doc/org.rocksdb/rocksdbjni/6.20.3/org/rocksdb/RocksDB.html

Best
Yun Tang
________________________________
From: Alexis Sarda-Espinosa <al...@microfocus.com>
Sent: Friday, April 8, 2022 18:54
To: tao xiao <xi...@gmail.com>; David Morávek <dm...@apache.org>
Cc: Yun Tang <my...@live.com>; user <us...@flink.apache.org>
Subject: RE: RocksDB state not cleaned up


May I ask if anyone tested RocksDB’s periodic compaction in the meantime? And if yes, if it helped with this case.



Regards,

Alexis.



From: tao xiao <xi...@gmail.com>
Sent: Samstag, 18. September 2021 05:01
To: David Morávek <dm...@apache.org>
Cc: Yun Tang <my...@live.com>; user <us...@flink.apache.org>
Subject: Re: RocksDB state not cleaned up



Thanks for the feedback! However TTL already proves that the state cannot be cleaned up on time due to too many levels built up in RocksDB.



Hi @Yun Tang<ma...@live.com> do you have any suggestions to tune RocksDB to accelerate the compaction progress?



On Fri, Sep 17, 2021 at 8:01 PM David Morávek <dm...@apache.org>> wrote:

Cleaning up with timers should solve this. Both approaches have some advantages and disadvantages though.



Timers:

- No "side effects".

- Can be set in event time. Deletes are regular tombstones that will get compacted later on.



TTL:

- Performance. This costs literally nothing compared to an extra state for timer + writing a tombstone marker.

- Has "side-effects", because it works in processing time. This is just something to keep in mind eg. when bootstraping the state from historical data. (large event time / processing time skew)



With 1.14 release, we've bumped the RocksDB version so it may be possible to use a "periodic compaction" [1], but nobody has tried that so far. In the meantime I think there is non real workaround because we don't expose a way to trigger manual compaction.



I'm off to vacation until 27th and I won't be responsive during that time. I'd like to pull Yun into the conversation as he's super familiar with the RocksDB state backend.



[1] https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#periodic-and-ttl-compaction



Best,

D.



On Fri, Sep 17, 2021 at 5:17 AM tao xiao <xi...@gmail.com>> wrote:

Hi David,



Confirmed with RocksDB log Stephan's observation is the root cause that compaction doesn't clean up the high level sst files fast enough.  Do you think manual clean up by registering a timer is the way to go or any RocksDB parameter can be tuned to mitigate this issue?



On Wed, Sep 15, 2021 at 12:10 AM tao xiao <xi...@gmail.com>> wrote:

Hi David,



If I read Stephan's comment correctly TTL doesn't work well for cases where we have too many levels, like fast growing state,  as compaction doesn't clean up high level SST files in time, Is this correct? If yes should we register a timer with TTL time and manual clean up the state (state.clear() ) when the timer fires?



I will turn on RocksDB logging as well as compaction logging [1] to verify this



[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction





On Tue, Sep 14, 2021 at 5:38 PM David Morávek <dm...@apache.org>> wrote:

Hi Tao,



my intuition is that the compaction of SST files is not triggering. By default, it's only triggered by the size ratios of different levels [1] and the TTL mechanism has no effect on it.



Some reasoning from Stephan:



It's very likely to have large files in higher levels that haven't been compacted in a long time and thus just stay around.



This might be especially possible if you insert a lot in the beginning (build up many levels) and then have a moderate rate of modifications, so the changes and expiration keep happening purely in the merges / compactions of the first levels. Then the later levels may stay unchanged for quite some time.



You should be able to see compaction details by setting RocksDB logging to INFO [2]. Can you please check these and validate whether this really is the case?



[1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction

[2] https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting



Best,

D.



On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xi...@gmail.com>> wrote:

Hi team



We have a job that uses value state with RocksDB and TTL set to 1 day. The TTL update type is OnCreateAndWrite. We set the value state when the value state doesn't exist and we never update it again after the state is not empty. The key of the value state is timestamp. My understanding of such TTL settings is that the size of all SST files remains flat (let's disregard the impact space amplification brings) after 1 day as the daily data volume is more or less the same. However the RocksDB native metrics show that the SST files continue to grow since I started the job. I check the SST files in local storage and I can see SST files with age 1 months ago (when I started the job). What is the possible reason for the SST files not cleaned up?.



The Flink version is 1.12.1

State backend is RocksDB with incremental checkpoint

All default configuration for RocksDB

Per job mode in Yarn and checkpoint to S3





Here is the code to set value state

public void open(Configuration parameters) {
    StateTtlConfig ttlConfigClick = StateTtlConfig
            .newBuilder(Time.days(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupInRocksdbCompactFilter(300_000)
            .build();
    ValueStateDescriptor<Click> clickStateDescriptor = new ValueStateDescriptor<>("click", Click.class);
    clickStateDescriptor.enableTimeToLive(ttlConfigClick);
    clickState = getRuntimeContext().getState(clickStateDescriptor);

    StateTtlConfig ttlConfigAds = StateTtlConfig
            .newBuilder(Time.days(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupInRocksdbCompactFilter(30_000_000)
            .build();
    ValueStateDescriptor<A> adsStateDescriptor = new ValueStateDescriptor<>("ads", slimAdsClass);
    adsStateDescriptor.enableTimeToLive(ttlConfigAds);
    adsState = getRuntimeContext().getState(adsStateDescriptor);
}

@Override
public void processElement(Tuple3<String, Click, A> tuple, Context ctx, Collector<A> collector) throws Exception {
    if (tuple.f1 != null) {
        Click click = tuple.f1;

        if (clickState.value() != null) {
            return;
        }

        clickState.update(click);

        A adsFromState = adsState.value();
        if (adsFromState != null) {
            collector.collect(adsFromState);
        }
    } else {
        A ads = tuple.f2;

        if (adsState.value() != null) {
            return;
        }

        adsState.update(ads);

        Click clickFromState = clickState.value();
        if (clickFromState != null) {
            collector.collect(ads);
        }
    }
}



Here is the snippet of sst files in local storage



[root@xxxx db]# ll | head -n10
total 76040068
-rw-r----- 1 hadoop yarn        0 Aug 16 08:46 000003.log
-rw-r----- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
-rw-r----- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
-rw-r----- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
-rw-r----- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
-rw-r----- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
-rw-r----- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
-rw-r----- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
-rw-r----- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst

--

Regards,

Tao




--

Regards,

Tao




--

Regards,

Tao




--

Regards,

Tao

RE: RocksDB state not cleaned up

Posted by Alexis Sarda-Espinosa <al...@microfocus.com>.
May I ask if anyone tested RocksDB’s periodic compaction in the meantime? And if yes, if it helped with this case.

Regards,
Alexis.

From: tao xiao <xi...@gmail.com>
Sent: Samstag, 18. September 2021 05:01
To: David Morávek <dm...@apache.org>
Cc: Yun Tang <my...@live.com>; user <us...@flink.apache.org>
Subject: Re: RocksDB state not cleaned up

Thanks for the feedback! However TTL already proves that the state cannot be cleaned up on time due to too many levels built up in RocksDB.

Hi @Yun Tang<ma...@live.com> do you have any suggestions to tune RocksDB to accelerate the compaction progress?

On Fri, Sep 17, 2021 at 8:01 PM David Morávek <dm...@apache.org>> wrote:
Cleaning up with timers should solve this. Both approaches have some advantages and disadvantages though.

Timers:
- No "side effects".
- Can be set in event time. Deletes are regular tombstones that will get compacted later on.

TTL:
- Performance. This costs literally nothing compared to an extra state for timer + writing a tombstone marker.
- Has "side-effects", because it works in processing time. This is just something to keep in mind eg. when bootstraping the state from historical data. (large event time / processing time skew)

With 1.14 release, we've bumped the RocksDB version so it may be possible to use a "periodic compaction" [1], but nobody has tried that so far. In the meantime I think there is non real workaround because we don't expose a way to trigger manual compaction.

I'm off to vacation until 27th and I won't be responsive during that time. I'd like to pull Yun into the conversation as he's super familiar with the RocksDB state backend.

[1] https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#periodic-and-ttl-compaction

Best,
D.

On Fri, Sep 17, 2021 at 5:17 AM tao xiao <xi...@gmail.com>> wrote:
Hi David,

Confirmed with RocksDB log Stephan's observation is the root cause that compaction doesn't clean up the high level sst files fast enough.  Do you think manual clean up by registering a timer is the way to go or any RocksDB parameter can be tuned to mitigate this issue?

On Wed, Sep 15, 2021 at 12:10 AM tao xiao <xi...@gmail.com>> wrote:
Hi David,

If I read Stephan's comment correctly TTL doesn't work well for cases where we have too many levels, like fast growing state,  as compaction doesn't clean up high level SST files in time, Is this correct? If yes should we register a timer with TTL time and manual clean up the state (state.clear() ) when the timer fires?

I will turn on RocksDB logging as well as compaction logging [1] to verify this

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction


On Tue, Sep 14, 2021 at 5:38 PM David Morávek <dm...@apache.org>> wrote:
Hi Tao,

my intuition is that the compaction of SST files is not triggering. By default, it's only triggered by the size ratios of different levels [1] and the TTL mechanism has no effect on it.

Some reasoning from Stephan:

It's very likely to have large files in higher levels that haven't been compacted in a long time and thus just stay around.

This might be especially possible if you insert a lot in the beginning (build up many levels) and then have a moderate rate of modifications, so the changes and expiration keep happening purely in the merges / compactions of the first levels. Then the later levels may stay unchanged for quite some time.

You should be able to see compaction details by setting RocksDB logging to INFO [2]. Can you please check these and validate whether this really is the case?

[1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
[2] https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting

Best,
D.

On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xi...@gmail.com>> wrote:
Hi team

We have a job that uses value state with RocksDB and TTL set to 1 day. The TTL update type is OnCreateAndWrite. We set the value state when the value state doesn't exist and we never update it again after the state is not empty. The key of the value state is timestamp. My understanding of such TTL settings is that the size of all SST files remains flat (let's disregard the impact space amplification brings) after 1 day as the daily data volume is more or less the same. However the RocksDB native metrics show that the SST files continue to grow since I started the job. I check the SST files in local storage and I can see SST files with age 1 months ago (when I started the job). What is the possible reason for the SST files not cleaned up?.

The Flink version is 1.12.1
State backend is RocksDB with incremental checkpoint
All default configuration for RocksDB
Per job mode in Yarn and checkpoint to S3


Here is the code to set value state

public void open(Configuration parameters) {
    StateTtlConfig ttlConfigClick = StateTtlConfig
            .newBuilder(Time.days(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupInRocksdbCompactFilter(300_000)
            .build();
    ValueStateDescriptor<Click> clickStateDescriptor = new ValueStateDescriptor<>("click", Click.class);
    clickStateDescriptor.enableTimeToLive(ttlConfigClick);
    clickState = getRuntimeContext().getState(clickStateDescriptor);

    StateTtlConfig ttlConfigAds = StateTtlConfig
            .newBuilder(Time.days(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupInRocksdbCompactFilter(30_000_000)
            .build();
    ValueStateDescriptor<A> adsStateDescriptor = new ValueStateDescriptor<>("ads", slimAdsClass);
    adsStateDescriptor.enableTimeToLive(ttlConfigAds);
    adsState = getRuntimeContext().getState(adsStateDescriptor);
}

@Override
public void processElement(Tuple3<String, Click, A> tuple, Context ctx, Collector<A> collector) throws Exception {
    if (tuple.f1 != null) {
        Click click = tuple.f1;

        if (clickState.value() != null) {
            return;
        }

        clickState.update(click);

        A adsFromState = adsState.value();
        if (adsFromState != null) {
            collector.collect(adsFromState);
        }
    } else {
        A ads = tuple.f2;

        if (adsState.value() != null) {
            return;
        }

        adsState.update(ads);

        Click clickFromState = clickState.value();
        if (clickFromState != null) {
            collector.collect(ads);
        }
    }
}

Here is the snippet of sst files in local storage

[root@xxxx db]# ll | head -n10
total 76040068
-rw-r----- 1 hadoop yarn        0 Aug 16 08:46 000003.log
-rw-r----- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
-rw-r----- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
-rw-r----- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
-rw-r----- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
-rw-r----- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
-rw-r----- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
-rw-r----- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
-rw-r----- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst
--
Regards,
Tao


--
Regards,
Tao


--
Regards,
Tao


--
Regards,
Tao

Re: RocksDB state not cleaned up

Posted by tao xiao <xi...@gmail.com>.
Thanks for the feedback! However TTL already proves that the state cannot
be cleaned up on time due to too many levels built up in RocksDB.

Hi @Yun Tang <my...@live.com> do you have any suggestions to tune RocksDB
to accelerate the compaction progress?

On Fri, Sep 17, 2021 at 8:01 PM David Morávek <dm...@apache.org> wrote:

> Cleaning up with timers should solve this. Both approaches have some
> advantages and disadvantages though.
>
> Timers:
> - No "side effects".
> - Can be set in event time. Deletes are regular tombstones that will get
> compacted later on.
>
> TTL:
> - Performance. This costs literally nothing compared to an extra state for
> timer + writing a tombstone marker.
> - Has "side-effects", because it works in processing time. This is just
> something to keep in mind eg. when bootstraping the state from historical
> data. (large event time / processing time skew)
>
> With 1.14 release, we've bumped the RocksDB version so it may be possible
> to use a "periodic compaction" [1], but nobody has tried that so far. In
> the meantime I think there is non real workaround because we don't expose a
> way to trigger manual compaction.
>
> I'm off to vacation until 27th and I won't be responsive during that time.
> I'd like to pull Yun into the conversation as he's super familiar with the
> RocksDB state backend.
>
> [1]
> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#periodic-and-ttl-compaction
>
> Best,
> D.
>
> On Fri, Sep 17, 2021 at 5:17 AM tao xiao <xi...@gmail.com> wrote:
>
>> Hi David,
>>
>> Confirmed with RocksDB log Stephan's observation is the root cause that
>> compaction doesn't clean up the high level sst files fast enough.  Do you
>> think manual clean up by registering a timer is the way to go or any
>> RocksDB parameter can be tuned to mitigate this issue?
>>
>> On Wed, Sep 15, 2021 at 12:10 AM tao xiao <xi...@gmail.com> wrote:
>>
>>> Hi David,
>>>
>>> If I read Stephan's comment correctly TTL doesn't work well for cases
>>> where we have too many levels, like fast growing state,  as compaction
>>> doesn't clean up high level SST files in time, Is this correct? If yes
>>> should we register a timer with TTL time and manual clean up the state
>>> (state.clear() ) when the timer fires?
>>>
>>> I will turn on RocksDB logging as well as compaction logging [1] to
>>> verify this
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>>>
>>>
>>> On Tue, Sep 14, 2021 at 5:38 PM David Morávek <dm...@apache.org> wrote:
>>>
>>>> Hi Tao,
>>>>
>>>> my intuition is that the compaction of SST files is not triggering. By
>>>> default, it's only triggered by the size ratios of different levels [1] and
>>>> the TTL mechanism has no effect on it.
>>>>
>>>> Some reasoning from Stephan:
>>>>
>>>> It's very likely to have large files in higher levels that haven't been
>>>>> compacted in a long time and thus just stay around.
>>>>>
>>>>> This might be especially possible if you insert a lot in the beginning
>>>>> (build up many levels) and then have a moderate rate of modifications, so
>>>>> the changes and expiration keep happening purely in the merges /
>>>>> compactions of the first levels. Then the later levels may stay unchanged
>>>>> for quite some time.
>>>>>
>>>>
>>>> You should be able to see compaction details by setting RocksDB logging
>>>> to INFO [2]. Can you please check these and validate whether this really is
>>>> the case?
>>>>
>>>> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
>>>> [2]
>>>> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xi...@gmail.com> wrote:
>>>>
>>>>> Hi team
>>>>>
>>>>> We have a job that uses value state with RocksDB and TTL set to 1 day.
>>>>> The TTL update type is OnCreateAndWrite. We set the value state when the
>>>>> value state doesn't exist and we never update it again after the state is
>>>>> not empty. The key of the value state is timestamp. My understanding of
>>>>> such TTL settings is that the size of all SST files remains flat (let's
>>>>> disregard the impact space amplification brings) after 1 day as the daily
>>>>> data volume is more or less the same. However the RocksDB native metrics
>>>>> show that the SST files continue to grow since I started the job. I check
>>>>> the SST files in local storage and I can see SST files with age 1 months
>>>>> ago (when I started the job). What is the possible reason for the SST files
>>>>> not cleaned up?.
>>>>>
>>>>> The Flink version is 1.12.1
>>>>> State backend is RocksDB with incremental checkpoint
>>>>> All default configuration for RocksDB
>>>>> Per job mode in Yarn and checkpoint to S3
>>>>>
>>>>>
>>>>> Here is the code to set value state
>>>>>
>>>>> public void open(Configuration parameters) {
>>>>>     StateTtlConfig ttlConfigClick = StateTtlConfig
>>>>>             .newBuilder(Time.days(1))
>>>>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>>>             .cleanupInRocksdbCompactFilter(300_000)
>>>>>             .build();
>>>>>     ValueStateDescriptor<Click> clickStateDescriptor = new ValueStateDescriptor<>("click", Click.class);
>>>>>     clickStateDescriptor.enableTimeToLive(ttlConfigClick);
>>>>>     clickState = getRuntimeContext().getState(clickStateDescriptor);
>>>>>
>>>>>     StateTtlConfig ttlConfigAds = StateTtlConfig
>>>>>             .newBuilder(Time.days(1))
>>>>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>>>             .cleanupInRocksdbCompactFilter(30_000_000)
>>>>>             .build();
>>>>>     ValueStateDescriptor<A> adsStateDescriptor = new ValueStateDescriptor<>("ads", slimAdsClass);
>>>>>     adsStateDescriptor.enableTimeToLive(ttlConfigAds);
>>>>>     adsState = getRuntimeContext().getState(adsStateDescriptor);
>>>>> }
>>>>>
>>>>> @Override
>>>>> public void processElement(Tuple3<String, Click, A> tuple, Context ctx, Collector<A> collector) throws Exception {
>>>>>     if (tuple.f1 != null) {
>>>>>         Click click = tuple.f1;
>>>>>
>>>>>         if (clickState.value() != null) {
>>>>>             return;
>>>>>         }
>>>>>
>>>>>         clickState.update(click);
>>>>>
>>>>>         A adsFromState = adsState.value();
>>>>>         if (adsFromState != null) {
>>>>>             collector.collect(adsFromState);
>>>>>         }
>>>>>     } else {
>>>>>         A ads = tuple.f2;
>>>>>
>>>>>         if (adsState.value() != null) {
>>>>>             return;
>>>>>         }
>>>>>
>>>>>         adsState.update(ads);
>>>>>
>>>>>         Click clickFromState = clickState.value();
>>>>>         if (clickFromState != null) {
>>>>>             collector.collect(ads);
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> Here is the snippet of sst files in local storage
>>>>>
>>>>> [root@xxxx db]# ll | head -n10
>>>>> total 76040068
>>>>> -rw-r----- 1 hadoop yarn        0 Aug 16 08:46 000003.log
>>>>> -rw-r----- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
>>>>> -rw-r----- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
>>>>> -rw-r----- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
>>>>> -rw-r----- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
>>>>> -rw-r----- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
>>>>> -rw-r----- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
>>>>> -rw-r----- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
>>>>> -rw-r----- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst
>>>>> --
>>>>> Regards,
>>>>> Tao
>>>>>
>>>>
>>>
>>> --
>>> Regards,
>>> Tao
>>>
>>
>>
>> --
>> Regards,
>> Tao
>>
>

-- 
Regards,
Tao

Re: RocksDB state not cleaned up

Posted by David Morávek <dm...@apache.org>.
Cleaning up with timers should solve this. Both approaches have some
advantages and disadvantages though.

Timers:
- No "side effects".
- Can be set in event time. Deletes are regular tombstones that will get
compacted later on.

TTL:
- Performance. This costs literally nothing compared to an extra state for
timer + writing a tombstone marker.
- Has "side-effects", because it works in processing time. This is just
something to keep in mind eg. when bootstraping the state from historical
data. (large event time / processing time skew)

With 1.14 release, we've bumped the RocksDB version so it may be possible
to use a "periodic compaction" [1], but nobody has tried that so far. In
the meantime I think there is non real workaround because we don't expose a
way to trigger manual compaction.

I'm off to vacation until 27th and I won't be responsive during that time.
I'd like to pull Yun into the conversation as he's super familiar with the
RocksDB state backend.

[1]
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#periodic-and-ttl-compaction

Best,
D.

On Fri, Sep 17, 2021 at 5:17 AM tao xiao <xi...@gmail.com> wrote:

> Hi David,
>
> Confirmed with RocksDB log Stephan's observation is the root cause that
> compaction doesn't clean up the high level sst files fast enough.  Do you
> think manual clean up by registering a timer is the way to go or any
> RocksDB parameter can be tuned to mitigate this issue?
>
> On Wed, Sep 15, 2021 at 12:10 AM tao xiao <xi...@gmail.com> wrote:
>
>> Hi David,
>>
>> If I read Stephan's comment correctly TTL doesn't work well for cases
>> where we have too many levels, like fast growing state,  as compaction
>> doesn't clean up high level SST files in time, Is this correct? If yes
>> should we register a timer with TTL time and manual clean up the state
>> (state.clear() ) when the timer fires?
>>
>> I will turn on RocksDB logging as well as compaction logging [1] to
>> verify this
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>>
>>
>> On Tue, Sep 14, 2021 at 5:38 PM David Morávek <dm...@apache.org> wrote:
>>
>>> Hi Tao,
>>>
>>> my intuition is that the compaction of SST files is not triggering. By
>>> default, it's only triggered by the size ratios of different levels [1] and
>>> the TTL mechanism has no effect on it.
>>>
>>> Some reasoning from Stephan:
>>>
>>> It's very likely to have large files in higher levels that haven't been
>>>> compacted in a long time and thus just stay around.
>>>>
>>>> This might be especially possible if you insert a lot in the beginning
>>>> (build up many levels) and then have a moderate rate of modifications, so
>>>> the changes and expiration keep happening purely in the merges /
>>>> compactions of the first levels. Then the later levels may stay unchanged
>>>> for quite some time.
>>>>
>>>
>>> You should be able to see compaction details by setting RocksDB logging
>>> to INFO [2]. Can you please check these and validate whether this really is
>>> the case?
>>>
>>> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
>>> [2]
>>> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
>>>
>>> Best,
>>> D.
>>>
>>> On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xi...@gmail.com> wrote:
>>>
>>>> Hi team
>>>>
>>>> We have a job that uses value state with RocksDB and TTL set to 1 day.
>>>> The TTL update type is OnCreateAndWrite. We set the value state when the
>>>> value state doesn't exist and we never update it again after the state is
>>>> not empty. The key of the value state is timestamp. My understanding of
>>>> such TTL settings is that the size of all SST files remains flat (let's
>>>> disregard the impact space amplification brings) after 1 day as the daily
>>>> data volume is more or less the same. However the RocksDB native metrics
>>>> show that the SST files continue to grow since I started the job. I check
>>>> the SST files in local storage and I can see SST files with age 1 months
>>>> ago (when I started the job). What is the possible reason for the SST files
>>>> not cleaned up?.
>>>>
>>>> The Flink version is 1.12.1
>>>> State backend is RocksDB with incremental checkpoint
>>>> All default configuration for RocksDB
>>>> Per job mode in Yarn and checkpoint to S3
>>>>
>>>>
>>>> Here is the code to set value state
>>>>
>>>> public void open(Configuration parameters) {
>>>>     StateTtlConfig ttlConfigClick = StateTtlConfig
>>>>             .newBuilder(Time.days(1))
>>>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>>             .cleanupInRocksdbCompactFilter(300_000)
>>>>             .build();
>>>>     ValueStateDescriptor<Click> clickStateDescriptor = new ValueStateDescriptor<>("click", Click.class);
>>>>     clickStateDescriptor.enableTimeToLive(ttlConfigClick);
>>>>     clickState = getRuntimeContext().getState(clickStateDescriptor);
>>>>
>>>>     StateTtlConfig ttlConfigAds = StateTtlConfig
>>>>             .newBuilder(Time.days(1))
>>>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>>             .cleanupInRocksdbCompactFilter(30_000_000)
>>>>             .build();
>>>>     ValueStateDescriptor<A> adsStateDescriptor = new ValueStateDescriptor<>("ads", slimAdsClass);
>>>>     adsStateDescriptor.enableTimeToLive(ttlConfigAds);
>>>>     adsState = getRuntimeContext().getState(adsStateDescriptor);
>>>> }
>>>>
>>>> @Override
>>>> public void processElement(Tuple3<String, Click, A> tuple, Context ctx, Collector<A> collector) throws Exception {
>>>>     if (tuple.f1 != null) {
>>>>         Click click = tuple.f1;
>>>>
>>>>         if (clickState.value() != null) {
>>>>             return;
>>>>         }
>>>>
>>>>         clickState.update(click);
>>>>
>>>>         A adsFromState = adsState.value();
>>>>         if (adsFromState != null) {
>>>>             collector.collect(adsFromState);
>>>>         }
>>>>     } else {
>>>>         A ads = tuple.f2;
>>>>
>>>>         if (adsState.value() != null) {
>>>>             return;
>>>>         }
>>>>
>>>>         adsState.update(ads);
>>>>
>>>>         Click clickFromState = clickState.value();
>>>>         if (clickFromState != null) {
>>>>             collector.collect(ads);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>>
>>>> Here is the snippet of sst files in local storage
>>>>
>>>> [root@xxxx db]# ll | head -n10
>>>> total 76040068
>>>> -rw-r----- 1 hadoop yarn        0 Aug 16 08:46 000003.log
>>>> -rw-r----- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
>>>> -rw-r----- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
>>>> -rw-r----- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
>>>> -rw-r----- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
>>>> -rw-r----- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
>>>> -rw-r----- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
>>>> -rw-r----- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
>>>> -rw-r----- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst
>>>> --
>>>> Regards,
>>>> Tao
>>>>
>>>
>>
>> --
>> Regards,
>> Tao
>>
>
>
> --
> Regards,
> Tao
>

Re: RocksDB state not cleaned up

Posted by tao xiao <xi...@gmail.com>.
Hi David,

Confirmed with RocksDB log Stephan's observation is the root cause that
compaction doesn't clean up the high level sst files fast enough.  Do you
think manual clean up by registering a timer is the way to go or any
RocksDB parameter can be tuned to mitigate this issue?

On Wed, Sep 15, 2021 at 12:10 AM tao xiao <xi...@gmail.com> wrote:

> Hi David,
>
> If I read Stephan's comment correctly TTL doesn't work well for cases
> where we have too many levels, like fast growing state,  as compaction
> doesn't clean up high level SST files in time, Is this correct? If yes
> should we register a timer with TTL time and manual clean up the state
> (state.clear() ) when the timer fires?
>
> I will turn on RocksDB logging as well as compaction logging [1] to verify
> this
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>
>
> On Tue, Sep 14, 2021 at 5:38 PM David Morávek <dm...@apache.org> wrote:
>
>> Hi Tao,
>>
>> my intuition is that the compaction of SST files is not triggering. By
>> default, it's only triggered by the size ratios of different levels [1] and
>> the TTL mechanism has no effect on it.
>>
>> Some reasoning from Stephan:
>>
>> It's very likely to have large files in higher levels that haven't been
>>> compacted in a long time and thus just stay around.
>>>
>>> This might be especially possible if you insert a lot in the beginning
>>> (build up many levels) and then have a moderate rate of modifications, so
>>> the changes and expiration keep happening purely in the merges /
>>> compactions of the first levels. Then the later levels may stay unchanged
>>> for quite some time.
>>>
>>
>> You should be able to see compaction details by setting RocksDB logging
>> to INFO [2]. Can you please check these and validate whether this really is
>> the case?
>>
>> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
>> [2]
>> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
>>
>> Best,
>> D.
>>
>> On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xi...@gmail.com> wrote:
>>
>>> Hi team
>>>
>>> We have a job that uses value state with RocksDB and TTL set to 1 day.
>>> The TTL update type is OnCreateAndWrite. We set the value state when the
>>> value state doesn't exist and we never update it again after the state is
>>> not empty. The key of the value state is timestamp. My understanding of
>>> such TTL settings is that the size of all SST files remains flat (let's
>>> disregard the impact space amplification brings) after 1 day as the daily
>>> data volume is more or less the same. However the RocksDB native metrics
>>> show that the SST files continue to grow since I started the job. I check
>>> the SST files in local storage and I can see SST files with age 1 months
>>> ago (when I started the job). What is the possible reason for the SST files
>>> not cleaned up?.
>>>
>>> The Flink version is 1.12.1
>>> State backend is RocksDB with incremental checkpoint
>>> All default configuration for RocksDB
>>> Per job mode in Yarn and checkpoint to S3
>>>
>>>
>>> Here is the code to set value state
>>>
>>> public void open(Configuration parameters) {
>>>     StateTtlConfig ttlConfigClick = StateTtlConfig
>>>             .newBuilder(Time.days(1))
>>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>             .cleanupInRocksdbCompactFilter(300_000)
>>>             .build();
>>>     ValueStateDescriptor<Click> clickStateDescriptor = new ValueStateDescriptor<>("click", Click.class);
>>>     clickStateDescriptor.enableTimeToLive(ttlConfigClick);
>>>     clickState = getRuntimeContext().getState(clickStateDescriptor);
>>>
>>>     StateTtlConfig ttlConfigAds = StateTtlConfig
>>>             .newBuilder(Time.days(1))
>>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>             .cleanupInRocksdbCompactFilter(30_000_000)
>>>             .build();
>>>     ValueStateDescriptor<A> adsStateDescriptor = new ValueStateDescriptor<>("ads", slimAdsClass);
>>>     adsStateDescriptor.enableTimeToLive(ttlConfigAds);
>>>     adsState = getRuntimeContext().getState(adsStateDescriptor);
>>> }
>>>
>>> @Override
>>> public void processElement(Tuple3<String, Click, A> tuple, Context ctx, Collector<A> collector) throws Exception {
>>>     if (tuple.f1 != null) {
>>>         Click click = tuple.f1;
>>>
>>>         if (clickState.value() != null) {
>>>             return;
>>>         }
>>>
>>>         clickState.update(click);
>>>
>>>         A adsFromState = adsState.value();
>>>         if (adsFromState != null) {
>>>             collector.collect(adsFromState);
>>>         }
>>>     } else {
>>>         A ads = tuple.f2;
>>>
>>>         if (adsState.value() != null) {
>>>             return;
>>>         }
>>>
>>>         adsState.update(ads);
>>>
>>>         Click clickFromState = clickState.value();
>>>         if (clickFromState != null) {
>>>             collector.collect(ads);
>>>         }
>>>     }
>>> }
>>>
>>>
>>> Here is the snippet of sst files in local storage
>>>
>>> [root@xxxx db]# ll | head -n10
>>> total 76040068
>>> -rw-r----- 1 hadoop yarn        0 Aug 16 08:46 000003.log
>>> -rw-r----- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
>>> -rw-r----- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
>>> -rw-r----- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
>>> -rw-r----- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
>>> -rw-r----- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
>>> -rw-r----- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
>>> -rw-r----- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
>>> -rw-r----- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst
>>> --
>>> Regards,
>>> Tao
>>>
>>
>
> --
> Regards,
> Tao
>


-- 
Regards,
Tao

Re: RocksDB state not cleaned up

Posted by tao xiao <xi...@gmail.com>.
Hi David,

If I read Stephan's comment correctly TTL doesn't work well for cases where
we have too many levels, like fast growing state,  as compaction doesn't
clean up high level SST files in time, Is this correct? If yes should we
register a timer with TTL time and manual clean up the state (state.clear()
) when the timer fires?

I will turn on RocksDB logging as well as compaction logging [1] to verify
this

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/state.html#cleanup-during-rocksdb-compaction


On Tue, Sep 14, 2021 at 5:38 PM David Morávek <dm...@apache.org> wrote:

> Hi Tao,
>
> my intuition is that the compaction of SST files is not triggering. By
> default, it's only triggered by the size ratios of different levels [1] and
> the TTL mechanism has no effect on it.
>
> Some reasoning from Stephan:
>
> It's very likely to have large files in higher levels that haven't been
>> compacted in a long time and thus just stay around.
>>
>> This might be especially possible if you insert a lot in the beginning
>> (build up many levels) and then have a moderate rate of modifications, so
>> the changes and expiration keep happening purely in the merges /
>> compactions of the first levels. Then the later levels may stay unchanged
>> for quite some time.
>>
>
> You should be able to see compaction details by setting RocksDB logging to
> INFO [2]. Can you please check these and validate whether this really is
> the case?
>
> [1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
> [2]
> https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting
>
> Best,
> D.
>
> On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xi...@gmail.com> wrote:
>
>> Hi team
>>
>> We have a job that uses value state with RocksDB and TTL set to 1 day.
>> The TTL update type is OnCreateAndWrite. We set the value state when the
>> value state doesn't exist and we never update it again after the state is
>> not empty. The key of the value state is timestamp. My understanding of
>> such TTL settings is that the size of all SST files remains flat (let's
>> disregard the impact space amplification brings) after 1 day as the daily
>> data volume is more or less the same. However the RocksDB native metrics
>> show that the SST files continue to grow since I started the job. I check
>> the SST files in local storage and I can see SST files with age 1 months
>> ago (when I started the job). What is the possible reason for the SST files
>> not cleaned up?.
>>
>> The Flink version is 1.12.1
>> State backend is RocksDB with incremental checkpoint
>> All default configuration for RocksDB
>> Per job mode in Yarn and checkpoint to S3
>>
>>
>> Here is the code to set value state
>>
>> public void open(Configuration parameters) {
>>     StateTtlConfig ttlConfigClick = StateTtlConfig
>>             .newBuilder(Time.days(1))
>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>             .cleanupInRocksdbCompactFilter(300_000)
>>             .build();
>>     ValueStateDescriptor<Click> clickStateDescriptor = new ValueStateDescriptor<>("click", Click.class);
>>     clickStateDescriptor.enableTimeToLive(ttlConfigClick);
>>     clickState = getRuntimeContext().getState(clickStateDescriptor);
>>
>>     StateTtlConfig ttlConfigAds = StateTtlConfig
>>             .newBuilder(Time.days(1))
>>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>             .cleanupInRocksdbCompactFilter(30_000_000)
>>             .build();
>>     ValueStateDescriptor<A> adsStateDescriptor = new ValueStateDescriptor<>("ads", slimAdsClass);
>>     adsStateDescriptor.enableTimeToLive(ttlConfigAds);
>>     adsState = getRuntimeContext().getState(adsStateDescriptor);
>> }
>>
>> @Override
>> public void processElement(Tuple3<String, Click, A> tuple, Context ctx, Collector<A> collector) throws Exception {
>>     if (tuple.f1 != null) {
>>         Click click = tuple.f1;
>>
>>         if (clickState.value() != null) {
>>             return;
>>         }
>>
>>         clickState.update(click);
>>
>>         A adsFromState = adsState.value();
>>         if (adsFromState != null) {
>>             collector.collect(adsFromState);
>>         }
>>     } else {
>>         A ads = tuple.f2;
>>
>>         if (adsState.value() != null) {
>>             return;
>>         }
>>
>>         adsState.update(ads);
>>
>>         Click clickFromState = clickState.value();
>>         if (clickFromState != null) {
>>             collector.collect(ads);
>>         }
>>     }
>> }
>>
>>
>> Here is the snippet of sst files in local storage
>>
>> [root@xxxx db]# ll | head -n10
>> total 76040068
>> -rw-r----- 1 hadoop yarn        0 Aug 16 08:46 000003.log
>> -rw-r----- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
>> -rw-r----- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
>> -rw-r----- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
>> -rw-r----- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
>> -rw-r----- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
>> -rw-r----- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
>> -rw-r----- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
>> -rw-r----- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst
>> --
>> Regards,
>> Tao
>>
>

-- 
Regards,
Tao

Re: RocksDB state not cleaned up

Posted by David Morávek <dm...@apache.org>.
Hi Tao,

my intuition is that the compaction of SST files is not triggering. By
default, it's only triggered by the size ratios of different levels [1] and
the TTL mechanism has no effect on it.

Some reasoning from Stephan:

It's very likely to have large files in higher levels that haven't been
> compacted in a long time and thus just stay around.
>
> This might be especially possible if you insert a lot in the beginning
> (build up many levels) and then have a moderate rate of modifications, so
> the changes and expiration keep happening purely in the merges /
> compactions of the first levels. Then the later levels may stay unchanged
> for quite some time.
>

You should be able to see compaction details by setting RocksDB logging to
INFO [2]. Can you please check these and validate whether this really is
the case?

[1] https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
[2]
https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting

Best,
D.

On Mon, Sep 13, 2021 at 3:18 PM tao xiao <xi...@gmail.com> wrote:

> Hi team
>
> We have a job that uses value state with RocksDB and TTL set to 1 day. The
> TTL update type is OnCreateAndWrite. We set the value state when the value
> state doesn't exist and we never update it again after the state is not
> empty. The key of the value state is timestamp. My understanding of such
> TTL settings is that the size of all SST files remains flat (let's
> disregard the impact space amplification brings) after 1 day as the daily
> data volume is more or less the same. However the RocksDB native metrics
> show that the SST files continue to grow since I started the job. I check
> the SST files in local storage and I can see SST files with age 1 months
> ago (when I started the job). What is the possible reason for the SST files
> not cleaned up?.
>
> The Flink version is 1.12.1
> State backend is RocksDB with incremental checkpoint
> All default configuration for RocksDB
> Per job mode in Yarn and checkpoint to S3
>
>
> Here is the code to set value state
>
> public void open(Configuration parameters) {
>     StateTtlConfig ttlConfigClick = StateTtlConfig
>             .newBuilder(Time.days(1))
>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>             .cleanupInRocksdbCompactFilter(300_000)
>             .build();
>     ValueStateDescriptor<Click> clickStateDescriptor = new ValueStateDescriptor<>("click", Click.class);
>     clickStateDescriptor.enableTimeToLive(ttlConfigClick);
>     clickState = getRuntimeContext().getState(clickStateDescriptor);
>
>     StateTtlConfig ttlConfigAds = StateTtlConfig
>             .newBuilder(Time.days(1))
>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>             .cleanupInRocksdbCompactFilter(30_000_000)
>             .build();
>     ValueStateDescriptor<A> adsStateDescriptor = new ValueStateDescriptor<>("ads", slimAdsClass);
>     adsStateDescriptor.enableTimeToLive(ttlConfigAds);
>     adsState = getRuntimeContext().getState(adsStateDescriptor);
> }
>
> @Override
> public void processElement(Tuple3<String, Click, A> tuple, Context ctx, Collector<A> collector) throws Exception {
>     if (tuple.f1 != null) {
>         Click click = tuple.f1;
>
>         if (clickState.value() != null) {
>             return;
>         }
>
>         clickState.update(click);
>
>         A adsFromState = adsState.value();
>         if (adsFromState != null) {
>             collector.collect(adsFromState);
>         }
>     } else {
>         A ads = tuple.f2;
>
>         if (adsState.value() != null) {
>             return;
>         }
>
>         adsState.update(ads);
>
>         Click clickFromState = clickState.value();
>         if (clickFromState != null) {
>             collector.collect(ads);
>         }
>     }
> }
>
>
> Here is the snippet of sst files in local storage
>
> [root@xxxx db]# ll | head -n10
> total 76040068
> -rw-r----- 1 hadoop yarn        0 Aug 16 08:46 000003.log
> -rw-r----- 1 hadoop yarn 67700362 Aug 17 02:38 001763.sst
> -rw-r----- 1 hadoop yarn 67698753 Aug 17 02:38 001764.sst
> -rw-r----- 1 hadoop yarn 67699769 Aug 17 02:59 001790.sst
> -rw-r----- 1 hadoop yarn 67701239 Aug 17 04:58 002149.sst
> -rw-r----- 1 hadoop yarn 67700607 Aug 17 04:58 002150.sst
> -rw-r----- 1 hadoop yarn 67697524 Aug 17 04:59 002151.sst
> -rw-r----- 1 hadoop yarn 67700729 Aug 17 06:20 002373.sst
> -rw-r----- 1 hadoop yarn 67700296 Aug 17 06:20 002374.sst
> --
> Regards,
> Tao
>