You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Matthias Pohl <ma...@aiven.io.INVALID> on 2022/10/27 12:20:08 UTC

Re: [DISCUSS] Repeatable cleanup of checkpoint data

I would like to bring this topic up one more time. I put some more thought
into it and created FLIP-270 [1] as a follow-up of FLIP-194 [2] with an
updated version of what I summarized in my previous email. It would be
interesting to get some additional perspectives on this; more specifically,
the two included proposals about either just repurposing the
CompletedCheckpointStore into a more generic CheckpointStore or refactoring
the StateHandleStore interface moving all the cleanup logic from the
CheckpointsCleaner and StateHandleStore into what's currently called
CompletedCheckpointStore.

Looking forward to feedback on that proposal.

Best,
Matthias

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore

On Wed, Sep 28, 2022 at 4:07 PM Matthias Pohl <ma...@aiven.io>
wrote:

> Hi everyone,
>
> I’d like to start a discussion on repeatable cleanup of checkpoint data.
> In FLIP-194 [1] we introduced repeatable cleanup of HA data along the
> introduction of the JobResultStore component. The goal was to make Flink
> being in charge of cleanup for the data it owns. The Flink cluster should
> only shutdown gracefully after all its artifacts are removed. That way, one
> would not miss abandoned artifacts accidentally.
>
> We forgot to cover one code path around cleaning up checkpoint data.
> Currently, in case of an error (e.g. permission issues), checkpoints are
> tried to be cleaned up in the CheckpointsCleaner and left like that if
> that cleanup failed. A log message is printed. The user would be
> responsible for cleaning up the data. This was discussed as part of the
> release testing efforts for Flink 1.15 in FLINK-26388 [2].
>
> We could add repeatable cleanup in the CheckpointsCleaner. We would have
> to make sure that all StateObject#discardState implementations are
> idempotent. This is not necessarily the case right now (see FLINK-26606
> [3]).
>
> Additionally, there is the problem of losing information about what
> Checkpoints are subject to cleanup in case of JobManager failovers. These
> Checkpoints are not stored as part of the HA data. Additionally,
> PendingCheckpoints are not serialized in any way, either. None of these
> artifacts are picked up again after a failover. I see the following options
> here:
>
>    -
>
>    The purpose of CompletedCheckpointStore needs to be extended to become
>    a “general” CheckpointStore. It will store PendingCheckpoints and
>    CompletedCheckpoints that are marked for deletion. After a failover,
>    CheckpointsCleaner can pick up these instances again and continue with
>    the deletion process.
>
> The flaw of that approach is that we’re increasing the amount of data that
> is stored in the underlying StateHandleStore. Additionally, we’re going
> to have an increased number of accesses to the CompletedCheckpointStore.
> These accesses need to happen in the main thread; more specifically, adding
> PendingCheckpoints and marking Checkpoints for deletion.
>
>    -
>
>    We’re actually interested in cleaning up artifacts from the
>    FileSystem, i.e. the artifacts created by the StateHandleStore used
>    within the DefaultCompletedCheckpointStore containing the serialized
>    CompletedCheckpoint instance and the checkpoint’s folder containing
>    the actual operator states. We could adapt the CompletedCheckpointStore
>    in a way that any Checkpoint (including PendingCheckpoint) is
>    serialized and persisted on the FileSystem right away (which is currently
>    done within the StateHandleStore implementations when adding
>    CompletedCheckpoints to the underlying HA system). The corresponding
>    FileStateHandleObject (referring to that serialized CompletedCheckpoint)
>    that gets persisted to ZooKeeper/k8s ConfigMap in the end would be only
>    written if the CompletedCheckpoint is finalized and can be used. The
>    CheckpointsCleaner could recover any artifacts from the FileSystem and
>    cleanup anything that’s not listed in ZooKeeper/k8s ConfigMap.
>
> This approach saves us from accessing the HA backend (i.e. ZooKeeper/k8s)
> but would require additional IO on the FileSystem, still. It would require
> some larger refactoring, though: The RetrievableStateHandle currently being
> handled internally (i.e. in the StateHandleStore) need to become public.
>
>    -
>
>    We just add repeatable cleanup to the CheckpointsCleaner as is. No
>    cleanup is picked up again after recovery. This could be a fallback for the
>    user to reduce IO.
>
>
> I’m interested in initial opinions from the community on that matter
> before starting to work on a final FLIP.
>
> Best,
>
> Matthias
>
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
>
> [2] https://issues.apache.org/jira/browse/FLINK-26388
>
> [3] https://issues.apache.org/jira/browse/FLINK-26606
>

