You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Vijayendra Yadav <co...@gmail.com> on 2020/08/24 16:17:35 UTC

Flink OnCheckpointRollingPolicy streamingfilesink

Hi Team,

Bulk Formats can only have `OnCheckpointRollingPolicy`, which rolls (ONLY)
on every checkpoint.

*.withRollingPolicy(OnCheckpointRollingPolicy.build())*

Question: What are recommended values related to checkpointing to fsstate,
should it be more frequent checkpoints, or longer intervals, how many
concurrent checkpoints needs to be allowed, how much should be an ideal
pause between each checkpoint.
Is there a way to control batch size here other than time ? any
recommendations to all the parameters listed below?
*Note: *I am trying to improve sink throughput.


env.enableCheckpointing(chckptintervalmilli)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.valueOf(ChckptMode))
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(chckptintervalmilligap)
env.getCheckpointConfig.setCheckpointTimeout(chckptduration)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(concurrentchckpt)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.valueOf(CheckpointCleanup))
 env.getCheckpointConfig.setPreferCheckpointForRecovery(CheckpointForCleanup)

Thanks,
Vijay

Re: Flink OnCheckpointRollingPolicy streamingfilesink

Posted by Vijayendra Yadav <co...@gmail.com>.
Thank You Andrey.

Regards,
Vijay


> On Aug 29, 2020, at 3:38 AM, Andrey Zagrebin <az...@apache.org> wrote:
> 
> 
> Hi Vijay,
> 
> I would apply the same judgement. It is latency vs throughput vs spent resources vs practical need.
> 
> The more concurrent checkpoints your system is capable of handling, the better end-to-end result latency you will observe and see computation results more frequently.
> On the other hand your system needs to provide more resources (maybe higher parallelism) to process more current checkpoints.
> 
> Again lees the checkpoints -> more records are batched together and the throughput is better.
> 
> It usually does not make sense to have a big number of current checkpoints which process only a handful of records in between if you do not observe any practical decrease of latency.
> The system will just waste resources to process the checkpoints.
> 
> Best,
> Andrey
> 
>> On Fri, Aug 28, 2020 at 9:52 PM Vijayendra Yadav <co...@gmail.com> wrote:
>> Hi Andrey,
>> 
>> Thanks, 
>> what is recommendation for :  env.getCheckpointConfig.setMaxConcurrentCheckpoints(concurrentchckpt) ?
>> 
>> 1 or higher based on what factor.
>> 
>> 
>> Regards,
>> Vijay
>> 
>> 
>>> On Tue, Aug 25, 2020 at 8:55 AM Andrey Zagrebin <az...@apache.org> wrote:
>>> Hi Vijay,
>>> 
>>> I think it depends on your job requirements, in particular how many records are processed per second and how much resources you have to process them.
>>> 
>>> If the checkpointing interval is short then the checkpointing overhead can be too high and you need more resources to efficiently keep up with the incoming streaming.
>>> 
>>> If the checkpointing interval is long, more records are batched together and the throughput is better.
>>> On the other hand, the observed latency is lower because the batched results get flushed into the files and become visible in the external system only when checkpoint occurs to provide exactly once guarantee.
>>> 
>>> Best,
>>> Andrey
>>> 
>>>> On Mon, Aug 24, 2020 at 6:18 PM Vijayendra Yadav <co...@gmail.com> wrote:
>>>> Hi Team,
>>>> 
>>>> Bulk Formats can only have `OnCheckpointRollingPolicy`, which rolls (ONLY) on every checkpoint.  
>>>> 
>>>> .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>> 
>>>> Question: What are recommended values related to checkpointing to fsstate, should it be more frequent checkpoints, or longer intervals, how many concurrent checkpoints needs to be allowed, how much should be an ideal pause between each checkpoint.
>>>> Is there a way to control batch size here other than time ? any recommendations to all the parameters listed below? 
>>>> Note: I am trying to improve sink throughput. 
>>>> 
>>>> 
>>>> env.enableCheckpointing(chckptintervalmilli)      env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.valueOf(ChckptMode))
>>>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(chckptintervalmilligap)
>>>> env.getCheckpointConfig.setCheckpointTimeout(chckptduration)
>>>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(concurrentchckpt)
>>>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.valueOf(CheckpointCleanup))
>>>>  env.getCheckpointConfig.setPreferCheckpointForRecovery(CheckpointForCleanup)
>>>> 
>>>> Thanks,
>>>> Vijay

