Posted to user@flink.apache.org by Ravi Bhushan Ratnakar <ra...@gmail.com> on 2019/09/07 14:46:41 UTC

Checkpointing is not performing well

Hi All,

I am writing a streaming application using Flink 1.9. The application
consumes Avro payloads from a Kinesis stream and uses a
KeyedProcessFunction to execute business logic per correlation id, using
the event-time characteristic, with the configuration below:

StateBackend - filesystem with S3 storage
registerEventTimeTimer for each key - currentWatermark + 15 seconds
checkpoint interval - 1 min
minPauseBetweenCheckpoints - 1 min
checkpoint timeout - 10 min

Incoming data rate from Kinesis - ~10 to 21 GB/min

Number of task managers - 200 (r4.2xlarge -> 8 vCPU, 61 GB)

The first 2-4 checkpoints complete within 1 minute, while the state size
is around 50 GB. Once the state grows beyond 50 GB, checkpoints start
taking longer than 1 minute; the duration keeps increasing until it hits
the 10-minute timeout and the checkpoint fails. As soon as a checkpoint
takes longer than 1 minute, the application slows down and its output
starts to lag.

Any suggestions for tuning checkpoint performance would be highly
appreciated.

Regards,
Ravi

Re: Checkpointing is not performing well

Posted by Vijay Bhaskar <bh...@gmail.com>.
I meant the upper limit with respect to the resources you are using. Even
if you increase resources, spiking data is always a problem that you need
to handle anyway. The best approach is to add more back pressure at the
source.

Regards
Bhaskar


Re: Checkpointing is not performing well

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

There is no upper limit for state size in Flink. There are applications
with 10+ TB state.
However, it is natural that checkpointing time increases with state size as
more data needs to be serialized (in case of FsStateBackend) and written to
stable storage.
(The same is btw true for recovery when the state needs to be loaded back.)

There are a few tricks to reduce checkpointing time like using incremental
checkpoints which you tried already.
You can also scale out the application to use more machines and therefore
bandwidth + CPU (for serialization) during checkpoints.

Fabian
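
A rough way to see the effect of the two levers mentioned above (uploading
less data via incremental checkpoints, or scaling out for more aggregate
bandwidth) is a toy model in which checkpoint time is simply the data
uploaded divided by the combined upload bandwidth. This is only a sketch
under stated assumptions, not Flink's actual cost model: the 10 MB/s
per-machine bandwidth and the 21 GB of changed state per checkpoint are
hypothetical values chosen for illustration.

```python
# Toy model (an assumption, not Flink's real behaviour): checkpoint time is
# the uploaded volume divided by the aggregate upload bandwidth.

def checkpoint_seconds(upload_gb: float, machines: int, mb_per_sec_per_machine: float) -> float:
    """Seconds needed to upload upload_gb when every machine uploads in parallel."""
    total_mb = upload_gb * 1000.0  # 1 GB = 1000 MB in this sketch
    return total_mb / (machines * mb_per_sec_per_machine)


if __name__ == "__main__":
    bandwidth = 10.0  # hypothetical effective MB/s per machine

    # Full checkpoint: the whole state is uploaded every time.
    print("full, 200 machines:       ", checkpoint_seconds(350, 200, bandwidth), "s")
    # Scaling out: twice the machines, twice the aggregate bandwidth.
    print("full, 400 machines:       ", checkpoint_seconds(350, 400, bandwidth), "s")
    # Incremental checkpoint: only the changed part (hypothetically 21 GB).
    print("incremental, 200 machines:", checkpoint_seconds(21, 200, bandwidth), "s")
```

In this model the full checkpoint grows linearly with state size, while the
incremental one depends only on how much state changed since the previous
checkpoint.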


Re: Checkpointing is not performing well

Posted by Ravi Bhushan Ratnakar <ra...@gmail.com>.
What is the upper limit on checkpoint size in Flink?

Regards,
Ravi


Re: Checkpointing is not performing well

Posted by Vijay Bhaskar <bh...@gmail.com>.
You have exceeded the limits of Flink's checkpointing system by a wide
margin. Try to distribute events evenly over time by adding some sort of
controlled back pressure after receiving data from the Kinesis streams.
Otherwise the spike arriving within 5 seconds will always create
problems. Tomorrow it may double, so the best solution in your case is to
deliver at a configurable constant rate after receiving messages from the
Kinesis streams. Otherwise I am sure this will be a problem whatever
streaming engine you use. Tune your configuration to find the optimal rate
so that Flink's checkpointed state stays healthy.

Regards
Bhaskar
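
One common way to deliver at a configurable constant rate is a token
bucket. The sketch below is a generic illustration of that idea and not a
Flink or Kinesis connector API: the class name, parameters and the
injectable clock are all assumptions made for the example.

```python
import time


class TokenBucket:
    """Allows at most rate_per_sec units per second, with bursts up to capacity."""

    def __init__(self, rate_per_sec, capacity, clock=time.monotonic):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.clock = clock          # injectable so the limiter can be tested
        self.tokens = capacity      # start with a full bucket
        self.last = clock()

    def try_acquire(self, n=1):
        """Take n tokens if available; return False when the caller must wait."""
        now = self.clock()
        # Refill in proportion to the elapsed time, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False
```

A consumer would call try_acquire() before emitting each record (or batch)
downstream and pause briefly whenever it returns False, which smooths a
5-second spike into a steady flow at the configured rate.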


Re: Checkpointing is not performing well

Posted by Ravi Bhushan Ratnakar <ra...@gmail.com>.
@Rohan - I am streaming data to a Kafka sink after applying the business
logic. For checkpoints, I am using S3 as the distributed file system. For
local recovery, I am using an IOPS-optimized EBS volume.

@Vijay - I forgot to mention that the incoming data volume is ~10 to 21 GB
per minute of compressed (lz4) Avro messages. Generally 90% of correlated
events arrive within 5 seconds, and the remaining 10% can take up to 65
minutes. Because of this business requirement, the state size keeps
growing for 65 minutes, after which it becomes more or less stable. As the
state size grows to around 350 GB at peak load, the checkpoint is not able
to complete within 1 minute. I want to checkpoint as quickly as possible,
ideally every 5 seconds.

Thanks,
Ravi
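
For reference, a back-of-envelope estimate of the write rate these numbers
imply for a full (non-incremental) checkpoint. This is a sketch under
assumptions not stated above: the state is spread evenly over the 200 task
managers, 1 GB is taken as 1000 MB, and upload bandwidth is the only
bottleneck.

```python
def required_mb_per_sec(state_gb: float, budget_sec: float, task_managers: int) -> float:
    """Per-task-manager write rate (MB/s) needed to upload state_gb within budget_sec."""
    total_mb = state_gb * 1000.0  # assumption: 1 GB = 1000 MB
    return total_mb / budget_sec / task_managers


if __name__ == "__main__":
    # 350 GB peak state and 200 task managers, as described in this thread.
    for budget in (60, 5):
        rate = required_mb_per_sec(state_gb=350, budget_sec=budget, task_managers=200)
        print(f"{budget:>2} s budget: {rate:.1f} MB/s per task manager")
```

Under these assumptions a 60-second budget needs roughly 29 MB/s per task
manager, while a 5-second budget needs 350 MB/s per task manager, sustained,
on top of normal processing.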


On Tue 10 Sep, 2019, 11:37 Vijay Bhaskar, <bh...@gmail.com> wrote:

> To me the task count seems very high for the resources mentioned. To
> rule out a problem with the state backend, can you replace the sink with
> a no-op sink, i.e., one that discards the data, and check whether the job
> runs for a longer duration without any issue? You can then start
> decreasing the task manager count until you find a decent count that has
> no side effects. Use that value as the task manager count and then add
> your state backend back. You can try RocksDB first. With a reduced task
> manager count you might get good results.
>
> Regards
> Bhaskar
>
> On Sun, Sep 8, 2019 at 10:15 AM Rohan Thimmappa <ro...@gmail.com>
> wrote:
>
>> Ravi, have you looked at the I/O operations (IOPS) rate of the disk? You
>> can monitor the IOPS performance and tune it according to your workload.
>> This helped us in our project when we hit a wall after tuning pretty
>> much all the parameters.
>>
>> Rohan
>>
>>
>> ------------------------------
>> *From:* Ravi Bhushan Ratnakar <ra...@gmail.com>
>> *Sent:* Saturday, September 7, 2019 5:38 PM
>> *To:* Rafi Aroch
>> *Cc:* user
>> *Subject:* Re: Checkpointing is not performing well
>>
>> Hi Rafi,
>>
>> Thank you for your quick response.
>>
>> I have tested with the RocksDB state backend. RocksDB required
>> significantly more task managers to perform comparably to the filesystem
>> state backend. The problem here is that the checkpoint process is not
>> fast enough to complete.
>>
>> Our requirement is to checkpoint as often as possible, ideally every 5
>> seconds, to flush the output to the output sink. As the incoming data
>> rate is high, the checkpoint is not able to complete quickly. If I
>> increase the checkpoint interval, the state size grows much faster and
>> hence checkpointing takes much longer to complete. I also tried
>> AT_LEAST_ONCE mode, but it does not improve much. Adding more task
>> managers to increase parallelism also does not improve the checkpointing
>> performance.
>>
>> Is it possible to achieve checkpointing as short as 5 seconds with such
>> high input volume?
>>
>> Regards,
>> Ravi
>>
>> On Sat 7 Sep, 2019, 22:25 Rafi Aroch, <ra...@gmail.com> wrote:
>>
>>> Hi Ravi,
>>>
>>> Consider moving to the RocksDB state backend, where you can enable
>>> incremental checkpointing. This will keep your checkpoint size pretty
>>> much constant even as your state grows.
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html#the-rocksdbstatebackend
>>>
>>>
>>> Thanks,
>>> Rafi
>>>

Re: Checkpointing is not performing well

Posted by Vijay Bhaskar <bh...@gmail.com>.
To me, the task count seems very high for the resources mentioned. To rule
out an issue with the state backend, can you replace the sink with a no-op
sink, i.e., one that simply discards the data, and check whether the job can
run for a longer duration without any issue? You can then keep decreasing
the task manager count until you find a decent count that has no side
effects. Use that value as the task manager count and then add your state
backend back. You can try RocksDB first. With the reduced task manager
count you might get good results.

Regards
Bhaskar


Re: Checkpointing is not performing well

Posted by Rohan Thimmappa <ro...@gmail.com>.
Ravi, have you looked at the I/O operations (IOPS) rate of the disk? You can monitor the IOPS performance and tune it to match your workload. This helped us in our project when we hit a wall after tuning pretty much all the parameters.

Rohan



Re: Checkpointing is not performing well

Posted by Ravi Bhushan Ratnakar <ra...@gmail.com>.
Hi Rafi,

Thank you for your quick response.

I have tested with the RocksDB state backend. RocksDB required significantly
more task managers to perform comparably to the filesystem state backend.
The problem here is that the checkpoint process is not fast enough to
complete.

Our requirement is to checkpoint as frequently as possible, e.g. every 5
seconds, to flush the output to the output sink. As the incoming data rate
is high, the checkpoint is not able to complete quickly. If I increase the
checkpoint interval, the state size grows much faster and hence takes much
longer to checkpoint. I also tried AT LEAST ONCE mode, but it does not
improve much. Adding more task managers to increase parallelism also does
not improve the checkpointing performance.

Is it possible to achieve checkpointing as short as 5 seconds with such
high input volume?

Regards,
Ravi
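
A rough back-of-envelope check of the 5-second target, using only the
figures quoted in this thread (about 50 GB of state, 200 task managers, a
1-minute interval and a 10-minute timeout). It assumes, purely for
illustration, that every checkpoint writes the full state and that the
upload is spread evenly across task managers; serialization cost and S3
request latency are ignored. The function name required_throughput is made
up for this sketch.

```python
# Back-of-envelope estimate using the figures quoted in this thread.
# Assumption: the full state is written on every checkpoint and the upload
# is spread evenly over all task managers.

STATE_GB = 50          # state size reported when checkpoints start to slow down
TASK_MANAGERS = 200    # number of task managers in the cluster


def required_throughput(state_gb, target_seconds, task_managers):
    """Return (aggregate GB/s, per-task-manager MB/s) needed to finish in time."""
    aggregate_gb_s = state_gb / target_seconds
    per_tm_mb_s = aggregate_gb_s * 1000 / task_managers  # 1 GB taken as 1000 MB
    return aggregate_gb_s, per_tm_mb_s


if __name__ == "__main__":
    # 5 s is the desired duration, 60 s the interval, 600 s the timeout.
    for target in (5, 60, 600):
        agg, per_tm = required_throughput(STATE_GB, target, TASK_MANAGERS)
        print(f"{target:>4} s: {agg:6.2f} GB/s aggregate, "
              f"{per_tm:6.1f} MB/s per task manager")
```

Under these assumptions, finishing a 50 GB snapshot in 5 seconds needs 12
times the aggregate write throughput of finishing it in 60 seconds, and 60
seconds is about what the job manages before it starts to lag.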


Re: Checkpointing is not performing well

Posted by Rafi Aroch <ra...@gmail.com>.
Hi Ravi,

Consider moving to the RocksDB state backend, where you can enable
incremental checkpointing. This will make your checkpoint size stay pretty
much constant even as your state becomes larger.

https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html#the-rocksdbstatebackend


Thanks,
Rafi
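
A toy model of why incremental checkpoints keep the per-checkpoint upload
roughly constant while full snapshots grow with the state. This is not
Flink or RocksDB code: it assumes the state only grows by a fixed amount
per interval, that an earlier checkpoint already covers the initial state,
and that nothing is rewritten by compaction. The 10 GB growth figure and
the name upload_per_checkpoint are hypothetical.

```python
def upload_per_checkpoint(initial_state_gb, growth_gb_per_interval, checkpoints):
    """Return a list of (full_gb, incremental_gb) uploaded at each checkpoint.

    Simplified model: a full snapshot rewrites the whole state every time,
    an incremental one uploads only what was added since the last checkpoint.
    """
    uploads = []
    state = initial_state_gb
    for _ in range(checkpoints):
        state += growth_gb_per_interval
        full = state                          # everything is written again
        incremental = growth_gb_per_interval  # only the new data is written
        uploads.append((full, incremental))
    return uploads


if __name__ == "__main__":
    # 50 GB is the state size from the thread; 10 GB growth is an assumption.
    for i, (full, inc) in enumerate(upload_per_checkpoint(50, 10, 5), start=1):
        print(f"checkpoint {i}: full={full} GB, incremental={inc} GB")
```

In a real job the incremental upload also depends on how much existing
state is updated and on RocksDB compaction, so it varies more than this
model suggests.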
