Posted to user@flink.apache.org by Congxian Qiu <qc...@gmail.com> on 2019/10/01 05:48:19 UTC

Re: Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows

Hi Oliwer,

From the description, it seems the state was not cleared. Maybe you could
check how many times {{windowState.clear()}} was triggered in
{{WindowOperator#processElement}}, and try to figure out why the state
was not cleared.
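
As a baseline for that count, here is a minimal, Flink-free sketch that estimates how many session windows the described workload should create. It assumes one state clear per window, and it assumes a new session starts only when the pause since the previous element for the same key is strictly longer than the gap (the boundary case does not matter for a 1 s interval and a 100 ms gap). The class and method names are hypothetical and are not part of Flink. The 1000 rounds are derived from the reported 1 million messages over 1000 keys.

```java
final class SessionWindowCount {

    /**
     * Counts the sessions formed by one key's elements.
     * Timestamps must be sorted in ascending order (milliseconds).
     * A new session starts when the pause since the previous element
     * is strictly longer than gapMs (assumed boundary handling).
     */
    static int countSessions(long[] sortedTimestampsMs, long gapMs) {
        if (sortedTimestampsMs.length == 0) {
            return 0;
        }
        int sessions = 1;
        for (int i = 1; i < sortedTimestampsMs.length; i++) {
            if (sortedTimestampsMs[i] - sortedTimestampsMs[i - 1] > gapMs) {
                sessions++;
            }
        }
        return sessions;
    }

    /** Total windows when every key receives one element per interval. */
    static long expectedWindows(int keys, int messagesPerKey, long intervalMs, long gapMs) {
        long[] timestamps = new long[messagesPerKey];
        for (int i = 0; i < messagesPerKey; i++) {
            timestamps[i] = i * intervalMs;
        }
        return (long) keys * countSessions(timestamps, gapMs);
    }

    public static void main(String[] args) {
        // Workload from the report: 1000 keys, one message per key every 1 s,
        // session gap of 100 ms, 1000 rounds (1 million messages in total).
        System.out.println(expectedWindows(1000, 1000, 1000L, 100L));
    }
}
```

If the number of observed clears is far below this estimate, that would point at windows whose state is never purged; if it matches, the leftover state would have to come from somewhere else.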

Best,
Congxian


Oliwer Kostera <O....@adbglobal.com> wrote on Fri, Sep 27, 2019 at 4:14 PM:

> Hi all,
>
>
> I'm using *ProcessWindowFunction* in a keyed stream with the following
> definition:
>
> final SingleOutputStreamOperator<Message> processWindowFunctionStream =
>      keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
>                 .process(new CustomProcessWindowFunction())
>                 .uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
>                 .name("Process window function");
>
> My checkpointing configuration uses the RocksDB state backend with
> incremental checkpointing and EXACTLY_ONCE mode.
>
> At runtime I noticed that even though data ingestion is steady (the same
> keys and the same message frequency), the state size of the process window
> operator keeps increasing. I was able to reproduce it with a minimal,
> similar setup here:
> https://github.com/loliver1234/flink-process-window-function
>
> Testing conditions:
>
>    - RabbitMQ source with Exactly-once guarantee and 65k prefetch count
>    - RabbitMQ sink to collect messages
>    - Simple ProcessWindowFunction that only passes messages through
>    - Stream time characteristic set to TimeCharacteristic.ProcessingTime
>
> Testing scenario:
>
>    - Start the Flink job and check the initial state size - State Size: 127 KB
>    - Start sending messages: the same 1000 unique keys every 1 s (consecutive
>    messages for a key do not fall within the 100 ms session gap, so each
>    message should create a new window)
>    - The state of the process window operator keeps increasing - after
>    1 million messages the state was around 2 MB
>    - Stop sending messages and wait until the RabbitMQ queue is fully
>    consumed and a few checkpoints have completed
>    - I expected the state size to drop back to the base value, but it
>    stayed at 2 MB
>    - Continue sending messages with the same keys; the state size kept
>    growing.
>
> What I checked:
>
>    - Registration and deregistration of timers set for time windows -
>    each registration matched its deregistration
>    - Checked that in fact there are no window merges
>    - Tried a custom Trigger that disables window merges and returns
>    TriggerResult.FIRE_AND_PURGE from onProcessingTime - same state
>    behavior
>
> Tested with:
>
>    - Local Flink Mini Cluster running from IDE
>    - Flink HA standalone cluster running in Docker
>
> On our staging environment, we noticed that the state for that operator
> keeps increasing indefinitely, reaching as much as 1.5 GB after some months
> for 100k unique keys.
>
> With best regards
>
> Oliwer
> adbglobal.com <https://www.adbglobal.com>
>
> *This message (including any attachments) may contain confidential,
> proprietary, privileged and/or private information. The information is
> intended for the use of the individual or entity designated above. If you
> are not the intended recipient of this message, please notify the sender
> immediately, and delete the message and any attachments. Any disclosure,
> reproduction, distribution or other use of this message or any attachments
> by an individual or entity other than the intended recipient is STRICTLY
> PROHIBITED.*
>
> *Please note that ADB protects your privacy. Any personal information we
> collect from you is used in accordance with our Privacy Policy
> <https://www.adbglobal.com/privacy-policy/> and in compliance with
> applicable European data protection law (Regulation (EU) 2016/679, General
> Data Protection Regulation) and other statutory provisions. *
>

Re: Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows

Posted by Oliwer Kostera <O....@adbglobal.com>.
Hi,

I actually created a Jira issue before posting to the mailing list. Today I added steps to reproduce, along with test outcomes for different scenarios, to the repository.

Jira issue: https://issues.apache.org/jira/browse/FLINK-14197

Repository: https://github.com/loliver1234/flink-process-window-function

With best regards

Oliwer

On 02.10.2019 12:05, Fabian Hueske wrote:
Hi Oliwer,

I think you are right. There seems to be something going wrong.
Just to clarify, you are sure that the growing state size is caused by the window operator?

From your description I assume that the state size does not depend (solely) on the number of distinct keys.
Otherwise, the state size would stop growing at some point.
This would be a hint that every window leaves some state behind.

AFAIK, processing time session windows are not very common. There might be a bug in the implementation.

Could you create a Jira with a description of the problem?
It would be great if you could provide a reproducible example with a data generator source.

Thank you,
Fabian


Re: Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Oliwer,

I think you are right. There seems to be something going wrong.
Just to clarify, you are sure that the growing state size is caused by the
window operator?

From your description I assume that the state size does not depend (solely)
on the number of distinct keys.
Otherwise, the state size would stop growing at some point.
This would be a hint that every window leaves some state behind.
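
To put a rough number on that hint, the figures from the report can be turned into an average growth per window. This is only a back-of-the-envelope sketch: it assumes decimal units, treats "around 2mb" as exactly 2,000,000 bytes, and assumes one window per message, which is what the 1 s send interval and 100 ms gap suggest. The class and method names are hypothetical.

```java
final class StateGrowthEstimate {

    /** Average state growth per window, with all sizes given in bytes. */
    static double bytesPerWindow(long baselineBytes, long observedBytes, long windows) {
        if (windows <= 0) {
            throw new IllegalArgumentException("windows must be positive");
        }
        return (observedBytes - baselineBytes) / (double) windows;
    }

    public static void main(String[] args) {
        long baseline = 127_000L;    // "127 KB" initial state size (decimal units assumed)
        long observed = 2_000_000L;  // "around 2mb" after the test run (approximate)
        long windows = 1_000_000L;   // one window per message (assumption)
        System.out.println(bytesPerWindow(baseline, observed, windows) + " bytes per window");
    }
}
```

Under these assumptions the growth works out to about two bytes per window, which is small but would be consistent with a little residue being left behind each time a window is closed.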

AFAIK, processing time session windows are not very common. There might be
a bug in the implementation.

Could you create a Jira with a description of the problem?
It would be great if you could provide a reproducible example with a data
generator source.

Thank you,
Fabian

On Tue, Oct 1, 2019 at 11:18, Oliwer Kostera <O.Kostera@adbglobal.com> wrote:

> Hi,
>
> I'm not sure what you mean by windowState.clear(). If I understand you
> correctly, it's the windowState from the ProcessWindowFunction Context, which
> is a KeyedStateStore. A KeyedStateStore manages registered keyed state,
> which I don't have, so without a descriptor I can't access any clear()
> method. There is no state that I manage explicitly, as you can see here:
> https://github.com/loliver1234/flink-process-window-function/blob/master/src/main/java/personal/kostera/functions/CustomProcessWindowFunction.java
>
> With best regards
>
> Oliwer

Re: Increasing trend for state size of keyed stream using ProcessWindowFunction with ProcessingTimeSessionWindows

Posted by Oliwer Kostera <O....@adbglobal.com>.
Hi,

I'm not sure what you mean by windowState.clear(). If I understand you correctly, it's the windowState from the ProcessWindowFunction Context, which is a KeyedStateStore. A KeyedStateStore manages registered keyed state, which I don't have, so without a descriptor I can't access any clear() method. There is no state that I manage explicitly, as you can see here: https://github.com/loliver1234/flink-process-window-function/blob/master/src/main/java/personal/kostera/functions/CustomProcessWindowFunction.java

With best regards

Oliwer

On 01.10.2019 07:48, Congxian Qiu wrote:
Hi Oliwer,

From the description, it seems the state was not cleared. Maybe you could check how many times {{windowState.clear()}} was triggered in {{WindowOperator#processElement}}, and try to figure out why the state was not cleared.

Best,
Congxian