Re: Flink OnCheckpointRollingPolicy streamingfilesink

Posted by Andrey Zagrebin <az...@apache.org>.
Hi Vijay,

I would apply the same judgement. It is latency vs throughput vs spent
resources vs practical need.

The more concurrent checkpoints your system is capable of handling, the
better end-to-end result latency you will observe and see computation
results more frequently.
On the other hand your system needs to provide more resources (maybe higher
parallelism) to process more current checkpoints.

Again lees the checkpoints -> more records are batched together and the
throughput is better.

It usually does not make sense to have a big number of current checkpoints
which process only a handful of records in between if you do not observe
any practical decrease of latency.
The system will just waste resources to process the checkpoints.

Best,
Andrey

On Fri, Aug 28, 2020 at 9:52 PM Vijayendra Yadav <co...@gmail.com>
wrote:

> Hi Andrey,
>
> Thanks,
> what is recommendation for :  env.getCheckpointConfig.
> *setMaxConcurrentCheckpoints*(concurrentchckpt) ?
>
> 1 or higher based on what factor.
>
>
> Regards,
> Vijay
>
>
> On Tue, Aug 25, 2020 at 8:55 AM Andrey Zagrebin <az...@apache.org>
> wrote:
>
>> Hi Vijay,
>>
>> I think it depends on your job requirements, in particular how many
>> records are processed per second and how much resources you have to process
>> them.
>>
>> If the checkpointing interval is short then the checkpointing overhead
>> can be too high and you need more resources to efficiently keep up with the
>> incoming streaming.
>>
>> If the checkpointing interval is long, more records are batched together
>> and the throughput is better.
>> On the other hand, the observed latency is lower because the batched
>> results get flushed into the files and become visible in the external
>> system only when checkpoint occurs to provide exactly once guarantee.
>>
>> Best,
>> Andrey
>>
>> On Mon, Aug 24, 2020 at 6:18 PM Vijayendra Yadav <co...@gmail.com>
>> wrote:
>>
>>> Hi Team,
>>>
>>> Bulk Formats can only have `OnCheckpointRollingPolicy`, which rolls
>>> (ONLY) on every checkpoint.
>>>
>>> *.withRollingPolicy(OnCheckpointRollingPolicy.build())*
>>>
>>> Question: What are recommended values related to checkpointing to
>>> fsstate, should it be more frequent checkpoints, or longer intervals, how
>>> many concurrent checkpoints needs to be allowed, how much should be an
>>> ideal pause between each checkpoint.
>>> Is there a way to control batch size here other than time ? any
>>> recommendations to all the parameters listed below?
>>> *Note: *I am trying to improve sink throughput.
>>>
>>>
>>> env.enableCheckpointing(chckptintervalmilli)
>>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.valueOf(ChckptMode))
>>>
>>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(chckptintervalmilligap)
>>> env.getCheckpointConfig.setCheckpointTimeout(chckptduration)
>>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(concurrentchckpt)
>>>
>>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.valueOf(CheckpointCleanup))
>>>
>>>  env.getCheckpointConfig.setPreferCheckpointForRecovery(CheckpointForCleanup)
>>>
>>> Thanks,
>>> Vijay
>>>
>>

Re: Flink OnCheckpointRollingPolicy streamingfilesink

Posted by Vijayendra Yadav <co...@gmail.com>.
Hi Andrey,

