Posted to user@flink.apache.org by Zach Cox <zc...@gmail.com> on 2016/04/05 20:39:21 UTC

Checkpoint state stored in backend, and deleting old checkpoint state

Hi - I have some questions regarding Flink's checkpointing, specifically
related to storing state in the backends.

So let's say an operator in a streaming job is building up some state. When
it receives barriers from all of its input streams, does it store *all* of
its state to the backend? I think that is what the docs [1] and paper [2]
imply, but want to make sure. In other words, if the operator contains
100MB of state, and the backend is HDFS, does the operator copy all 100MB
of state to HDFS during the checkpoint?

Following on this example, say the operator is a global window and is
storing some state for each unique key observed in the stream of messages
(e.g. userId). Assume that over time, the number of observed unique keys
grows, so the size of the state also grows (the window state is never
purged). Is the entire operator state at the time of each checkpoint stored
to the backend? So that over time, the size of the state stored for each
checkpoint to the backend grows? Or is the state stored to the backend
somehow just the state that changed in some way since the last checkpoint?

Are old checkpoint states in the backend ever deleted / cleaned up? That
is, if all of the state for checkpoint n in the backend is all that is
needed to restore a failed job, then all state for all checkpoints m < n
should not be needed any more, right? Can all of those old checkpoints be
deleted from the backend? Does Flink do this?

Thanks,
Zach

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
[2] http://arxiv.org/abs/1506.08603

Re: Checkpoint state stored in backend, and deleting old checkpoint state

Posted by Zach Cox <zc...@gmail.com>.
Hi Stephan - incremental checkpointing sounds really interesting and
useful. I look forward to trying it out.

Thanks,
Zach


Re: Checkpoint state stored in backend, and deleting old checkpoint state

Posted by Stephan Ewen <se...@apache.org>.
Hi Zach!

I am working on incremental checkpointing and hope to have it in master in
the next few weeks.

The current approach is to have a full, self-contained checkpoint every
once in a while, and incremental checkpoints most of the time. Having
a full checkpoint every now and then spares you from re-applying an endless
set of deltas on recovery.
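
As a rough illustration of the scheme described above (a periodic full
checkpoint with incremental checkpoints in between), here is a minimal
Python sketch. It is not Flink code: CheckpointLog, full_every, take and
restore are hypothetical names, the "every Nth checkpoint is full" policy is
an assumption, and key deletions are ignored because the state in Zach's
example is never purged.

```python
from typing import Dict, List, Tuple

# Hypothetical model: a checkpoint is either a full copy of the keyed state
# or a delta holding only the keys that changed since the previous checkpoint.
Checkpoint = Tuple[str, Dict[str, int]]  # ("full" or "delta", payload)


class CheckpointLog:
    def __init__(self, full_every: int) -> None:
        self.full_every = full_every            # assumed policy, must be >= 1
        self.checkpoints: List[Checkpoint] = []
        self._previous: Dict[str, int] = {}     # state at the previous checkpoint

    def take(self, state: Dict[str, int]) -> None:
        if len(self.checkpoints) % self.full_every == 0:
            # Full checkpoint: copy every entry, changed or not.
            self.checkpoints.append(("full", dict(state)))
        else:
            # Incremental checkpoint: only new or modified keys.
            delta = {k: v for k, v in state.items() if self._previous.get(k) != v}
            self.checkpoints.append(("delta", delta))
        self._previous = dict(state)

    def _last_full_index(self) -> int:
        # Assumes at least one checkpoint was taken (the first is always full).
        return max(i for i, (kind, _) in enumerate(self.checkpoints) if kind == "full")

    def deltas_to_replay(self) -> int:
        # Number of deltas recovery must re-apply on top of the last full copy.
        return len(self.checkpoints) - 1 - self._last_full_index()

    def restore(self) -> Dict[str, int]:
        start = self._last_full_index()
        state = dict(self.checkpoints[start][1])
        for _, delta in self.checkpoints[start + 1:]:
            state.update(delta)
        return state
```

With full_every set to 3, recovery never has to replay more than two deltas,
which is the point of taking a full checkpoint every now and then.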

Related to that is also making the checkpointing asynchronous, so that
normal operations do not see any disruption any more.

Greetings,
Stephan

Re: Checkpoint state stored in backend, and deleting old checkpoint state

Posted by Zach Cox <zc...@gmail.com>.
Thanks for the details Konstantin and Ufuk!


Re: Checkpoint state stored in backend, and deleting old checkpoint state

Posted by Konstantin Knauf <ko...@tngtech.com>.
Hi Ufuk,

I thought so, but I am not sure when and where ;) I will let you know,
if I come across it again.

Cheers,

Konstantin

On 05.04.2016 21:10, Ufuk Celebi wrote:
> @Konstantin: did you have lingering state without crashes?

-- 
Konstantin Knauf * konstantin.knauf@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Re: Checkpoint state stored in backend, and deleting old checkpoint state

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Zach and Konstantin,

Great questions and answers. We can try to make this more explicit in the docs.

On Tue, Apr 5, 2016 at 8:54 PM, Konstantin Knauf
<ko...@tngtech.com> wrote:
> To my knowledge Flink takes care of deleting old checkpoints (I think it
> says so in the documentation about savepoints). In my experience
> though, if a job is cancelled or crashes, the checkpoint files are
> usually not cleaned up. So some housekeeping might be necessary.

Regarding cleanup: currently only the latest successful checkpoint is retained.

On graceful shutdown, all checkpoints should be cleaned up as far as I
know. Savepoints always have to be cleaned up manually.

On crashes, the checkpoint state has to be cleaned up manually (if the
JVM shutdown hooks did not run).
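
The retention behaviour described above can be sketched in a few lines of
Python. This is only a toy model under stated assumptions, not Flink's
implementation: CheckpointStore and its methods are hypothetical names, and
a dictionary stands in for the files in the backend.

```python
from typing import Dict, Optional


class CheckpointStore:
    """Toy stand-in for checkpoint files in a backend; not a Flink API."""

    def __init__(self) -> None:
        self.stored: Dict[int, bytes] = {}          # checkpoint id -> state
        self.latest_completed: Optional[int] = None

    def write(self, checkpoint_id: int, data: bytes) -> None:
        # State is written first; the checkpoint is still pending.
        self.stored[checkpoint_id] = data

    def complete(self, checkpoint_id: int) -> None:
        # Once checkpoint n has completed, every checkpoint m < n is obsolete.
        self.latest_completed = checkpoint_id
        for old in [m for m in self.stored if m < checkpoint_id]:
            del self.stored[old]

    def shutdown(self, graceful: bool) -> None:
        # Graceful shutdown discards all checkpoints. After a crash nothing
        # runs, so whatever was stored stays behind for manual cleanup.
        if graceful:
            self.stored.clear()
            self.latest_completed = None
```

After complete(2), only checkpoint 2 and any newer pending checkpoint
remain. Calling shutdown(graceful=False) leaves those entries in place,
which mirrors the lingering files seen after crashes.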

@Konstantin: did you have lingering state without crashes?

– Ufuk

Re: Checkpoint state stored in backend, and deleting old checkpoint state

Posted by Konstantin Knauf <ko...@tngtech.com>.
Hi Zach,

some answers/comments inline.

Cheers

Konstantin

On 05.04.2016 20:39, Zach Cox wrote:
> Hi - I have some questions regarding Flink's checkpointing, specifically
> related to storing state in the backends.
> 
> So let's say an operator in a streaming job is building up some state.
> When it receives barriers from all of its input streams, does it store
> *all* of its state to the backend? I think that is what the docs [1] and
> paper [2] imply, but want to make sure. In other words, if the operator
> contains 100MB of state, and the backend is HDFS, does the operator copy
> all 100MB of state to HDFS during the checkpoint?

Yes. With the filesystem backend this happens synchronously; with the
RocksDB backend the transfer to HDFS is asynchronous.
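
To make the synchronous/asynchronous distinction concrete, here is a small
Python sketch. It is a simplification under assumptions and does not show
how either Flink backend is implemented: checkpoint_sync and
checkpoint_async are hypothetical names, and a plain list stands in for
HDFS.

```python
import copy
import threading
from typing import Dict, List


def checkpoint_sync(state: Dict[str, int], backend: List[Dict[str, int]]) -> None:
    # The caller (the operator) is blocked until the whole copy is stored.
    backend.append(copy.deepcopy(state))


def checkpoint_async(state: Dict[str, int],
                     backend: List[Dict[str, int]]) -> threading.Thread:
    # Short synchronous part: take a consistent local snapshot.
    snapshot = copy.deepcopy(state)

    def upload() -> None:
        # Slow part: transfer the snapshot while processing continues.
        backend.append(snapshot)

    worker = threading.Thread(target=upload)
    worker.start()
    return worker
```

In the asynchronous variant the operator may keep mutating state right after
the call returns. The stored checkpoint still reflects the state at snapshot
time, because the background thread only sees the local copy.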

> Following on this example, say the operator is a global window and is
> storing some state for each unique key observed in the stream of
> messages (e.g. userId). Assume that over time, the number of observed
> unique keys grows, so the size of the state also grows (the window state
> is never purged). Is the entire operator state at the time of each
> checkpoint stored to the backend? So that over time, the size of the
> state stored for each checkpoint to the backend grows? Or is the state
> stored to the backend somehow just the state that changed in some way
> since the last checkpoint?

The complete state is checkpointed. Incremental backups are currently
not supported, but seem to be on the roadmap.
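
A small Python sketch of what this means for the global-window example: if
every checkpoint serializes the complete state, the checkpoint size follows
the number of unique keys seen so far. The names full_snapshot and run are
hypothetical, and JSON is used only as a convenient stand-in for whatever
serialization the backend really uses.

```python
import json
from typing import Dict, List


def full_snapshot(state: Dict[str, int]) -> bytes:
    # A full checkpoint serializes every entry, changed or not.
    return json.dumps(state).encode("utf-8")


def run(events_per_interval: List[List[str]]) -> List[int]:
    """Return the snapshot size in bytes after each checkpoint interval."""
    counts: Dict[str, int] = {}   # per-key state that is never purged
    sizes: List[int] = []
    for events in events_per_interval:
        for user_id in events:
            counts[user_id] = counts.get(user_id, 0) + 1
        sizes.append(len(full_snapshot(counts)))
    return sizes
```

Calling run([["u1", "u2"], ["u3", "u4"], ["u5", "u1"]]) returns three sizes
that increase from one checkpoint to the next, because each interval
introduces at least one new key.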

> Are old checkpoint states in the backend ever deleted / cleaned up? That
> is, if all of the state for checkpoint n in the backend is all that is
> needed to restore a failed job, then all state for all checkpoints m < n
> should not be needed any more, right? Can all of those old checkpoints
> be deleted from the backend? Does Flink do this?

To my knowledge Flink takes care of deleting old checkpoints (I think it
says so in the documentation about savepoints). In my experience
though, if a job is cancelled or crashes, the checkpoint files are
usually not cleaned up. So some housekeeping might be necessary.

> Thanks,
> Zach
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
> [2] http://arxiv.org/abs/1506.08603
> 


