Posted to user@flink.apache.org by "karl.pullicino" <ka...@gamesysgroup.com> on 2020/03/03 18:10:39 UTC

Flink Session Windows State TTL

Hi all,

We have an Apache Flink application which generates player sessions based on
player events keyed by playerId. Sessions are based on event time. A session
is created on the first event for a player and closes after 30 minutes of
inactivity. Events are merged in our custom
/PlayerSessionAggregator implements AggregateFunction/.

We deployed this application on a Flink dev cluster (with checkpoints
enabled). However, we noticed that the state keeps growing until we run out
of memory, as shown in the attached file /flink_oom_exception.txt/.

We tried using /PurgingTrigger/ together with /CountTrigger/. However, since
it uses /FIRE_AND_PURGE/, we ended up with a session per event, i.e. events
were not being merged. Using an /Evictor/ led to the same situation, because
events were being removed from the window. Hence we resorted to using State
TTL:
 
- We created a /StateTtlConfig/ with an expiry of 120 minutes to
  periodically remove expired sessions from the state.
- This /stateTtlConfig/ is passed to the flatMap function
  /PlayerSessionEventMapper extends RichFlatMapFunction/.
- The /PlayerSessionEventMapper/ has a /ValueStateDescriptor/ to provide
  access to state per player. This /ValueStateDescriptor/ uses the
  previously mentioned /stateTtlConfig/.
- The state per player is updated on each player event. We also force a
  state access (using /ValueState.value()/) since, as per the documentation,
  "expired values are only removed when they are read out explicitly, e.g.
  by calling ValueState.value()".
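
To make the quoted sentence concrete, here is a toy model in plain Java of a
store that only drops an expired entry when that entry is read. It is not
Flink's implementation and uses no Flink API; the class name LazyTtlStore and
its methods are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of read-time TTL expiry. NOT Flink's implementation. */
class LazyTtlStore<K, V> {
    private static final class Entry<V> {
        final V value;
        final long lastWriteMillis;

        Entry(V value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final long ttlMillis;
    private final Map<K, Entry<V>> entries = new HashMap<>();

    LazyTtlStore(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Writes a value and restarts its TTL from nowMillis. */
    void put(K key, V value, long nowMillis) {
        entries.put(key, new Entry<>(value, nowMillis));
    }

    /** Returns the value, or null if absent or expired. An expired entry is removed here. */
    V get(K key, long nowMillis) {
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (nowMillis - entry.lastWriteMillis >= ttlMillis) {
            entries.remove(key); // cleanup happens only on this read path
            return null;
        }
        return entry.value;
    }

    /** Number of entries still held, including expired ones that were never read. */
    int size() {
        return entries.size();
    }
}
```

In this model, an entry for a player who never sends another event is never
read again and therefore never removed. Whether the same holds in a real job
depends on the Flink version, the state backend, and the cleanup strategy
configured on the /StateTtlConfig/.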
This idea was based on the examples provided in:
- https://flink.apache.org/2019/05/19/state-ttl.html
- https://www.ververica.com/blog/state-ttl-for-apache-flink-how-to-limit-the-lifetime-of-state
- https://www.slideshare.net/FlinkForward/time-tolive-how-to-perform-automatic-state-cleanup-in-apache-flink-andrey-zagrebin-ververica
- https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively

*Code:*
PlayerSessionApp.java
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2426/PlayerSessionApp.java>
PlayerSessionEventMapper.java
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2426/PlayerSessionEventMapper.java>
(Some custom classes have been removed for simplicity.)
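
The attached files are not reproduced in this archive. As a rough
illustration of the 30-minute inactivity rule described at the top of this
message, the following plain-Java sketch groups one player's event
timestamps into sessions. It does not use Flink; the class and method names
are hypothetical, and treating a gap of exactly 30 minutes as a new session
is a choice made in this sketch that may differ from Flink's session window
merging.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of gap-based sessionization for a single player. */
class SessionGapSketch {
    static final long GAP_MILLIS = 30 * 60_000L;

    /**
     * Groups ascending event timestamps into sessions.
     * Each session is returned as {firstEventTime, lastEventTime}.
     */
    static List<long[]> sessionize(long[] sortedEventTimes) {
        List<long[]> sessions = new ArrayList<>();
        for (long eventTime : sortedEventTimes) {
            if (!sessions.isEmpty()) {
                long[] current = sessions.get(sessions.size() - 1);
                if (eventTime - current[1] < GAP_MILLIS) {
                    current[1] = eventTime; // still active: extend the open session
                    continue;
                }
            }
            sessions.add(new long[] {eventTime, eventTime}); // inactivity gap: start a new session
        }
        return sessions;
    }
}
```

For example, events at minutes 0, 10, 50 and 55 would form two sessions
here, since the 40-minute pause between the second and third event exceeds
the gap.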

*Our questions are:*
- Are expired session windows automatically removed from state? If not,
  what's the best way to do it?
- How can we query state size?
- How can we query the number of windows in state?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink Session Windows State TTL

Posted by Robert Metzger <rm...@apache.org>.
Sorry, I pressed the send button too fast.

You also attached an exception to the email, which reads as follows:

Caused by: org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException:
Could not fulfill slot request c28970b7cd4f68383e242703bdac81ca.
Requested resource profile (ResourceProfile{cpuCores=-1.0,
heapMemoryInMB=-1, directMemoryInMB=-1, nativeMemoryInMB=-1,
networkMemoryInMB=-1, managedMemoryInMB=-1}) is unfulfillable.

This exception does not indicate that you are running out of memory at
runtime, but rather that your operator cannot be scheduled anymore.
Can you check whether re-enabling operator chaining solves the problem?

Also, how are you deploying Flink?


On Mon, Mar 9, 2020 at 3:30 PM Robert Metzger <rm...@apache.org> wrote:

> Hey Karl,
>
> sorry for the late reply!
> [...]

Re: Flink Session Windows State TTL

Posted by Robert Metzger <rm...@apache.org>.
Hey Karl,

sorry for the late reply!

Let me first quickly answer your questions:

>    - are expired session windows automatically removed from state? if
>    not, what's the best way to do it?

Yes, they should get removed automatically.

>    - how can we query state size?

You can monitor the state size in the Flink web UI (there's a
"Checkpointing" tab for each job), or through Flink's metrics system:
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#checkpointing

>    - how can we query number of windows in state?

The window operator does not expose any metrics.

I also have some questions :)
- Have you considered using the RocksDB state backend to mitigate the
out-of-memory issues?
- Why are you disabling operator chaining?
- Did you validate that the "TimeZone.setDefault(...)" setting takes effect
in the worker JVMs executing your code? (I suspect that you are only setting
the TimeZone in the JVM executing the main() method)
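
One way to sidestep the last question, assuming the job only needs the time
zone for converting event timestamps (an assumption, since the attached code
is not shown here), is to pass the zone explicitly instead of relying on the
JVM default. A minimal plain-Java sketch with a hypothetical helper:

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;

/** Sketch: derive calendar values from an explicit zone, not the JVM default. */
class ExplicitZoneSketch {
    /** Returns the calendar day of an event timestamp in the given zone. */
    static LocalDate eventDay(long epochMillis, ZoneId zone) {
        return Instant.ofEpochMilli(epochMillis).atZone(zone).toLocalDate();
    }
}
```

Because the zone is an argument, the result does not depend on which JVM
runs the code or on whether TimeZone.setDefault(...) was called there.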

Best,
Robert



On Tue, Mar 3, 2020 at 7:23 PM karl.pullicino <
karl.pullicino@gamesysgroup.com> wrote:

> Added flink_oom_exception.txt
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2426/flink_oom_exception.txt>
> as originally forgot to attach it

Re: Flink Session Windows State TTL

Posted by "karl.pullicino" <ka...@gamesysgroup.com>.
Added flink_oom_exception.txt
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2426/flink_oom_exception.txt>,
as I originally forgot to attach it.