Thanks,
what is recommendation for :  env.getCheckpointConfig.
*setMaxConcurrentCheckpoints*(concurrentchckpt) ?

1 or higher based on what factor.


Regards,
Vijay


On Tue, Aug 25, 2020 at 8:55 AM Andrey Zagrebin <az...@apache.org>
wrote:

> Hi Vijay,
>
> I think it depends on your job requirements, in particular how many
> records are processed per second and how much resources you have to process
> them.
>
> If the checkpointing interval is short then the checkpointing overhead can
> be too high and you need more resources to efficiently keep up with the
> incoming streaming.
>
> If the checkpointing interval is long, more records are batched together
> and the throughput is better.
> On the other hand, the observed latency is lower because the batched
> results get flushed into the files and become visible in the external
> system only when checkpoint occurs to provide exactly once guarantee.
>
> Best,
> Andrey
>
> On Mon, Aug 24, 2020 at 6:18 PM Vijayendra Yadav <co...@gmail.com>
> wrote:
>
>> Hi Team,
>>
>> Bulk Formats can only have `OnCheckpointRollingPolicy`, which rolls
>> (ONLY) on every checkpoint.
>>
>> *.withRollingPolicy(OnCheckpointRollingPolicy.build())*
>>
>> Question: What are recommended values related to checkpointing to
>> fsstate, should it be more frequent checkpoints, or longer intervals, how
>> many concurrent checkpoints needs to be allowed, how much should be an
>> ideal pause between each checkpoint.
>> Is there a way to control batch size here other than time ? any
>> recommendations to all the parameters listed below?
>> *Note: *I am trying to improve sink throughput.
>>
>>
>> env.enableCheckpointing(chckptintervalmilli)
>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.valueOf(ChckptMode))
>>
>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(chckptintervalmilligap)
>> env.getCheckpointConfig.setCheckpointTimeout(chckptduration)
>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(concurrentchckpt)
>>
>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.valueOf(CheckpointCleanup))
>>
>>  env.getCheckpointConfig.setPreferCheckpointForRecovery(CheckpointForCleanup)
>>
>> Thanks,
>> Vijay
>>
>

Re: Flink OnCheckpointRollingPolicy streamingfilesink

Posted by Andrey Zagrebin <az...@apache.org>.
Hi Vijay,

I think it depends on your job requirements, in particular how many records
are processed per second and how much resources you have to process them.

If the checkpointing interval is short then the checkpointing overhead can
be too high and you need more resources to efficiently keep up with the
incoming streaming.

If the checkpointing interval is long, more records are batched together
and the throughput is better.
On the other hand, the observed latency is lower because the batched
results get flushed into the files and become visible in the external
system only when checkpoint occurs to provide exactly once guarantee.

Best,
Andrey

On Mon, Aug 24, 2020 at 6:18 PM Vijayendra Yadav <co...@gmail.com>
wrote:

> Hi Team,
>
> Bulk Formats can only have `OnCheckpointRollingPolicy`, which rolls (ONLY)
> on every checkpoint.
>
> *.withRollingPolicy(OnCheckpointRollingPolicy.build())*
>
> Question: What are recommended values related to checkpointing to fsstate,
> should it be more frequent checkpoints, or longer intervals, how many
> concurrent checkpoints needs to be allowed, how much should be an ideal
> pause between each checkpoint.
> Is there a way to control batch size here other than time ? any
> recommendations to all the parameters listed below?
> *Note: *I am trying to improve sink throughput.
>
>
> env.enableCheckpointing(chckptintervalmilli)
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.valueOf(ChckptMode))
>
> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(chckptintervalmilligap)
> env.getCheckpointConfig.setCheckpointTimeout(chckptduration)
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(concurrentchckpt)
>
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.valueOf(CheckpointCleanup))
>
>  env.getCheckpointConfig.setPreferCheckpointForRecovery(CheckpointForCleanup)
>
> Thanks,
> Vijay
>