Posted to issues@flink.apache.org by "Ken Krugler (Jira)" <ji...@apache.org> on 2019/09/24 17:12:00 UTC
[jira] [Commented] (FLINK-14197) Increasing trend for state size of
keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows
[ https://issues.apache.org/jira/browse/FLINK-14197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16937009#comment-16937009 ]
Ken Krugler commented on FLINK-14197:
-------------------------------------
On the Flink user mailing list, [this thread|http://mail-archives.apache.org/mod_mbox/flink-user/201702.mbox/%3c58C0A1C0-8715-4614-AE45-D96AFBCE3461@mediamath.com%3e] seems relevant.
Also, I didn't see this question on the mailing list. That's typically where you want to start; file a bug report in Jira only once committers have weighed in on whether your issue is in fact a bug and, if so, what the root cause is. Thanks.
> Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-14197
> URL: https://issues.apache.org/jira/browse/FLINK-14197
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / State Backends
> Affects Versions: 1.9.0
> Environment: Tested with:
> * Local Flink Mini Cluster running from IDE
> * Flink standalone cluster run in docker
> Reporter: Oliver Kostera
> Priority: Major
>
> I'm using *ProcessWindowFunction* in a keyed stream with the following definition:
> {code:java}
> final SingleOutputStreamOperator<Message> processWindowFunctionStream =
>     keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
>         .process(new CustomProcessWindowFunction())
>         .uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
>         .name("Process window function");
> {code}
> My checkpointing configuration is set to use RocksDB state backend with incremental checkpointing and EXACTLY_ONCE mode.
> At runtime I noticed that even though data ingestion is static (same keys and same message frequency), the state size of the process window operator keeps increasing. I was able to reproduce this with a minimal, similar setup here: https://github.com/loliver1234/flink-process-window-function
> Testing conditions:
> - RabbitMQ source with Exactly-once guarantee and 65k prefetch count
> - RabbitMQ sink to collect messages
> - Simple ProcessWindowFunction that only passes messages through
> - Stream time characteristic set to TimeCharacteristic.ProcessingTime
> Testing scenario:
> - Start the Flink job and check the initial state size - State Size: 127 KB
> - Start sending messages: the same 1000 unique keys every 1 s (consecutive messages for a key do not fall within the 100 ms window gap, so each message should create a new window)
> - State of the process window operator keeps increasing - after 1 million messages the state was around 2 MB
> - Stop sending messages and wait until the RabbitMQ queue is fully consumed and a few checkpoints have gone by
> - Expected the state size to decrease to the base value, but it stayed at 2 MB
> - Continue sending messages with the same keys; the state size kept increasing.
> What I checked:
> - Registration and deregistration of timestamps set for time windows - each registration matched its deregistration
> - Checked that in fact there are no window merges
> - Tried custom Trigger disabling window merges and setting onProcessingTime trigger to TriggerResult.FIRE_AND_PURGE - same state behavior
> On our staging environment, we noticed that the state for that operator keeps increasing indefinitely, reaching as much as 1.5 GB after some months for 100k unique keys
> Flink commit id: 9c32ed9
>
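The claim in the quoted testing scenario, that messages arriving 1 s apart for a key cannot share a session window with a 100 ms gap, can be checked with a small standalone model. This is only a sketch in plain Java, not Flink code: the class `SessionWindowSketch` and its `sessions` method are hypothetical names, and treating windows that merely touch as overlapping is an assumption about the boundary behavior that does not affect the scenario above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: models per-key session windowing
// with a fixed gap over sorted arrival times (in milliseconds).
class SessionWindowSketch {

    /** Returns {start, end} pairs of the sessions formed by the sorted timestamps. */
    static List<long[]> sessions(long[] sortedTimestampsMs, long gapMs) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedTimestampsMs) {
            // Each element conceptually starts with its own window [ts, ts + gap).
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts <= last[1]) {
                // Assumption: a window that overlaps or touches the previous
                // session is merged into it, extending the session end.
                last[1] = Math.max(last[1], ts + gapMs);
            } else {
                // No overlap: this element opens a new session.
                result.add(new long[] {ts, ts + gapMs});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long gapMs = 100;                          // gap from the issue description
        long[] oneKey = {0, 1000, 2000, 3000, 4000}; // one message per second for one key
        System.out.println(sessions(oneKey, gapMs).size()); // prints 5
    }
}
```

Under this model every message in the scenario opens its own session and no merges occur, which matches the reporter's observation that there were no window merges. It says nothing about why the state size grows.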