Posted to user@flink.apache.org by Boris Lublinsky <bo...@lightbend.com> on 2019/06/03 22:45:24 UTC

Controlling the amount of checkpoint files

Is there a way to limit the number of checkpoint files?
The parameter that I set (state.checkpoints.num-retained: 5)
does not seem to have any effect. Is there anything else I can set to prevent infinite growth of checkpointing info?

Boris Lublinsky
FDP Architect
boris.lublinsky@lightbend.com
https://www.lightbend.com/


Re: Controlling the amount of checkpoint files

Posted by Boris Lublinsky <bo...@lightbend.com>.
So if you have externalized checkpoints, they are never purged?
The issue is that if your state size is rather large, this seems to be the only option.

Boris Lublinsky
FDP Architect
boris.lublinsky@lightbend.com
https://www.lightbend.com/

> On Jun 15, 2019, at 3:39 AM, Robert Metzger <rm...@apache.org> wrote:
> 
> Hey Boris,
> 
> I think the problem is that you are using externalized checkpoints:
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
> Your checkpoints are retained in both failure and cancellation cases, so the checkpoint files will grow indefinitely.
> 
> 
> 
> On Wed, Jun 12, 2019 at 8:01 AM Congxian Qiu <qcx978132955@gmail.com> wrote:
> Hi Boris
> For the configuration you gave, you can try to reduce the parallelism of the operator which contains state.
> 
> Best,
> Congxian
> 
> 
> On Mon, Jun 10, 2019 at 9:43 PM Boris Lublinsky <boris.lublinsky@lightbend.com> wrote:
> Here is the code enabling checkpointing:
> 
> // Enable checkpointing
> env.enableCheckpointing(60000)   // 1 min
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
> val checkpointingBackend = new FsStateBackend("file:///flink/checkpoints", true)
> env.setStateBackend(checkpointingBackend)
> 
> 
> Boris Lublinsky
> FDP Architect
> boris.lublinsky@lightbend.com
> https://www.lightbend.com/
>> On Jun 10, 2019, at 1:07 AM, Congxian Qiu <qcx978132955@gmail.com> wrote:
>> 
>> Hi
>> 
>> Which state backend (Heap or RocksDB) and checkpoint mode (full snapshot or incremental) do you use?
>> 
>> Best,
>> Congxian
>> 
>> 
>> On Tue, Jun 4, 2019 at 6:45 AM Boris Lublinsky <boris.lublinsky@lightbend.com> wrote:
>> Is there a way to limit the number of checkpoint files?
>> The parameter that I set (state.checkpoints.num-retained: 5)
>> does not seem to have any effect. Is there anything else I can set to prevent infinite growth of checkpointing info?
>> 
>> Boris Lublinsky
>> FDP Architect
>> boris.lublinsky@lightbend.com
>> https://www.lightbend.com/
> 


Re: Controlling the amount of checkpoint files

Posted by Robert Metzger <rm...@apache.org>.
Hey Boris,

I think the problem is that you are using externalized checkpoints:

env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

Your checkpoints are retained in both failure and cancellation cases, so
the checkpoint files will grow indefinitely.
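
As a rough sketch of what a change along these lines could look like (not a confirmed fix for this job): the snippet below repeats the checkpointing setup quoted in this thread and only swaps the cleanup mode to DELETE_ON_CANCELLATION, the other value of ExternalizedCheckpointCleanup. It assumes the Flink Scala API of the version in use at the time of this thread; the object name CheckpointCleanupSketch is made up for illustration and the job definition itself is left out.

```scala
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical wrapper object, used only to make the sketch self-contained.
object CheckpointCleanupSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Same settings as in the thread: checkpoint every minute, exactly-once,
    // at most one checkpoint in flight.
    env.enableCheckpointing(60000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    // The only change: delete the externalized checkpoint when the job is
    // cancelled. It is still kept if the job fails.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)

    // Same backend and path as in the thread (asynchronous snapshots enabled).
    val checkpointingBackend = new FsStateBackend("file:///flink/checkpoints", true)
    env.setStateBackend(checkpointingBackend)

    // Sources, operators, sinks and env.execute(...) would follow here.
  }
}
```

Whether this is acceptable depends on whether you need to resume from a checkpoint after a deliberate cancel. If you do, taking a savepoint before cancelling is the usual alternative.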



On Wed, Jun 12, 2019 at 8:01 AM Congxian Qiu <qc...@gmail.com> wrote:

> Hi Boris
> For the configuration you gave, you can try to reduce the parallelism of the
> operator which contains state.
>
> Best,
> Congxian
>
>
> On Mon, Jun 10, 2019 at 9:43 PM Boris Lublinsky <bo...@lightbend.com> wrote:
>
>> Here is the code enabling checkpointing:
>>
>> // Enable checkpointing
>> env.enableCheckpointing(60000)   // 1 min
>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>> val checkpointingBackend = new FsStateBackend("file:///flink/checkpoints", true)
>> env.setStateBackend(checkpointingBackend)
>>
>>
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublinsky@lightbend.com
>> https://www.lightbend.com/
>>
>> On Jun 10, 2019, at 1:07 AM, Congxian Qiu <qc...@gmail.com> wrote:
>>
>> Hi
>>
>> Which state backend (Heap or RocksDB) and checkpoint mode (full snapshot or
>> incremental) do you use?
>>
>> Best,
>> Congxian
>>
>>
>> On Tue, Jun 4, 2019 at 6:45 AM Boris Lublinsky <bo...@lightbend.com> wrote:
>>
>>> Is there a way to limit the number of checkpoint files?
>>> The parameter that I set (state.checkpoints.num-retained: 5)
>>> does not seem to have any effect. Is there anything else I can set to
>>> prevent infinite growth of checkpointing info?
>>>
>>> Boris Lublinsky
>>> FDP Architect
>>> boris.lublinsky@lightbend.com
>>> https://www.lightbend.com/
>>>
>>>
>>

Re: Controlling the amount of checkpoint files

Posted by Congxian Qiu <qc...@gmail.com>.
Hi Boris
For the configuration you gave, you can try to reduce the parallelism of the
operator which contains state.

Best,
Congxian


On Mon, Jun 10, 2019 at 9:43 PM Boris Lublinsky <bo...@lightbend.com> wrote:

> Here is the code enabling checkpointing:
>
> // Enable checkpointing
> env.enableCheckpointing(60000)   // 1 min
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
> env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
> val checkpointingBackend = new FsStateBackend("file:///flink/checkpoints", true)
> env.setStateBackend(checkpointingBackend)
>
>
>
> Boris Lublinsky
> FDP Architect
> boris.lublinsky@lightbend.com
> https://www.lightbend.com/
>
> On Jun 10, 2019, at 1:07 AM, Congxian Qiu <qc...@gmail.com> wrote:
>
> Hi
>
> Which state backend (Heap or RocksDB) and checkpoint mode (full snapshot or
> incremental) do you use?
>
> Best,
> Congxian
>
>
> On Tue, Jun 4, 2019 at 6:45 AM Boris Lublinsky <bo...@lightbend.com> wrote:
>
>> Is there a way to limit the number of checkpoint files?
>> The parameter that I set (state.checkpoints.num-retained: 5)
>> does not seem to have any effect. Is there anything else I can set to
>> prevent infinite growth of checkpointing info?
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublinsky@lightbend.com
>> https://www.lightbend.com/
>>
>>
>