Posted to user@flink.apache.org by Vidya Sagar Mula <mu...@gmail.com> on 2022/03/04 09:00:30 UTC

Incremental checkpointing & RocksDB Serialization

Hi,

I have a cluster running Flink 1.11 with an AWS S3 backend. I am trying out
incremental checkpointing on this setup. I have a pipeline with a 10-minute
window, and an incremental checkpoint happens every 2 minutes.

Observation:
-------------
I am observing a long snapshot duration at the end of each window, i.e. for
the last checkpoint of the window (almost every time).
I am attaching the checkpoint history from the Flink UI.

My set up details:
-------------------
Cluster: Cloud cluster with instance storage.
Memory : 20 GB,
Heap : 10 GB
Flink Managed Memory: 4.5 GB
Flink Version : 1.11
CPUs : 2

ROCKSDB_WRITE_BUFFER_SIZE: "2097152000"  ## ~2 GB (2000 MiB)
ROCKSDB_BLOCK_CACHE_SIZE: "104857600"    ## 100 MiB
ROCKSDB_BLOCK_SIZE: "5242880"            ## 5 MiB
ROCKSDB_CHECKPOINT_TRANSFER_THREAD_NUM: 4
ROCKSDB_MAX_BACKGROUND_THREADS: 4


In my analysis, I noticed that CPU utilization peaks at almost 100% when the
issue occurs. Thread dumps taken during the CPU peak show a call trace
related to RocksDB serialization. All the thread samples point to this stack.

Based on the class type of the pipeline transformation, the Kryo serializer
is chosen for the RocksDB state. I did try to change the serializer type, but
that is not the focal point I want to stress here.

I would like to understand the reason for the high CPU utilization. I have
tried increasing the CPU count to 2 and 4, but it did not give any better
results. The parallelism is 2.

Please take a look at the stack traces below. Can you suggest why
serialization/deserialization in RocksDB takes so much CPU?

########

Stack-1, Stack-2, Stack-3 are attached to this email.

Questions:
-----------
- Why is the incremental checkpointing taking more time for the snapshot at
the end of the window duration?
- Why is RocksDB serialization causing the CPU peak?
- Do you suggest any change in the serializer type in the RocksDB? (Kryo vs
Avro)

Thank you,

RE: Incremental checkpointing & RocksDB Serialization

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi Vidya,

As to the choice of serializer:

  *   Flink provides two serializer implementations that support state migration: the Avro serializer and the POJO serializer
  *   The POJO serializer happens to be one of the fastest available serializers (faster than Avro)
  *   If your record sticks to the POJO coding rules, it is probably a good choice; no extra serializer coding is needed
  *   See here [1]
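
As a rough illustration of the POJO coding rules mentioned above, here is a minimal plain-Java sketch. The class names (SensorReading, NotAPojo) and the looksLikePojo helper are hypothetical and only approximate the documented rules (public class, public no-argument constructor, every field either public and non-final or reachable through a getter and setter). It is not Flink's actual type extraction logic, so treat the result as a hint only.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class PojoRulesSketch {

    /** Hypothetical event type that follows the POJO rules. */
    public static class SensorReading {
        public String sensorId;   // public, non-final field: accessed directly
        private long timestamp;   // private field: needs a getter and a setter

        public SensorReading() {} // public no-argument constructor

        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    }

    /** Counter-example: no no-argument constructor, final field without a setter. */
    public static class NotAPojo {
        private final String id;

        public NotAPojo(String id) { this.id = id; }
        public String getId() { return id; }
    }

    /** Simplified approximation of the POJO rules; NOT Flink's actual check. */
    public static boolean looksLikePojo(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        // Nested classes must be static so they can be created without an outer instance.
        if (clazz.isMemberClass() && !Modifier.isStatic(clazz.getModifiers())) {
            return false;
        }
        try {
            clazz.getConstructor(); // succeeds only for a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                int mods = field.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                    continue; // static and transient fields are skipped by this check
                }
                boolean directlyAccessible = Modifier.isPublic(mods) && !Modifier.isFinal(mods);
                if (!directlyAccessible && !hasGetterAndSetter(clazz, field)) {
                    return false;
                }
            }
        }
        return true;
    }

    private static boolean hasGetterAndSetter(Class<?> clazz, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        boolean getter = false;
        boolean setter = false;
        for (Method m : clazz.getMethods()) { // public methods, including inherited ones
            if (m.getParameterCount() == 0
                    && (m.getName().equals("get" + suffix) || m.getName().equals("is" + suffix))) {
                getter = true;
            }
            if (m.getParameterCount() == 1 && m.getName().equals("set" + suffix)) {
                setter = true;
            }
        }
        return getter && setter;
    }

    public static void main(String[] args) {
        System.out.println("SensorReading: " + looksLikePojo(SensorReading.class));
        System.out.println("NotAPojo: " + looksLikePojo(NotAPojo.class));
    }
}
```

A class that fails such a check is a likely candidate for being treated as a generic type and handed to Kryo, which matches the behaviour described in this thread.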

As to the extra big incremental checkpoints at the end of a time window:

  *   This is quite plausible
  *   Windowing uses the ‘namespace’ subkey of keyed state
  *   Ideally, incremental checkpoints only store changes made since the last checkpoint
  *   On a window change, many window instances (i.e. one per key and time interval) disappear and are eventually recreated for the next time interval, hence the bigger checkpoint
  *   Serialization effort depends on the choice of state backend:
     *   RocksDBStateBackend predominantly uses serializers when reading and writing state, but to a lesser extent for checkpoints
     *   FsStateBackend does not use serializers when reading and writing state, but predominantly during checkpoints
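
To make the timing concrete, the following plain-Java sketch uses the numbers from the original post (10-minute window, 2-minute checkpoint interval) and marks which checkpoints are the first ones taken after a window change. It assumes tumbling windows with zero offset and assumes that checkpoint times and window boundaries are measured on the same clock, which is a simplification (with event-time windows the firing depends on watermarks). The class and method names are hypothetical.

```java
public class WindowBoundarySketch {

    /** Start of the tumbling window containing tsMillis (zero offset, non-negative timestamps). */
    public static long windowStart(long tsMillis, long windowSizeMillis) {
        return tsMillis - (tsMillis % windowSizeMillis);
    }

    /** True if a window boundary lies between the previous checkpoint and this one. */
    public static boolean crossesWindowBoundary(
            long previousCheckpointMillis, long checkpointMillis, long windowSizeMillis) {
        return windowStart(previousCheckpointMillis, windowSizeMillis)
                != windowStart(checkpointMillis, windowSizeMillis);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long window = 10 * minute;   // window size from the original post
        long interval = 2 * minute;  // checkpoint interval from the original post
        for (long t = interval; t <= 20 * minute; t += interval) {
            System.out.println("checkpoint at minute " + (t / minute)
                    + " crosses a window boundary: "
                    + crossesWindowBoundary(t - interval, t, window));
        }
    }
}
```

Under these assumptions every fifth checkpoint follows a window change, which is where the removed and newly created window instances would show up as changed state.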


To improve your situation, you need to take a closer look at:

  *   The numbers (how many keys, how many active window instances (globally/per key), how many events are collected per window instance)
  *   The specific implementation of the rollup/aggregation function
     *   Some setups store all events and iterate over them whenever a window result is needed (triggered)
     *   Other setups pre-aggregate incoming events and only summarize when a window result is needed (triggered)
     *   This choice makes a big difference when it comes to state size
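
The difference between the two setups can be sketched in plain Java, independent of Flink. Both classes below are hypothetical and compute an average per window instance: one keeps every event, the other keeps only a running sum and count. In Flink terms, the second style roughly corresponds to passing an AggregateFunction or ReduceFunction to the window, while a ProcessWindowFunction on its own buffers all elements.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowStateSketch {

    /** Stores every event; state grows with the number of events in the window. */
    public static class BufferingAverage {
        private final List<Double> events = new ArrayList<>();

        public void add(double value) { events.add(value); }

        /** Iterates over all stored events when the window result is needed. */
        public double result() {
            double sum = 0;
            for (double v : events) {
                sum += v;
            }
            return events.isEmpty() ? 0.0 : sum / events.size();
        }

        public int storedValues() { return events.size(); }
    }

    /** Pre-aggregates on arrival; state is two numbers regardless of event count. */
    public static class IncrementalAverage {
        private double sum;
        private long count;

        public void add(double value) { sum += value; count++; }

        public double result() { return count == 0 ? 0.0 : sum / count; }

        public int storedValues() { return 2; } // sum and count
    }

    public static void main(String[] args) {
        BufferingAverage buffering = new BufferingAverage();
        IncrementalAverage incremental = new IncrementalAverage();
        for (int i = 1; i <= 1000; i++) {
            buffering.add(i);
            incremental.add(i);
        }
        System.out.println("buffering:   avg=" + buffering.result()
                + ", stored values=" + buffering.storedValues());
        System.out.println("incremental: avg=" + incremental.result()
                + ", stored values=" + incremental.storedValues());
    }
}
```

Both variants produce the same result, but the buffering variant has to serialize every event it holds, so with a slow serializer such as Kryo its cost per window instance grows with the number of events.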

Hope this helps … feel free to get back with further questions 😊


Thias



[1] https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#pojoserializer



Re: Incremental checkpointing & RocksDB Serialization

Posted by Vidya Sagar Mula <mu...@gmail.com>.
Hi Yun,

Thank you for the response.



   1. You could tune your job to avoid backpressure. Maybe you can upgrade
   your flink engine to at least flink-1.13 to know how to monitor the back
   pressure status [1].

[VIDYA] - For my organization, upgrading from our current Flink version
(1.11) is a major undertaking. I need to continue my development work on
1.11.

   2. You can refer to [2] to know how to customize your serializer.

[VIDYA] - Thanks for the links on custom serializers. I am wondering how
serialization in incremental checkpointing differs from full checkpointing.
My pipeline logic is the same for both; only the checkpoint.type variable
and some other environment variables change.

- A full checkpoint of the pipeline does not take nearly as long as the
incremental checkpoint at the end of the window. With incremental
checkpointing I see high backpressure and high CPU utilization, and the
thread dump shows stacks related to serialization. How does serialization
differ between full and incremental checkpointing? As far as I know, the
RocksDB library has some serializers for incremental checkpointing.

- I am not writing a custom serializer for my pipeline with full
checkpointing. Is it the general pattern to implement a custom serializer
for incremental checkpointing?

- Regarding serializers for full vs. incremental checkpointing, what is the
general usage pattern across the Flink community? If I write a custom
serializer for incremental checkpointing, how does it work with full
checkpointing?

Please clarify.

Thanks,
Vidya.




[1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/back_pressure/
[2] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/custom_serialization/


Re: Incremental checkpointing & RocksDB Serialization

Posted by Yun Tang <my...@live.com>.
Hi Vidya,


  1.  You could tune your job to avoid backpressure. Maybe you can upgrade your Flink engine to at least Flink 1.13 to monitor the backpressure status [1].
  2.  You can refer to [2] to learn how to customize your serializer.


[1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/back_pressure/
[2] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/custom_serialization/

Best,
Yun Tang

Re: Incremental checkpointing & RocksDB Serialization

Posted by Vidya Sagar Mula <mu...@gmail.com>.
Hi Yun Tang,
Thank you for the reply. I have follow-up questions and need some more
details. Can you please clarify my inline questions?

> Why is the incremental checkpointing taking more time for the snapshot at
the end of the window duration?

I guess that this is because the job is under backpressure at the end of the
window. You can expand the checkpoint details to see whether the async
duration of each task is much shorter than the e2e duration. If so, the
checkpoint barrier stayed in the channel longer.

*<VIDYA> - Yes, I expanded the checkpoint details and noticed that the e2e
duration is much higher than the async duration. I am attaching a screenshot
(Checkpoint #59). Can you elaborate on "the checkpoint barrier stayed in the
channel longer"? What are the suggested ways to mitigate this issue? I am
wondering how this can be avoided, as it happens only at the end of the
window.*


> Do you suggest any change in the serializer type in the RocksDB? (Kryo vs
Avro)

In our experience, Kryo is not a good choice in most cases.


*<VIDYA> - What are your recommendations on other serializers? I tried to
switch to Avro by setting the "forceAvro" flag to TRUE in the
ExecutionConfig, but KryoSerializer is still picked for the RocksDB state.
This is because the KeyType of the transformation is assigned as
GenericType. I am not sure what changes I need to make to my class/POJO so
that the Avro serializer is used. Can you suggest how to switch to a better
serializer?*




Re: Incremental checkpointing & RocksDB Serialization

Posted by Yun Tang <my...@live.com>.
Hi Vidya,

> Why is the incremental checkpointing taking more time for the snapshot at the end of the window duration?

I guess that this is because the job is under backpressure at the end of the window. You can expand the checkpoint details to see whether the async duration of each task is much shorter than the e2e duration. If so, the checkpoint barrier stayed in the channel longer.

> Why is RocksDB serialization causing the CPU peak?

This is caused by the implementation of your serializer.

> Do you suggest any change in the serializer type in the RocksDB? (Kryo vs Avro)

In our experience, Kryo is not a good choice in most cases.

Best
Yun Tang