Posted to user@flink.apache.org by Vidya Sagar Mula <mu...@gmail.com> on 2021/12/07 00:03:30 UTC

Issue with incremental checkpointing size

Hi,

In my project, we are trying to configure incremental checkpointing with
RocksDB as the state backend.

We are using Flink 1.11 and RocksDB with an AWS S3 backend.

Issue:
------
In my pipeline, the window size is 5 minutes and an incremental checkpoint
is taken every 2 minutes.
I am pumping in data such that every record has a different key. That means
the incremental checkpoint size should keep increasing with each checkpoint.

So the expectation is that the checkpoint size should reach at least 3-5 GB
given the amount of data pumped in.

However, the checkpoint size does not go beyond 300 MB, and even this 300 MB
checkpoint takes around 2 minutes to complete.

In my setup, I am using:

Cluster: Cloud cluster with instance storage
Memory: 20 GB
Heap: 10 GB
Flink Memory: 4.5 GB
Flink Version: 1.11
Backend: RocksDB with AWS S3 backend


I suspect there is a bottleneck in the Flink RocksDB configuration.
Can you please advise?

Thanks,

Re: Issue with incremental checkpointing size

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

Thanks for the clarification.

Could you expand checkpoints #195 and #196 for details? A slow end-to-end
checkpoint time can have various causes. For example, if data processing is
slow it will backpressure the checkpoint (in that case, the alignment
duration should be high for some vertices), or the JVM may have been under
heavy GC at that time.

By "increase the parallelism" I mean increasing the parallelism of the
Flink job. If you double the parallelism, each subtask only has to deal with
half as much data as before, which should speed up both data processing and
checkpointing. state.backend.rocksdb.checkpoint.transfer.thread.num might
help, but for a ~300 MB checkpoint I guess we do not need to tune it.
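
As a rough illustration of the parallelism point, the sketch below estimates how much checkpoint data each subtask would handle at different parallelism settings. It assumes keys are spread evenly across subtasks, which the thread does not confirm. `per_subtask_mb` is a hypothetical helper for this illustration, not a Flink API, and the 300 MB input is the checkpoint size reported in the thread.

```python
def per_subtask_mb(checkpoint_mb: float, parallelism: int) -> float:
    """Estimate checkpoint data per subtask, assuming evenly distributed keys."""
    if parallelism < 1:
        raise ValueError("parallelism must be at least 1")
    return checkpoint_mb / parallelism


# Each doubling of the parallelism halves the estimated per-subtask share.
for p in (1, 2, 4):
    print(f"parallelism={p}: ~{per_subtask_mb(300, p):.0f} MB per subtask")
```

With skewed keys the split would be uneven, so the real per-subtask numbers in the checkpoint details of the Flink UI are the better guide.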


Re: Issue with incremental checkpointing size

Posted by Vidya Sagar Mula <mu...@gmail.com>.
Hi Caizhi,

Thank you for your response.
I am attaching 3 screenshots for better understanding. Can you please take
a look?
Screenshot 1: Flink UI with the checkpointing history (size and time taken
are displayed)
Screenshot 2: Flink UI for Task Managers. I have one TM.
Screenshot 3: Grafana UI for CPU and memory utilization.

For point 1: It is incremental checkpointing. However, I am generating my
input data such that the keys in the records are never the same. So the
incremental checkpoint is going to be the size of a full checkpoint.


For point 2: As you can see in the Flink UI screenshot, when the checkpoint
size reaches 267 MB, the time taken is almost 4 minutes, which is definitely
not expected. You mentioned increasing the parallelism. Can you please
explain this a little more?
There is a configurable RocksDB parameter,
"state.backend.rocksdb.checkpoint.transfer.thread.num" (default 1). Is this
the parameter you are referring to for increasing the parallelism? Can you
please elaborate?
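
To put the reported numbers in perspective, here is a small sketch that converts a checkpoint size and its end-to-end duration into an effective rate. `effective_mb_per_s` is a hypothetical helper, and the inputs are the approximate figures quoted in this thread (267 MB in almost 4 minutes, 300 MB in around 2 minutes). The end-to-end duration may also include time that is not spent uploading, such as alignment, so the result is probably best read as a lower bound on the actual transfer speed.

```python
def effective_mb_per_s(size_mb: float, duration_s: float) -> float:
    """Effective checkpoint rate: size divided by end-to-end duration."""
    if duration_s <= 0:
        raise ValueError("duration_s must be positive")
    return size_mb / duration_s


# Approximate figures reported in the thread.
for size_mb, minutes in ((267, 4), (300, 2)):
    rate = effective_mb_per_s(size_mb, minutes * 60)
    print(f"{size_mb} MB in {minutes} min: ~{rate:.2f} MB/s")
```

Rates this low for a few hundred MB suggest looking at where the time goes within the checkpoint rather than at the amount of data alone.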


Also, there are some tunable RocksDB parameters, listed below. I am
currently using the default values. Do I need to set higher values so that
the checkpoint size grows with the increased input data size?

Can you please clarify whether I need to tune any of these parameters to
increase the checkpoint size and to reduce the time taken for each
checkpoint?

state.backend.rocksdb.checkpoint.transfer.thread.num (defaults to 1)

state.backend.rocksdb.writebuffer.size (RocksDB defaults to 4MB)

state.backend.rocksdb.writebuffer.count (RocksDB defaults to 2)

state.backend.rocksdb.writebuffer.number-to-merge (RocksDB defaults to 1)

state.backend.rocksdb.block.blocksize (RocksDB defaults to 4KB)

state.backend.rocksdb.block.cache-size (RocksDB defaults to 8MB)





Re: Issue with incremental checkpointing size

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

> the checkpointing size is not going beyond 300 MB

Is 300 MB the total size of the checkpoint or the incremental size? If it
is the latter, Flink only stores the necessary information (for example,
the keys and the selected fields) in the checkpoint, and it is compressed,
so for 3~5 GB of input records it is reasonable for the incremental
checkpoint size to shrink to ~300 MB. Of course, this depends on the
detailed workflow.
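
For a sense of scale, the following sketch computes what fraction of the input volume a ~300 MB checkpoint represents for 3 GB and 5 GB of input. `size_ratio` is a hypothetical helper, and it assumes 1 GB = 1024 MB. It only restates the figures from this thread as a ratio and says nothing about what a particular job should produce.

```python
def size_ratio(checkpoint_mb: float, input_gb: float) -> float:
    """Checkpoint size as a fraction of input volume (1 GB taken as 1024 MB)."""
    if input_gb <= 0:
        raise ValueError("input_gb must be positive")
    return checkpoint_mb / (input_gb * 1024)


for input_gb in (3, 5):
    print(f"300 MB vs {input_gb} GB input: {size_ratio(300, input_gb):.1%}")
```

So the observed checkpoint is roughly 6% to 10% of the input volume, which is the amount of reduction that storing only selected fields plus compression would have to account for.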

> there must be something bottleneck with Flink RocksDB configurations.

By "bottleneck", do you mean that checkpointing is too slow? If so, you can
try increasing the parallelism of the job so that each parallel subtask
carries less load when making checkpoints.
