Posted to user@flink.apache.org by "Almeida, Julius" <Ju...@intuit.com> on 2021/03/25 00:13:31 UTC

State size increasing exponentially in Flink v1.9

Hey,
Hope you all are doing well!

I am using Flink v1.9 with RocksDBStateBackend, but over time the state size keeps growing exponentially.

I am using MapState in my project and seeing memory spikes; a heap dump shows duplicate entries in it.

I have also added logic to remove expired events from the MapState, e.g.:
MapState.remove(key)

Can anyone give me pointers on how to investigate this further?

The heap dump pointed to https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java#L811

Thanks,
Julius

Re: State size increasing exponentially in Flink v1.9

Posted by Yun Tang <my...@live.com>.
Hi Julius,

It seems you are using customized wrappers such as `StateSpec` and `MapState` that are not part of the Flink runtime code. I cannot judge whether your usage is correct, since I don't know what you have done in your framework.

If you really want TTL with the RocksDB state backend, you could simply configure state TTL as described in the official docs [1][2] instead of your current, rather unusual check-and-remove logic. Cleanup in background should give much better performance.
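A minimal sketch of what per-entry TTL buys you. It is a self-contained Java toy model, not code against Flink. Each value carries its last-write time. Expired entries read as absent immediately, and a separate cleanup pass purges them, standing in for RocksDB background cleanup. No timer state or manual sweep is needed. `TtlMapModel` and its methods are hypothetical names.

The header comment shows, to the best of our knowledge, the corresponding Flink 1.9 `StateTtlConfig` builder calls. As far as we recall, in 1.9 the RocksDB compaction filter was disabled by default and had to be enabled as described in the docs linked above. This is also Flink's native state API. To our knowledge, Beam's `StateSpecs` do not expose TTL, so it would not apply directly when using the Beam runner.

```java
import java.util.HashMap;
import java.util.Map;

/*
 * Toy model of per-entry state TTL (not Flink's implementation).
 *
 * In Flink 1.9 itself, TTL is configured on the state descriptor, roughly:
 *
 *   StateTtlConfig ttlConfig = StateTtlConfig
 *       .newBuilder(Time.hours(2))
 *       .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
 *       .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
 *       .cleanupInRocksdbCompactFilter(1000)
 *       .build();
 *   mapStateDescriptor.enableTimeToLive(ttlConfig);
 */
class TtlMapModel<K, V> {
    // A value together with the time it was last written.
    private static final class Stamped<T> {
        final T value;
        final long writtenAtMillis;

        Stamped(T value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final Map<K, Stamped<V>> entries = new HashMap<>();
    private final long ttlMillis;

    TtlMapModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    private boolean isExpired(Stamped<V> e, long nowMillis) {
        return nowMillis - e.writtenAtMillis >= ttlMillis;
    }

    // Like UpdateType.OnCreateAndWrite: every write refreshes the timestamp.
    void put(K key, V value, long nowMillis) {
        entries.put(key, new Stamped<>(value, nowMillis));
    }

    // Like StateVisibility.NeverReturnExpired: expired entries read as absent,
    // even if they have not been physically removed yet.
    V get(K key, long nowMillis) {
        Stamped<V> e = entries.get(key);
        return (e == null || isExpired(e, nowMillis)) ? null : e.value;
    }

    // Stand-in for background cleanup: physically drops expired entries
    // and returns how many were removed.
    int cleanup(long nowMillis) {
        int before = entries.size();
        entries.values().removeIf(e -> isExpired(e, nowMillis));
        return before - entries.size();
    }

    int physicalSize() {
        return entries.size();
    }
}
```

The design point is that expiry is tracked per entry, so the application never has to remember which keys to revisit.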

BTW, if the `MapState` in your code is just Flink's MapState [3], the proper form would be MapState<String, Object> instead of the nested MapState<String, HashMap<String, Object>>.

Moreover, it's better to give the map state value an explicit type instead of just `Object`.
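A minimal sketch of the flattening suggested above, in plain Java. `java.util.Map` stands in for `MapState`; this is an assumption for illustration, not the Flink or Beam API. If both the outer and the inner key are needed, they can be combined into one composite key with an explicitly typed value. Expiring one entry is then a single `remove`. With the nested layout, the same removal is a read-modify-write of the whole inner map. `Event`, `CompositeKey` and `FlatMapLayout` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical explicitly typed value, instead of Object.
final class Event {
    final String payload;
    final long timestampMillis;

    Event(String payload, long timestampMillis) {
        this.payload = payload;
        this.timestampMillis = timestampMillis;
    }
}

// Hypothetical composite key that flattens (outer key, inner key) into one map key.
final class CompositeKey {
    final String outer;
    final String inner;

    CompositeKey(String outer, String inner) {
        this.outer = outer;
        this.inner = inner;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof CompositeKey)) return false;
        CompositeKey k = (CompositeKey) o;
        return outer.equals(k.outer) && inner.equals(k.inner);
    }

    @Override
    public int hashCode() {
        return Objects.hash(outer, inner);
    }
}

class FlatMapLayout {
    // Nested layout: removing one inner entry is a read-modify-write of the whole inner map.
    static void removeNested(Map<String, HashMap<String, Event>> state, String outer, String inner) {
        HashMap<String, Event> innerMap = state.get(outer);
        if (innerMap == null) {
            return;
        }
        innerMap.remove(inner);
        if (innerMap.isEmpty()) {
            state.remove(outer);          // avoid leaving empty maps behind
        } else {
            state.put(outer, innerMap);   // write back; required when reads return copies
        }
    }

    // Flat layout: each inner entry is its own state entry and is removed directly.
    static void removeFlat(Map<CompositeKey, Event> state, String outer, String inner) {
        state.remove(new CompositeKey(outer, inner));
    }
}
```

If the outer key is already the key the stream is keyed by, keyed state is scoped to it anyway. In that case the nested level may simply be unnecessary.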

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#cleanup-in-background
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#state-time-to-live-ttl
[3] https://github.com/apache/flink/blob/release-1.9/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java


Best
Yun Tang
________________________________
From: Almeida, Julius <Ju...@intuit.com>
Sent: Sunday, March 28, 2021 23:36
To: Yun Tang <my...@live.com>; user <us...@flink.apache.org>
Cc: Chesnay Schepler <ch...@apache.org>
Subject: Re: State size increasing exponentially in Flink v1.9


Hi Yun,

Yes, I see the log line you mentioned:

[screenshot of the log]

@StateId("map1")
private final StateSpec<MapState<String, HashMap<String, Object>>> map1 = StateSpecs.map();

@StateId("map2")
private final StateSpec<MapState<String, HashMap<String, HashMap<String, Object>>>> map2 = StateSpecs.map();

@StateId("is_state_expiry_timer_set")
private final StateSpec<ValueState<Boolean>> isStateExpiryTimerSet = StateSpecs.value();

@TimerId("state_expiry")
private final TimerSpec stateExpiry = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  1.  I remove expired values from the map; we check for expired values every 2 hours.
  2.  Afterwards I call isStateExpiryTimerSet.clear();
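One thing worth ruling out in a sweep like the one described above is a lost update on nested values. With a serializing backend such as RocksDB, reading a map value returns a deserialized copy. Removing entries from the returned `HashMap` therefore has no effect on the stored state unless the map is written back with `put`.

The sketch below models this in plain Java with a hypothetical `CopyOnReadStore`, which is not the Beam or Flink API. It treats the inner values as hypothetical event timestamps. The 2-hour cutoff mirrors the interval mentioned in the message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical store that, like a serializing backend, hands out copies on read.
class CopyOnReadStore {
    private final Map<String, HashMap<String, Long>> data = new HashMap<>();

    HashMap<String, Long> get(String key) {
        HashMap<String, Long> stored = data.get(key);
        return stored == null ? null : new HashMap<>(stored); // as if deserialized
    }

    void put(String key, HashMap<String, Long> value) {
        data.put(key, new HashMap<>(value)); // as if serialized
    }

    void remove(String key) {
        data.remove(key);
    }

    // Copy of the key set, so the store can be modified while iterating.
    List<String> keys() {
        return new ArrayList<>(data.keySet());
    }

    int innerEntryCount() {
        int n = 0;
        for (HashMap<String, Long> inner : data.values()) {
            n += inner.size();
        }
        return n;
    }
}

class ExpirySweep {
    static final long TWO_HOURS_MILLIS = 2L * 60 * 60 * 1000;

    static boolean expired(long eventMillis, long nowMillis) {
        return nowMillis - eventMillis >= TWO_HOURS_MILLIS;
    }

    // Buggy: removes from the copy returned by get() and never writes it back.
    static void sweepWithoutWriteBack(CopyOnReadStore store, long nowMillis) {
        for (String key : store.keys()) {
            store.get(key).values().removeIf(ts -> expired(ts, nowMillis));
        }
    }

    // Fixed: prune the copy, then write it back or drop the key if nothing is left.
    static void sweep(CopyOnReadStore store, long nowMillis) {
        for (String key : store.keys()) {
            HashMap<String, Long> inner = store.get(key);
            inner.values().removeIf(ts -> expired(ts, nowMillis));
            if (inner.isEmpty()) {
                store.remove(key);
            } else {
                store.put(key, inner);
            }
        }
    }
}
```

With a heap-based backend, the buggy variant may appear to work, because reads can return the stored object itself. That makes this kind of bug easy to miss until the backend changes.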



The state size keeps growing exponentially (the checkpoint interval is set to 10 minutes).

If I am missing something, could you share an example of setting up TTL? I believe the logic to clean expired records is correct; I just want to know whether I am missing any additional components.

Thanks,
Julius

From: Yun Tang <my...@live.com>
Date: Sunday, March 28, 2021 at 3:49 AM
To: "Almeida, Julius" <Ju...@intuit.com>, user <us...@flink.apache.org>
Cc: Chesnay Schepler <ch...@apache.org>
Subject: Re: State size increasing exponentially in Flink v1.9

Hi Julius



You could check whether the log line "Successfully loaded RocksDB native library" [1] is printed on the task managers, to verify that the RocksDB state backend actually takes effect. When the RocksDB state backend is used, Flink no longer uses the heap keyed state backend, so `CopyOnWriteStateMap` should not appear.
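A small helper for this check, assuming the task manager log has been copied to a local file whose path you pass on the command line. It only searches for the line quoted above and uses no Flink API. `RocksDbLogCheck` is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Stream;

class RocksDbLogCheck {
    // Log line quoted in the reply above.
    static final String MARKER = "Successfully loaded RocksDB native library";

    static boolean containsMarker(List<String> lines) {
        return lines.stream().anyMatch(line -> line.contains(MARKER));
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java RocksDbLogCheck <path-to-taskmanager-log>");
            return;
        }
        // Stream the file line by line so large logs are not loaded into memory.
        try (Stream<String> lines = Files.lines(Paths.get(args[0]))) {
            boolean found = lines.anyMatch(line -> line.contains(MARKER));
            System.out.println((found ? "found: " : "not found: ") + MARKER);
        }
    }
}
```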



BTW, you have provided very little information. How many states do you use, how is the checkpointed size growing, and have you set up TTL? It is hard to answer a question without any details.
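The question turns on whether the checkpointed size really grows exponentially. One simple way to tell is to note down the checkpointed sizes of consecutive completed checkpoints, for example from the checkpoint history in the Flink web UI. Then look at both the per-checkpoint increments and the ratios:

- roughly constant increments point to linear growth;
- roughly constant ratios above 1 point to exponential growth.

The class name and the sample sizes in `main` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CheckpointGrowth {
    // Absolute growth between consecutive checkpoints, in bytes.
    static List<Long> increments(List<Long> sizes) {
        List<Long> out = new ArrayList<>();
        for (int i = 1; i < sizes.size(); i++) {
            out.add(sizes.get(i) - sizes.get(i - 1));
        }
        return out;
    }

    // Relative growth between consecutive checkpoints (current / previous).
    static List<Double> ratios(List<Long> sizes) {
        List<Double> out = new ArrayList<>();
        for (int i = 1; i < sizes.size(); i++) {
            out.add(sizes.get(i).doubleValue() / sizes.get(i - 1));
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up checkpointed sizes in bytes, oldest first; replace with your own.
        List<Long> sizes = Arrays.asList(100_000_000L, 200_000_000L, 400_000_000L, 800_000_000L);
        System.out.println("increments: " + increments(sizes));
        System.out.println("ratios:     " + ratios(sizes));
    }
}
```

Steady increments with shrinking ratios would suggest linear rather than exponential growth, which usually points to entries that are never removed.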



[1] https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java#L910



Best

Yun Tang

________________________________

From: Almeida, Julius <Ju...@intuit.com>
Sent: Saturday, March 27, 2021 0:40
To: Yun Tang <my...@live.com>; Chesnay Schepler <ch...@apache.org>
Subject: Re: State size increasing exponentially in Flink v1.9



Hi Yun,



Thanks for the response.



I am using Beam with the Flink v1.9 runner. `CopyOnWriteStateMap` shows up in the heap dump, which shows duplicate values, even though I am using the RocksDB state backend.



[two screenshots]



We can move to Slack for better communication if you need any more details. Appreciate your help. 🙂



Thanks,

Julius

From: Yun Tang <my...@live.com>
Date: Friday, March 26, 2021 at 6:04 AM
To: Chesnay Schepler <ch...@apache.org>, "Almeida, Julius" <Ju...@intuit.com>, "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: State size increasing exponentially in Flink v1.9



This email is from an external sender.



Hi,



If you are using the RocksDB state backend, why would `CopyOnWriteStateMap` appear?



`CopyOnWriteStateMap` should only exist in the heap-based state backend.
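One possible, hedged explanation for "duplicates" inside a copy-on-write state map: while a snapshot that references an entry is still running, writing to that key creates a new entry instead of mutating the old one. Both versions stay reachable until the snapshot is released.

The sketch below is a greatly simplified toy model of that idea in plain Java, with hypothetical names. It is not Flink's `CopyOnWriteStateMap`, which also versions its internal table and may copy state objects. It also does not explain why that class would appear with the RocksDB backend.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Greatly simplified toy model of copy-on-write snapshots; not Flink's CopyOnWriteStateMap.
class CowMapModel {
    static final class Entry {
        Object value;
        final int version; // map version when this entry object was created

        Entry(Object value, int version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private int version = 0;       // incremented by every snapshot
    private List<Entry> snapshot;  // entries referenced by a running snapshot, or null

    void put(String key, Object value) {
        Entry e = entries.get(key);
        if (e != null && (snapshot == null || e.version == version)) {
            e.value = value; // no running snapshot can see this entry: update in place
        } else {
            // New key, or the entry may be referenced by the snapshot: copy instead of mutating.
            entries.put(key, new Entry(value, version));
        }
    }

    Object get(String key) {
        Entry e = entries.get(key);
        return e == null ? null : e.value;
    }

    void takeSnapshot() {
        version++;
        snapshot = new ArrayList<>(entries.values()); // references, not copies
    }

    void releaseSnapshot() {
        snapshot = null;
    }

    // Distinct Entry objects reachable from the map or the snapshot,
    // roughly what a heap dump would count.
    int liveEntryObjects() {
        Map<Entry, Boolean> seen = new IdentityHashMap<>();
        entries.values().forEach(e -> seen.put(e, Boolean.TRUE));
        if (snapshot != null) {
            snapshot.forEach(e -> seen.put(e, Boolean.TRUE));
        }
        return seen.size();
    }
}
```

In this model, the extra copies disappear once the snapshot is released. Duplicates that persist across checkpoints would point elsewhere.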



Best

Yun Tang



________________________________

From: Chesnay Schepler <ch...@apache.org>
Sent: Friday, March 26, 2021 18:45
To: Almeida, Julius <Ju...@intuit.com>; user@flink.apache.org <us...@flink.apache.org>
Subject: Re: State size increasing exponentially in Flink v1.9



Could you show us how you interact with the map state (ideally the full code of your function that accesses the state)?