Re: [DISCUSS] Repeatable cleanup of checkpoint data

Posted by Robert Metzger <rm...@apache.org>.
Yeah, at some point I've investigated performance issues with AWS K8s. They
have somewhat strict rate limits on the K8s api server.

You run into the rate limits by configuring a very high checkpoint
frequency (I guess something like 500ms) and a high
state.checkpoints.num-retained count (e.g. 10). On each checkpoint
completion (every 500ms), a list of 10 checkpoints is written to a
ConfigMap. IIRC the combination of frequent, large ConfigMap updates were
the factors that really killed it.
When you run into the rate limits, the K8s client in Flink is throwing an
error, causing loss of leadership.

I don't know enough about the internals of the K8s HA store, but for
solving the problem with a high number of retained checkpoints, I wonder if
it has any benefits to create a ConfigMap per retained checkpoint, instead
of a big ConfigMap, which contains a list of retained checkpoints?
Otherwise, I don't have any ideas at the moment how to mitigate this
problem.

I hope this helps.


On Thu, Nov 10, 2022 at 10:56 AM Matthias Pohl
<ma...@aiven.io.invalid> wrote:

> Thanks for sharing your opinions on the proposal. The concerns sound
> reasonable. I guess, I'm going to follow-up on Chesnay's idea about
> combining multiple requests into one for the k8s implementation. Having a
> performance test for the k8s API server access sounds like a good idea,
> too. Both action items are a prerequisite before continuing with FLIP-270
> [1].
>
> @Yang Wang: Do we have some Jira issue or ML discussion on the k8s API
> server performance issues? I couldn't come up with a good search query
> myself. :-D
>
> @Robert (CC'd): was it you who worked on the k8s API server overload issue
> in 1.15? Do you have some memory about it or some starting point with
> source code or something similar?
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
>
> On Mon, Nov 7, 2022 at 12:59 PM Chesnay Schepler <ch...@apache.org>
> wrote:
>
> > This is a nice FLIP. I particular like how much background it provides
> > on the issue; something that other FLIPs could certainly benefit from...
> >
> > I went over the FLIP and had a chat with Matthias about it.
> >
> > Somewhat unrelated to the FLIP we found a flaw in the current cleanup
> > mechanism of failed checkpoints, where the JM deletes files while a TM
> > may still be in the process of writing checkpoint data. This is because
> > we never wait for an ack from the TMs that that have aborted the
> > checkpoint.
> > We additionally noted that when incremental checkpoints are enabled we
> > might be storing a large number of checkpoints in HA, without a
> > conclusion on what to do about it.
> >
> >
> > As for the FLIP itself, I'm concerned about proposal #2 because it
> > requires iterating over the entire checkpoint directory on /any/
> > failover to find checkpoints that can be deleted. This can be an
> > expensive operation for certain filesystems (S3), particularly when
> > incremental checkpoints are being used.
> > In the interest of fast failovers we ideally don't use mechanisms that
> > scale with.../anything/, really.
> >
> > However, storing more data in HA is also concerning, as Yang Wang
> > pointed out.
> > To not increase the number of requests made against HA we could maybe
> > consider looking into piggy-backing delete operations on other HA
> > operations, like the checkpoint counter increments.
> >
> > On that note, do we have any benchmarks for HA? I remember we looked
> > into that for...1.15 I believe at some point. With HA load being such a
> > major concern for this FLIP it would be good to have _something_ to
> > measure that.
> >
> > On 27/10/2022 14:20, Matthias Pohl wrote:
> > > I would like to bring this topic up one more time. I put some more
> > thought
> > > into it and created FLIP-270 [1] as a follow-up of FLIP-194 [2] with an
> > > updated version of what I summarized in my previous email. It would be
> > > interesting to get some additional perspectives on this; more
> > specifically,
> > > the two included proposals about either just repurposing the
> > > CompletedCheckpointStore into a more generic CheckpointStore or
> > refactoring
> > > the StateHandleStore interface moving all the cleanup logic from the
> > > CheckpointsCleaner and StateHandleStore into what's currently called
> > > CompletedCheckpointStore.
> > >
> > > Looking forward to feedback on that proposal.
> > >
> > > Best,
> > > Matthias
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
> > > [2]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
> > >
> > > On Wed, Sep 28, 2022 at 4:07 PM Matthias Pohl<ma...@aiven.io>
> > > wrote:
> > >
> > >> Hi everyone,
> > >>
> > >> I’d like to start a discussion on repeatable cleanup of checkpoint
> data.
> > >> In FLIP-194 [1] we introduced repeatable cleanup of HA data along the
> > >> introduction of the JobResultStore component. The goal was to make
> Flink
> > >> being in charge of cleanup for the data it owns. The Flink cluster
> > should
> > >> only shutdown gracefully after all its artifacts are removed. That
> way,
> > one
> > >> would not miss abandoned artifacts accidentally.
> > >>
> > >> We forgot to cover one code path around cleaning up checkpoint data.
> > >> Currently, in case of an error (e.g. permission issues), checkpoints
> are
> > >> tried to be cleaned up in the CheckpointsCleaner and left like that if
> > >> that cleanup failed. A log message is printed. The user would be
> > >> responsible for cleaning up the data. This was discussed as part of
> the
> > >> release testing efforts for Flink 1.15 in FLINK-26388 [2].
> > >>
> > >> We could add repeatable cleanup in the CheckpointsCleaner. We would
> have
> > >> to make sure that all StateObject#discardState implementations are
> > >> idempotent. This is not necessarily the case right now (see
> FLINK-26606
> > >> [3]).
> > >>
> > >> Additionally, there is the problem of losing information about what
> > >> Checkpoints are subject to cleanup in case of JobManager failovers.
> > These
> > >> Checkpoints are not stored as part of the HA data. Additionally,
> > >> PendingCheckpoints are not serialized in any way, either. None of
> these
> > >> artifacts are picked up again after a failover. I see the following
> > options
> > >> here:
> > >>
> > >>     -
> > >>
> > >>     The purpose of CompletedCheckpointStore needs to be extended to
> > become
> > >>     a “general” CheckpointStore. It will store PendingCheckpoints and
> > >>     CompletedCheckpoints that are marked for deletion. After a
> failover,
> > >>     CheckpointsCleaner can pick up these instances again and continue
> > with
> > >>     the deletion process.
> > >>
> > >> The flaw of that approach is that we’re increasing the amount of data
> > that
> > >> is stored in the underlying StateHandleStore. Additionally, we’re
> going
> > >> to have an increased number of accesses to the
> CompletedCheckpointStore.
> > >> These accesses need to happen in the main thread; more specifically,
> > adding
> > >> PendingCheckpoints and marking Checkpoints for deletion.
> > >>
> > >>     -
> > >>
> > >>     We’re actually interested in cleaning up artifacts from the
> > >>     FileSystem, i.e. the artifacts created by the StateHandleStore
> used
> > >>     within the DefaultCompletedCheckpointStore containing the
> serialized
> > >>     CompletedCheckpoint instance and the checkpoint’s folder
> containing
> > >>     the actual operator states. We could adapt the
> > CompletedCheckpointStore
> > >>     in a way that any Checkpoint (including PendingCheckpoint) is
> > >>     serialized and persisted on the FileSystem right away (which is
> > currently
> > >>     done within the StateHandleStore implementations when adding
> > >>     CompletedCheckpoints to the underlying HA system). The
> corresponding
> > >>     FileStateHandleObject (referring to that serialized
> > CompletedCheckpoint)
> > >>     that gets persisted to ZooKeeper/k8s ConfigMap in the end would be
> > only
> > >>     written if the CompletedCheckpoint is finalized and can be used.
> The
> > >>     CheckpointsCleaner could recover any artifacts from the FileSystem
> > and
> > >>     cleanup anything that’s not listed in ZooKeeper/k8s ConfigMap.
> > >>
> > >> This approach saves us from accessing the HA backend (i.e.
> > ZooKeeper/k8s)
> > >> but would require additional IO on the FileSystem, still. It would
> > require
> > >> some larger refactoring, though: The RetrievableStateHandle currently
> > being
> > >> handled internally (i.e. in the StateHandleStore) need to become
> public.
> > >>
> > >>     -
> > >>
> > >>     We just add repeatable cleanup to the CheckpointsCleaner as is. No
> > >>     cleanup is picked up again after recovery. This could be a
> fallback
> > for the
> > >>     user to reduce IO.
> > >>
> > >>
> > >> I’m interested in initial opinions from the community on that matter
> > >> before starting to work on a final FLIP.
> > >>
> > >> Best,
> > >>
> > >> Matthias
> > >>
> > >>
> > >>
> > >> [1]
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
> > >>
> > >> [2]https://issues.apache.org/jira/browse/FLINK-26388
> > >>
> > >> [3]https://issues.apache.org/jira/browse/FLINK-26606
> > >>
> >
>

Re: [DISCUSS] Repeatable cleanup of checkpoint data

Posted by Matthias Pohl <ma...@aiven.io.INVALID>.
Thanks for sharing your opinions on the proposal. The concerns sound
reasonable. I guess, I'm going to follow-up on Chesnay's idea about
combining multiple requests into one for the k8s implementation. Having a
performance test for the k8s API server access sounds like a good idea,
too. Both action items are a prerequisite before continuing with FLIP-270
[1].

@Yang Wang: Do we have some Jira issue or ML discussion on the k8s API
server performance issues? I couldn't come up with a good search query
myself. :-D

@Robert (CC'd): was it you who worked on the k8s API server overload issue
in 1.15? Do you have some memory about it or some starting point with
source code or something similar?

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints

On Mon, Nov 7, 2022 at 12:59 PM Chesnay Schepler <ch...@apache.org> wrote:

> This is a nice FLIP. I particular like how much background it provides
> on the issue; something that other FLIPs could certainly benefit from...
>
> I went over the FLIP and had a chat with Matthias about it.
>
> Somewhat unrelated to the FLIP we found a flaw in the current cleanup
> mechanism of failed checkpoints, where the JM deletes files while a TM
> may still be in the process of writing checkpoint data. This is because
> we never wait for an ack from the TMs that that have aborted the
> checkpoint.
> We additionally noted that when incremental checkpoints are enabled we
> might be storing a large number of checkpoints in HA, without a
> conclusion on what to do about it.
>
>
> As for the FLIP itself, I'm concerned about proposal #2 because it
> requires iterating over the entire checkpoint directory on /any/
> failover to find checkpoints that can be deleted. This can be an
> expensive operation for certain filesystems (S3), particularly when
> incremental checkpoints are being used.
> In the interest of fast failovers we ideally don't use mechanisms that
> scale with.../anything/, really.
>
> However, storing more data in HA is also concerning, as Yang Wang
> pointed out.
> To not increase the number of requests made against HA we could maybe
> consider looking into piggy-backing delete operations on other HA
> operations, like the checkpoint counter increments.
>
> On that note, do we have any benchmarks for HA? I remember we looked
> into that for...1.15 I believe at some point. With HA load being such a
> major concern for this FLIP it would be good to have _something_ to
> measure that.
>
> On 27/10/2022 14:20, Matthias Pohl wrote:
> > I would like to bring this topic up one more time. I put some more
> thought
> > into it and created FLIP-270 [1] as a follow-up of FLIP-194 [2] with an
> > updated version of what I summarized in my previous email. It would be
> > interesting to get some additional perspectives on this; more
> specifically,
> > the two included proposals about either just repurposing the
> > CompletedCheckpointStore into a more generic CheckpointStore or
> refactoring
> > the StateHandleStore interface moving all the cleanup logic from the
> > CheckpointsCleaner and StateHandleStore into what's currently called
> > CompletedCheckpointStore.
> >
> > Looking forward to feedback on that proposal.
> >
> > Best,
> > Matthias
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
> > [2]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
> >
> > On Wed, Sep 28, 2022 at 4:07 PM Matthias Pohl<ma...@aiven.io>
> > wrote:
> >
> >> Hi everyone,
> >>
> >> I’d like to start a discussion on repeatable cleanup of checkpoint data.
> >> In FLIP-194 [1] we introduced repeatable cleanup of HA data along the
> >> introduction of the JobResultStore component. The goal was to make Flink
> >> being in charge of cleanup for the data it owns. The Flink cluster
> should
> >> only shutdown gracefully after all its artifacts are removed. That way,
> one
> >> would not miss abandoned artifacts accidentally.
> >>
> >> We forgot to cover one code path around cleaning up checkpoint data.
> >> Currently, in case of an error (e.g. permission issues), checkpoints are
> >> tried to be cleaned up in the CheckpointsCleaner and left like that if
> >> that cleanup failed. A log message is printed. The user would be
> >> responsible for cleaning up the data. This was discussed as part of the
> >> release testing efforts for Flink 1.15 in FLINK-26388 [2].
> >>
> >> We could add repeatable cleanup in the CheckpointsCleaner. We would have
> >> to make sure that all StateObject#discardState implementations are
> >> idempotent. This is not necessarily the case right now (see FLINK-26606
> >> [3]).
> >>
> >> Additionally, there is the problem of losing information about what
> >> Checkpoints are subject to cleanup in case of JobManager failovers.
> These
> >> Checkpoints are not stored as part of the HA data. Additionally,
> >> PendingCheckpoints are not serialized in any way, either. None of these
> >> artifacts are picked up again after a failover. I see the following
> options
> >> here:
> >>
> >>     -
> >>
> >>     The purpose of CompletedCheckpointStore needs to be extended to
> become
> >>     a “general” CheckpointStore. It will store PendingCheckpoints and
> >>     CompletedCheckpoints that are marked for deletion. After a failover,
> >>     CheckpointsCleaner can pick up these instances again and continue
> with
> >>     the deletion process.
> >>
> >> The flaw of that approach is that we’re increasing the amount of data
> that
> >> is stored in the underlying StateHandleStore. Additionally, we’re going
> >> to have an increased number of accesses to the CompletedCheckpointStore.
> >> These accesses need to happen in the main thread; more specifically,
> adding
> >> PendingCheckpoints and marking Checkpoints for deletion.
> >>
> >>     -
> >>
> >>     We’re actually interested in cleaning up artifacts from the
> >>     FileSystem, i.e. the artifacts created by the StateHandleStore used
> >>     within the DefaultCompletedCheckpointStore containing the serialized
> >>     CompletedCheckpoint instance and the checkpoint’s folder containing
> >>     the actual operator states. We could adapt the
> CompletedCheckpointStore
> >>     in a way that any Checkpoint (including PendingCheckpoint) is
> >>     serialized and persisted on the FileSystem right away (which is
> currently
> >>     done within the StateHandleStore implementations when adding
> >>     CompletedCheckpoints to the underlying HA system). The corresponding
> >>     FileStateHandleObject (referring to that serialized
> CompletedCheckpoint)
> >>     that gets persisted to ZooKeeper/k8s ConfigMap in the end would be
> only
> >>     written if the CompletedCheckpoint is finalized and can be used. The
> >>     CheckpointsCleaner could recover any artifacts from the FileSystem
> and
> >>     cleanup anything that’s not listed in ZooKeeper/k8s ConfigMap.
> >>
> >> This approach saves us from accessing the HA backend (i.e.
> ZooKeeper/k8s)
> >> but would require additional IO on the FileSystem, still. It would
> require
> >> some larger refactoring, though: The RetrievableStateHandle currently
> being
> >> handled internally (i.e. in the StateHandleStore) need to become public.
> >>
> >>     -
> >>
> >>     We just add repeatable cleanup to the CheckpointsCleaner as is. No
> >>     cleanup is picked up again after recovery. This could be a fallback
> for the
> >>     user to reduce IO.
> >>
> >>
> >> I’m interested in initial opinions from the community on that matter
> >> before starting to work on a final FLIP.
> >>
> >> Best,
> >>
> >> Matthias
> >>
> >>
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
> >>
> >> [2]https://issues.apache.org/jira/browse/FLINK-26388
> >>
> >> [3]https://issues.apache.org/jira/browse/FLINK-26606
> >>
>

Re: [DISCUSS] Repeatable cleanup of checkpoint data

Posted by Chesnay Schepler <ch...@apache.org>.
This is a nice FLIP. I particular like how much background it provides 
on the issue; something that other FLIPs could certainly benefit from...

I went over the FLIP and had a chat with Matthias about it.

Somewhat unrelated to the FLIP we found a flaw in the current cleanup 
mechanism of failed checkpoints, where the JM deletes files while a TM 
may still be in the process of writing checkpoint data. This is because 
we never wait for an ack from the TMs that that have aborted the checkpoint.
We additionally noted that when incremental checkpoints are enabled we 
might be storing a large number of checkpoints in HA, without a 
conclusion on what to do about it.


As for the FLIP itself, I'm concerned about proposal #2 because it 
requires iterating over the entire checkpoint directory on /any/ 
failover to find checkpoints that can be deleted. This can be an 
expensive operation for certain filesystems (S3), particularly when 
incremental checkpoints are being used.
In the interest of fast failovers we ideally don't use mechanisms that 
scale with.../anything/, really.

However, storing more data in HA is also concerning, as Yang Wang 
pointed out.
To not increase the number of requests made against HA we could maybe 
consider looking into piggy-backing delete operations on other HA 
operations, like the checkpoint counter increments.

On that note, do we have any benchmarks for HA? I remember we looked 
into that for...1.15 I believe at some point. With HA load being such a 
major concern for this FLIP it would be good to have _something_ to 
measure that.

On 27/10/2022 14:20, Matthias Pohl wrote:
> I would like to bring this topic up one more time. I put some more thought
> into it and created FLIP-270 [1] as a follow-up of FLIP-194 [2] with an
> updated version of what I summarized in my previous email. It would be
> interesting to get some additional perspectives on this; more specifically,
> the two included proposals about either just repurposing the
> CompletedCheckpointStore into a more generic CheckpointStore or refactoring
> the StateHandleStore interface moving all the cleanup logic from the
> CheckpointsCleaner and StateHandleStore into what's currently called
> CompletedCheckpointStore.
>
> Looking forward to feedback on that proposal.
>
> Best,
> Matthias
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
>
> On Wed, Sep 28, 2022 at 4:07 PM Matthias Pohl<ma...@aiven.io>
> wrote:
>
>> Hi everyone,
>>
>> I’d like to start a discussion on repeatable cleanup of checkpoint data.
>> In FLIP-194 [1] we introduced repeatable cleanup of HA data along the
>> introduction of the JobResultStore component. The goal was to make Flink
>> being in charge of cleanup for the data it owns. The Flink cluster should
>> only shutdown gracefully after all its artifacts are removed. That way, one
>> would not miss abandoned artifacts accidentally.
>>
>> We forgot to cover one code path around cleaning up checkpoint data.
>> Currently, in case of an error (e.g. permission issues), checkpoints are
>> tried to be cleaned up in the CheckpointsCleaner and left like that if
>> that cleanup failed. A log message is printed. The user would be
>> responsible for cleaning up the data. This was discussed as part of the
>> release testing efforts for Flink 1.15 in FLINK-26388 [2].
>>
>> We could add repeatable cleanup in the CheckpointsCleaner. We would have
>> to make sure that all StateObject#discardState implementations are
>> idempotent. This is not necessarily the case right now (see FLINK-26606
>> [3]).
>>
>> Additionally, there is the problem of losing information about what
>> Checkpoints are subject to cleanup in case of JobManager failovers. These
>> Checkpoints are not stored as part of the HA data. Additionally,
>> PendingCheckpoints are not serialized in any way, either. None of these
>> artifacts are picked up again after a failover. I see the following options
>> here:
>>
>>     -
>>
>>     The purpose of CompletedCheckpointStore needs to be extended to become
>>     a “general” CheckpointStore. It will store PendingCheckpoints and
>>     CompletedCheckpoints that are marked for deletion. After a failover,
>>     CheckpointsCleaner can pick up these instances again and continue with
>>     the deletion process.
>>
>> The flaw of that approach is that we’re increasing the amount of data that
>> is stored in the underlying StateHandleStore. Additionally, we’re going
>> to have an increased number of accesses to the CompletedCheckpointStore.
>> These accesses need to happen in the main thread; more specifically, adding
>> PendingCheckpoints and marking Checkpoints for deletion.
>>
>>     -
>>
>>     We’re actually interested in cleaning up artifacts from the
>>     FileSystem, i.e. the artifacts created by the StateHandleStore used
>>     within the DefaultCompletedCheckpointStore containing the serialized
>>     CompletedCheckpoint instance and the checkpoint’s folder containing
>>     the actual operator states. We could adapt the CompletedCheckpointStore
>>     in a way that any Checkpoint (including PendingCheckpoint) is
>>     serialized and persisted on the FileSystem right away (which is currently
>>     done within the StateHandleStore implementations when adding
>>     CompletedCheckpoints to the underlying HA system). The corresponding
>>     FileStateHandleObject (referring to that serialized CompletedCheckpoint)
>>     that gets persisted to ZooKeeper/k8s ConfigMap in the end would be only
>>     written if the CompletedCheckpoint is finalized and can be used. The
>>     CheckpointsCleaner could recover any artifacts from the FileSystem and
>>     cleanup anything that’s not listed in ZooKeeper/k8s ConfigMap.
>>
>> This approach saves us from accessing the HA backend (i.e. ZooKeeper/k8s)
>> but would require additional IO on the FileSystem, still. It would require
>> some larger refactoring, though: The RetrievableStateHandle currently being
>> handled internally (i.e. in the StateHandleStore) need to become public.
>>
>>     -
>>
>>     We just add repeatable cleanup to the CheckpointsCleaner as is. No
>>     cleanup is picked up again after recovery. This could be a fallback for the
>>     user to reduce IO.
>>
>>
>> I’m interested in initial opinions from the community on that matter
>> before starting to work on a final FLIP.
>>
>> Best,
>>
>> Matthias
>>
>>
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
>>
>> [2]https://issues.apache.org/jira/browse/FLINK-26388
>>
>> [3]https://issues.apache.org/jira/browse/FLINK-26606
>>

Re: [DISCUSS] Repeatable cleanup of checkpoint data

Posted by Yang Wang <da...@gmail.com>.
Thanks Matthias for continuously improving the clean-up process.

Given that we highly depends on K8s APIServer for HA implementation, I am
not in favor of storing too many entries in the ConfigMap,
as well as adding more update requests to the APIServer. So I lean towards
Proposal #2. It just works like we revert the current mark-deletion
in StateHandleStore and then introduce a completely new FileSystem based
artifacts clean-up mechanism.

When doing the failover, I suggest the clean-up to be processed
asynchronously. Otherwise, listing the completed checkpoints and deleting
the invalid ones will take too much time and slow down the recovery process.

Best,
Yang

Matthias Pohl <ma...@aiven.io.invalid> 于2022年10月27日周四 20:20写道:

> I would like to bring this topic up one more time. I put some more thought
> into it and created FLIP-270 [1] as a follow-up of FLIP-194 [2] with an
> updated version of what I summarized in my previous email. It would be
> interesting to get some additional perspectives on this; more specifically,
> the two included proposals about either just repurposing the
> CompletedCheckpointStore into a more generic CheckpointStore or refactoring
> the StateHandleStore interface moving all the cleanup logic from the
> CheckpointsCleaner and StateHandleStore into what's currently called
> CompletedCheckpointStore.
>
> Looking forward to feedback on that proposal.
>
> Best,
> Matthias
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
>
> On Wed, Sep 28, 2022 at 4:07 PM Matthias Pohl <ma...@aiven.io>
> wrote:
>
> > Hi everyone,
> >
> > I’d like to start a discussion on repeatable cleanup of checkpoint data.
> > In FLIP-194 [1] we introduced repeatable cleanup of HA data along the
> > introduction of the JobResultStore component. The goal was to make Flink
> > being in charge of cleanup for the data it owns. The Flink cluster should
> > only shutdown gracefully after all its artifacts are removed. That way,
> one
> > would not miss abandoned artifacts accidentally.
> >
> > We forgot to cover one code path around cleaning up checkpoint data.
> > Currently, in case of an error (e.g. permission issues), checkpoints are
> > tried to be cleaned up in the CheckpointsCleaner and left like that if
> > that cleanup failed. A log message is printed. The user would be
> > responsible for cleaning up the data. This was discussed as part of the
> > release testing efforts for Flink 1.15 in FLINK-26388 [2].
> >
> > We could add repeatable cleanup in the CheckpointsCleaner. We would have
> > to make sure that all StateObject#discardState implementations are
> > idempotent. This is not necessarily the case right now (see FLINK-26606
> > [3]).
> >
> > Additionally, there is the problem of losing information about what
> > Checkpoints are subject to cleanup in case of JobManager failovers. These
> > Checkpoints are not stored as part of the HA data. Additionally,
> > PendingCheckpoints are not serialized in any way, either. None of these
> > artifacts are picked up again after a failover. I see the following
> options
> > here:
> >
> >    -
> >
> >    The purpose of CompletedCheckpointStore needs to be extended to become
> >    a “general” CheckpointStore. It will store PendingCheckpoints and
> >    CompletedCheckpoints that are marked for deletion. After a failover,
> >    CheckpointsCleaner can pick up these instances again and continue with
> >    the deletion process.
> >
> > The flaw of that approach is that we’re increasing the amount of data
> that
> > is stored in the underlying StateHandleStore. Additionally, we’re going
> > to have an increased number of accesses to the CompletedCheckpointStore.
> > These accesses need to happen in the main thread; more specifically,
> adding
> > PendingCheckpoints and marking Checkpoints for deletion.
> >
> >    -
> >
> >    We’re actually interested in cleaning up artifacts from the
> >    FileSystem, i.e. the artifacts created by the StateHandleStore used
> >    within the DefaultCompletedCheckpointStore containing the serialized
> >    CompletedCheckpoint instance and the checkpoint’s folder containing
> >    the actual operator states. We could adapt the
> CompletedCheckpointStore
> >    in a way that any Checkpoint (including PendingCheckpoint) is
> >    serialized and persisted on the FileSystem right away (which is
> currently
> >    done within the StateHandleStore implementations when adding
> >    CompletedCheckpoints to the underlying HA system). The corresponding
> >    FileStateHandleObject (referring to that serialized
> CompletedCheckpoint)
> >    that gets persisted to ZooKeeper/k8s ConfigMap in the end would be
> only
> >    written if the CompletedCheckpoint is finalized and can be used. The
> >    CheckpointsCleaner could recover any artifacts from the FileSystem and
> >    cleanup anything that’s not listed in ZooKeeper/k8s ConfigMap.
> >
> > This approach saves us from accessing the HA backend (i.e. ZooKeeper/k8s)
> > but would require additional IO on the FileSystem, still. It would
> require
> > some larger refactoring, though: The RetrievableStateHandle currently
> being
> > handled internally (i.e. in the StateHandleStore) need to become public.
> >
> >    -
> >
> >    We just add repeatable cleanup to the CheckpointsCleaner as is. No
> >    cleanup is picked up again after recovery. This could be a fallback
> for the
> >    user to reduce IO.
> >
> >
> > I’m interested in initial opinions from the community on that matter
> > before starting to work on a final FLIP.
> >
> > Best,
> >
> > Matthias
> >
> >
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
> >
> > [2] https://issues.apache.org/jira/browse/FLINK-26388
> >
> > [3] https://issues.apache.org/jira/browse/FLINK-26606
> >
>