Posted to dev@flink.apache.org by Hangxiang Yu <ma...@gmail.com> on 2022/03/29 09:17:00 UTC

Re: [DISCUSS] FLIP-193: Snapshots ownership

Hi, I found that old checkpoints are not deleted in claim mode in two cases:
1. state.checkpoints.num-retained is set to a number greater than 1
2. the job is restored from legacy mode, which has produced some retained checkpoints
In both cases, we claim only the last checkpoint in claim mode, so
only the last checkpoint will be deleted and the others will not.
Should we claim more of the previous checkpoints in claim mode?
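
To illustrate the first case, here is a toy model (not Flink code). It assumes that only the latest retained checkpoint enters the checkpoint store of the new job and that the store deletes the oldest entry once it holds more than num_retained checkpoints. The function name and the checkpoint names are made up for the example.

```python
from collections import deque


def simulate_claim(previous_retained, new_checkpoints, num_retained):
    """Toy model of claim mode that claims only the latest retained checkpoint."""
    # Only the checkpoint we restore from enters the store of the new job.
    store = deque([previous_retained[-1]])
    # The older retained checkpoints are never registered anywhere.
    unclaimed = list(previous_retained[:-1])
    deleted = []
    for checkpoint in new_checkpoints:
        store.append(checkpoint)
        # Subsume (delete) the oldest checkpoints beyond the retention limit.
        while len(store) > num_retained:
            deleted.append(store.popleft())
    return list(store), deleted, unclaimed


store, deleted, unclaimed = simulate_claim(
    ["chk-40", "chk-41", "chk-42"],
    ["chk-43", "chk-44", "chk-45"],
    num_retained=3,
)
```

In this model chk-42 is eventually deleted, while chk-40 and chk-41 stay around because nothing owns them.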

Best,
Hangxiang

On Fri, Nov 26, 2021 at 11:11 PM Dawid Wysakowicz <dw...@apache.org>
wrote:

> If they'd like to use the --no-claim mode that would be the way to go, yes.
>
> Two points to be on the same page here:
>
>    - all Flink native state backends (RocksDB, HashMap, changelog) would
>    already support --no-claim
>    - if in the end we add the --legacy mode, users can also use that mode
>    instead of --claim.
>
> Best,
>
> Dawid
> On 26/11/2021 15:57, Till Rohrmann wrote:
>
> Thanks for writing this FLIP Dawid. Just to clarify one thing for the
> support of forced full snapshots. If a state backend does not support this
> feature, then the user either has to copy the snapshot manually or resume
> using --claim mode, create a savepoint in canonical format and then
> change the state backend if they want to use --no-claim, right?
>
> Cheers,
> Till
>
> On Fri, Nov 26, 2021 at 11:49 AM Dawid Wysakowicz <dw...@apache.org>
> wrote:
>
>
> - maybe include "checkpoint" in mode names, i.e. --no-claim-checkpoint?
>
> I don't think this is a good idea. The modes apply to both savepoints and
> checkpoints, plus it's much longer to type in. (minor)
>
> - add an explicit option to preserve the current behavior (no claim
> and no duplicate)?
>
> We had an offline discussion about it and so far we were leaning towards
> keeping the set of supported options minimal. However, if we really think
> the old behaviour is useful we can add a --legacy restore mode. cc
> @Konstantin @Piotr
>
> There seems to be a consensus in the discussion, however, I couldn't
> find stop-with-savepoint in the document.
>
> Sorry, I forgot about this one. I added a note that savepoints generated
> from stop-with-savepoint should commit side effects.
>
> And I still think it would be nice to list object stores which support
> duplicate operation.
>
> I listed a couple of file systems that do have some sort of a COPY API.
>
> Best,
>
> Dawid
> On 26/11/2021 11:03, Roman Khachatryan wrote:
>
> Hi,
>
> Thanks for updating the FLIP Dawid
>
> There seems to be a consensus in the discussion, however, I couldn't
> find stop-with-savepoint in the document.
>
> A few minor things:
> - maybe include "checkpoint" in mode names, i.e. --no-claim-checkpoint?
> - add an explicit option to preserve the current behavior (no claim
> and no duplicate)?
> And I still think it would be nice to list object stores which support
> duplicate operation.
>
> Regards,
> Roman
>
>
> On Fri, Nov 26, 2021 at 10:37 AM Konstantin Knauf <kn...@apache.org> wrote:
>
> Hi Dawid,
>
> sounds good, specifically 2., too.
>
> Best,
>
> Konstantin
>
> On Fri, Nov 26, 2021 at 9:25 AM Dawid Wysakowicz <dw...@apache.org>
> wrote:
>
>
> Hi all,
>
> I updated the FLIP with a few clarifications:
>
>    1. I added a description of how we would trigger a "full snapshot" in the
>    changelog state backend
>       - (We would go for this option in the 1st version). Trigger a
>       snapshot of the base state backend in the 1st checkpoint, which induces
>       materializing the changelog. In this approach we could duplicate SST files,
>       but we would not duplicate the diff files.
>       - Add a hook for logic for computing which task should duplicate the
>       diff files. We would have to do a pass over all states after the state
>       assignment in StateAssignmentOperation
>    2. I clarified that the "no-claim" mode requires a
>    completed/successful checkpoint before we can remove the one we are
>    restoring from. Also added a note that we can assume a checkpoint is
>    completed if it is confirmed by Flink's API for checkpointing stats or by
>    checking an entry in HA services. A checkpoint cannot be assumed completed
>    by just looking at the checkpoint files.
>
> I suggest going on with the proposal for "no-claim" as suggested so far,
> as it is easier to understand by users. They can reliably tell when they
> can expect the checkpoint to be deletable. If we see that the time to take
> the 1st checkpoint becomes a problem we can extend the set of restore
> methods and e.g. add a "claim-temporarily" method.
>
> I hope we can reach a consensus and start a vote, some time early next
> week.
>
> Best,
>
> Dawid
>
> On 23/11/2021 22:39, Roman Khachatryan wrote:
>
> I also referred to the "no-claim" mode and I still think neither of them works in that mode, as you'd have to keep lineage of checkpoints externally to be able to delete any checkpoint.
>
> I think the lineage is needed in all approaches with arbitrary
> histories; the difference is whether a running Flink is required or
> not. Is that what you mean?
> (If not, could you please explain how the scenario you mentioned above
> with multiple jobs branching from the same checkpoint is handled?)
>
>
> BTW, the state key for RocksDB is actually: backend UID + key group range + SST file name, so the key would be different (the key group range is different for two tasks) and we would have two separate counters for the same file.
>
> You're right. But there is also a collision between old and new entries.
>
>
> To be on the same page here. It is not a problem so far in RocksDB, because we do not reuse any shared files in case of rescaling.
>
> As I mentioned above, collision happens not only because of rescaling;
> and AFAIK, there are some ideas to reuse files on rescaling (probably
> Yuan could clarify). Anyways, I think it makes sense to not bake in
> this assumption unless it's hard to implement (or at least state it
> explicitly in FLIP).
>
>
> It is not suggested as an optimization. It is suggested as a must for state backends that need it. I did not elaborate on it, because it could affect only the changelog state backend at the moment, into which I don't have much insight. I agree it might make sense to look a bit into how we could force full snapshots in the changelog state backend. I will spend some extra time on that.
>
> I see. For the Changelog state backend, the easiest way would be to
> obtain a full snapshot from the underlying backend in snapshot(),
> ignoring all non-materialized changes. This will effectively
> materialize all the changes, so only new non-materialized state will
> be used in subsequent checkpoints.
>
>
> Only the task that gets assigned [1,16] would be responsible for duplicating files of the old range [1, 64].
>
> Wouldn't it be likely that the same TM will be responsible for [1, 64]
> "windowState", [1, 64] "timerState", and so on, for all operators in
> the chain, and probably other chains? (that's what I mean by skew)
> If we want to address this while preserving handle immutability, then we'll
> probably have to rebuild the whole task state snapshot.
> (depending on how we approach RocksDB re-uploading, it might not be
> relevant though)
>
>
> Regards,
> Roman
>
>
> On Tue, Nov 23, 2021 at 4:06 PM Dawid Wysakowicz <dw...@apache.org> wrote:
>
> I think I know where the confusion comes from regarding arbitrary
> recovery histories: Both my counter-proposals were for "no-claim"
> mode; I didn't mean to replace "claim" mode with them.
> However, as Yun pointed out, it's impossible to guarantee that all the
> files will be compacted in a finite number of checkpoints; so let's
> withdraw those proposals.
>
> I also referred to the "no-claim" mode and I still think neither of them works in that mode, as you'd have to keep lineage of checkpoints externally to be able to delete any checkpoint.
>
> Let's consider a job running with DoP=1; it created checkpoint C1 with
> a single file F1 and then stopped.
> We start a new job from C1 in no-claim mode with DoP=2; so two tasks
> will receive the same file F1.
>
> To be on the same page here. It is not a problem so far in RocksDB, because we do not reuse any shared files in case of rescaling. If we want to change how rescaling in RocksDB works then yes, we would have to consider how we want to make sure we copy/duplicate just once. However we would have to first change a crucial thing about regular incremental checkpoints and reorganize the SharedStateRegistry along the way.
>
> BTW, the state key for RocksDB is actually: backend UID + key group range + SST file name, so the key would be different (the key group range is different for two tasks) and we would have two separate counters for the same file.
>
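A minimal sketch of that point, assuming the key is composed exactly as described (backend UID + key group range + SST file name). The string layout, the helper names and the use of a Counter in place of the SharedStateRegistry are all hypothetical.

```python
from collections import Counter


def shared_state_key(backend_uid, key_group_range, sst_file):
    """Build a registry key from backend UID + key group range + SST file name."""
    start, end = key_group_range
    # The concrete string layout is made up for this sketch.
    return f"{backend_uid}/{start}-{end}/{sst_file}"


def register_all(keys):
    """Count references per key, roughly like a reference-counting registry."""
    return Counter(keys)


# Two tasks with different key group ranges register the same SST file.
counters = register_all([
    shared_state_key("backend-1", (1, 32), "000001.sst"),
    shared_state_key("backend-1", (33, 64), "000001.sst"),
])
```

Because the key group range is part of the key, the same file ends up under two keys with one reference each instead of one key with two references.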
> Of course, correct me if I am wrong in the two paragraphs above.
>
> Re-upload from one task (proposed in FLIP as optimization)
>
> It is not suggested as an optimization. It is suggested as a must for state backends that need it. I did not elaborate on it, because it could affect only the changelog state backend at the moment, into which I don't have much insight. I agree it might make sense to look a bit into how we could force full snapshots in the changelog state backend. I will spend some extra time on that.
>
> Lastly, I might be wrong, but I think KeyedStateHandle#getIntersection is a good candidate to distribute the task of duplicating shared files pretty evenly. The idea was that we could specially mark the handles that are assigned the "start of the old key group range". Therefore, if a file belonged to a handle responsible for the key group range [1, 64], which is later split into [1, 16], [17, 32], [33, 48] and [49, 64], only the task that gets assigned [1, 16] would be responsible for duplicating files of the old range [1, 64].
>
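The rule above can be sketched as follows. This is only an illustration under simplifying assumptions: ranges are inclusive (start, end) tuples, split_range is a naive even split rather than Flink's actual key group assignment, and both function names are made up.

```python
def split_range(start, end, parts):
    """Split the inclusive range [start, end] into `parts` contiguous ranges."""
    total = end - start + 1
    size, extra = divmod(total, parts)
    ranges = []
    lo = start
    for i in range(parts):
        # The first `extra` ranges get one additional key group.
        hi = lo + size - 1 + (1 if i < extra else 0)
        ranges.append((lo, hi))
        lo = hi + 1
    return ranges


def is_responsible_for_duplication(old_range, new_range):
    """True if new_range contains the start of old_range."""
    old_start, _ = old_range
    new_start, new_end = new_range
    return new_start <= old_start <= new_end


old = (1, 64)
new_ranges = split_range(1, 64, 4)
responsible = [r for r in new_ranges if is_responsible_for_duplication(old, r)]
```

With the old range [1, 64] split four ways, only the subtask holding [1, 16] is selected.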
> Best,
>
> Dawid
>
> On 23/11/2021 14:27, Khachatryan Roman wrote:
>
> Thanks Dawid, Yun and Piotr,
>
> I think I know where the confusion comes from regarding arbitrary
> recovery histories: Both my counter-proposals were for "no-claim"
> mode; I didn't mean to replace "claim" mode with them.
> However, as Yun pointed out, it's impossible to guarantee that all the
> files will be compacted in a finite number of checkpoints; so let's
> withdraw those proposals.
>
> And as there are no other alternatives left, the changes to
> SharedStateRegistry or State Backends are not a decisive factor
> anymore.
>
> However, it probably still makes sense to clarify the details of how
> re-upload will work in case of rescaling.
>
> Let's consider a job running with DoP=1; it created checkpoint C1 with
> a single file F1 and then stopped.
> We start a new job from C1 in no-claim mode with DoP=2; so two tasks
> will receive the same file F1.
>
> Let's say both tasks will re-use F1, so it needs to be re-uploaded.
> Now, we have a choice:
> 1. Re-upload from both tasks
> For RocksDB, the state key is: backend UID + SST file name. Both are
> the same for two tasks, so the key will be the same.
> Currently, SharedStateRegistry will reject both as duplicates.
>
> We can't just replace (to not lose one of the files), so we have to
> use random keys.
> However, when we further downscale:
> - we'll have a conflict on recovery (multiple SST files with the same name)
> - we'll re-upload the same file multiple times unnecessarily
> So we have to de-duplicate state on recovery - ideally before sending
> state snapshots to tasks.
>
> 2. Re-upload from one task (proposed in FLIP as optimization)
> Both tasks must learn the new key. Otherwise, the snapshot of the
> not-reuploading task will refer to a non-existing entry.
> We can either re-use the old key (and allow replacement in
> SharedStateRegistry); or generate the key on JM before sending task
> state snapshots.
>
>
> P.S.:
>
> 2. Instead of forcing re-upload, can we "inverse control" in no-claim mode?
> This is effectively what we have right now, but with an extra (Async?)
>
> Right now, there is absolutely no way to find out when the shared
> state can be deleted; it can't be inferred from which checkpoints are
> subsumed, and which are not, as future checkpoints might still be
> using that state.
>
> Regards,
> Roman
>
>
>
> On Tue, Nov 23, 2021 at 1:37 PM Piotr Nowojski <pn...@apache.org> wrote:
>
> Hi,
>
> I'm not entirely sure if I fully understand the raised concerns here. So
> let me maybe step back in the discussion a bit and address the original
> points from Roman.
>
> 2. Instead of forcing re-upload, can we "inverse control" in no-claim mode?
>
> I second the concerns from Dawid. This is effectively what we have right
> now, but with an extra (Async?) API call. It's not conceptually simple,
> it's hard to explain to the users, it might actually take forever to
> release the artefacts. Furthermore I don't think the implementation would
> be trivial.
>
> On the other hand, the current proposal of having (a) `--claim` and (b)
> `--no-claim` modes is conceptually very simple, with (a) being perfectly
> efficient, without any overheads. If you have concerns that (b) will cause
> some overheads, slower first checkpoint etc, keep in mind that the user can
> always pick option (a). Starting a new job from an existing
> savepoint/externalised checkpoint in general shouldn't be time critical, so
> users can always even manually copy the files and still use option (a), or
> just be fine accepting the price of a slower first checkpoint. For other
> use cases - restarting the same job after a downtime - (b) sounds to me to
> be an acceptable option.
>
> I would also like to point out that the "force full snapshot"/"do not use
> previous artefacts" option we will need either way for the incremental
> intermediate savepoints (subject of a next FLIP). From this perspective, we
> are getting the "--no-claim" option basically for free.
>
> 3. Alternatively, re-upload not necessarily on 1st checkpoint, but after a configured number of checkpoints?
>
> I don't see a reason why we couldn't provide an option like that at some
> point in the future. However as it's more complicated to reason about, more
> complicated to implement and I'm not entirely sure how much it is actually needed
> given the (a) `--claim` mode, I think we can wait for feedback from the
> users before actually implementing it.
>
> 6. Enforcing re-upload by a single task and skew
> If we use some greedy logic like subtask 0 always re-uploads then it
> might be overloaded.
> So we'll have to obtain a full list of subtasks first (then probably
> choose randomly or round-robin).
> However, that requires rebuilding Task snapshot, which is doable but
> not trivial (which I think supports "reverse API option").
>
> What do you mean by "rebuilding Task snapshot"?
>
> During some early discussions about this point, I've hoped that a state
> backend like changelog could embed into the state handle information which
> operator should actually be responsible for duplicating such shared states.
> However now that I'm thinking about it, indeed there might be an issue if
> we combine the fact that state handles can be shared across multiple
> different operators and with a job modification, like dropping an operator.
> In that case it looks like we would need some extra logic during recovery,
> that would have an overview of the whole job to make a decision which
> particular parallel instance of an operator should be responsible for
> duplicating the underlying file?
>
> Best,
> Piotrek
>
> On Tue, Nov 23, 2021 at 12:28 Dawid Wysakowicz <dw...@apache.org> wrote:
>
> Some users restore different jobs from the same externalized
> checkpoint. I think this usage would break after introducing this FLIP, and
> we must tell users explicitly if we choose to make Flink manage the
> checkpoints by default.
>
> Could you elaborate on what you mean? The proposal is to use the
> "no-claim" mode by default, which should let users start as many jobs as
> they wish from the same externalized checkpoints, and it should not cause
> them any harm. Each job will effectively create its own
> private "copy" of the initial checkpoint along with the 1st taken
> checkpoint.
>
> If the 1st full checkpoint did not complete in the end, the next
> checkpoints have to try to re-upload all artifacts again. I think this
> problem could be mitigated if the task knows some files have been uploaded
> before.
>
> I don't know how we could achieve that easily. Besides, we have the same
> situation for all checkpoints, even regular ones, don't we? Do we check if e.g.
> diff files have been successfully uploaded in a previous aborted checkpoint?
> I am not saying it's a wrong suggestion, just that I feel it is orthogonal
> and I can't see a straightforward way to implement it.
>
> Best,
>
> Dawid
> On 23/11/2021 07:52, Yun Tang wrote:
>
> Hi,
>
> Regarding the likelihood of RocksDB never deleting some SST files: unfortunately, it could happen, as the current level compaction strategy in RocksDB is triggered when the size of the upper input level reaches the threshold, and the compaction priority cannot guarantee that all files will be chosen within several rounds of compaction.
>
> Actually, I am a bit in favor of this FLIP to manage checkpoints within Flink, as we have heard from many users that they cannot delete older checkpoints after several rounds of re-launching Flink jobs. Currently, Flink does not delete older checkpoints automatically when restoring from an older retained checkpoint, which makes the base checkpoint directory grow larger and larger. However, if they decide to delete the older checkpoint directories of other job IDs, they might not be able to recover from the last completed checkpoint, as it might depend on some artifacts in an older checkpoint directory.
>
> And I think re-uploading would indeed increase the 1st checkpoint duration after restoring. For Aliyun OSS, the developer said that copying files (larger than 32MB) from one location to another within the same bucket on DFS could take hundreds of milliseconds. However, in my experience, copying on HDFS might not be so quick. Maybe some numbers here would be better.
>
> I have two questions here:
> 1. If the 1st full checkpoint does not complete in the end, the next checkpoints have to try to re-upload all artifacts again. I think this problem could be mitigated if the task knows some files have been uploaded before.
> 2. Some users restore different jobs from the same externalized checkpoint. I think this usage would break after introducing this FLIP, and we must tell users explicitly if we choose to make Flink manage the checkpoints by default.
>
> Best
> Yun Tang
>
>
> On 2021/11/22 19:49:11 Dawid Wysakowicz wrote:
>
>     There is one more fundamental issue with either of your two
>     proposals that has just come to my mind.
>     What happens if you have externalized checkpoints and the job fails
>     before the initial checkpoint can be safely removed?
>
>     You start the job from the latest created checkpoint and wait for it
>     to be allowed for deletion. Then you can delete it, and all previous
>     checkpoints (or am I missing something?)
>
>
> Let me clarify it with an example. You start with chk-42, and Flink takes
> e.g. three checkpoints, chk-43, chk-44 and chk-45, which all still reference chk-42
> files. After that it fails. We have externalized checkpoints enabled,
> therefore we have retained all checkpoints. The user starts a new program
> from, let's say, chk-45. At this point your proposal does not give the
> user any help as to when chk-42 can be safely removed. (This is
> also how Flink works right now).
>
> To make it even harder, you can complicate it arbitrarily: 1) start a job
> from chk-44, 2) start a job from a chk-47 which depends on chk-45, 3)
> never start a job from chk-44, so it is not claimed by any job and thus
> never deleted (users must remember themselves that chk-44 originated
> from chk-42, etc.). Users would be forced to build a lineage system for
> checkpoints to track which checkpoints depend on each other.
>
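A rough sketch of the kind of lineage tracking users would have to build themselves. The dependency map is hypothetical bookkeeping (checkpoint name to the checkpoints whose files it references), and safe_to_delete is a made-up helper, not anything Flink provides.

```python
def safe_to_delete(target, depends_on, live):
    """True if no checkpoint in `live` references `target`, directly or transitively."""
    stack = list(live)
    seen = set()
    while stack:
        checkpoint = stack.pop()
        if checkpoint in seen:
            continue
        seen.add(checkpoint)
        for dependency in depends_on.get(checkpoint, ()):
            if dependency == target:
                return False
            stack.append(dependency)
    return True


# Lineage from the example: chk-43..45 reference chk-42, chk-47 builds on chk-45.
depends_on = {
    "chk-43": {"chk-42"},
    "chk-44": {"chk-42"},
    "chk-45": {"chk-42"},
    "chk-47": {"chk-45"},
}
```

As long as chk-47 (or the never-claimed chk-44) is still kept, chk-42 cannot be removed, and nothing but this external bookkeeping tells the user so.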
>     I mean that the design described by FLIP implies the following (PCIIW):
>     1. treat SST files from the initial checkpoint specially: re-upload or
>     send placeholder - depending on those attributes in state handle
>     2. (SST files from newer checkpoints are re-uploaded depending on
>     confirmation currently; so yes there is tracking, but it's different)
>     3. SharedStateRegistry must allow replacing state under the existing
>     key; otherwise, if a new key is used then other parallel subtasks
>     should learn somehow this key and use it; However, allowing
>     replacement must be limited to this scenario, otherwise it can lead to
>     previous checkpoint corruption in normal cases
>
> I might not understand your points, but I don't think FLIP implies any
> of this. The FLIP suggests sending, along with the CheckpointBarrier, a
> flag "force full checkpoint". Then the state backend should respect it
> and should not use any of the previous shared handles. Now let me
> explain how that would work for RocksDB incremental checkpoints.
>
>  1. Simplest approach: upload all local RocksDB files. This works
>     exactly the same as the first incremental checkpoint for a fresh start.
>  2. Improvement on 1): we already know which files were uploaded for
>     the initial checkpoint. Therefore, instead of uploading the local
>     files that are the same as the files uploaded for the initial checkpoint,
>     we call duplicate for those files and upload just the diff.
>
> It does not require any changes to the SharedStateRegistry nor to state
> handles, at least for RocksDB.
>
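Approach 2 could be sketched like this. It assumes files can be compared by name and that the set of files uploaded for the initial checkpoint is known. plan_full_checkpoint is a made-up helper and the actual duplicate/upload calls are left out.

```python
def plan_full_checkpoint(local_files, initial_checkpoint_files):
    """Split local files into those to duplicate remotely and those to upload."""
    # Files already uploaded for the initial checkpoint can be duplicated.
    to_duplicate = sorted(f for f in local_files if f in initial_checkpoint_files)
    # Everything else is the diff that has to be uploaded.
    to_upload = sorted(f for f in local_files if f not in initial_checkpoint_files)
    return to_duplicate, to_upload


to_duplicate, to_upload = plan_full_checkpoint(
    local_files={"000001.sst", "000002.sst", "000003.sst"},
    initial_checkpoint_files={"000000.sst", "000001.sst", "000002.sst"},
)
```

Here 000001.sst and 000002.sst would be duplicated and only 000003.sst uploaded; 000000.sst is no longer used locally and is ignored.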
> Best,
>
> Dawid
>
>
> On 22/11/2021 19:33, Roman Khachatryan wrote:
>
> If you assume the 1st checkpoint needs to be "full" you know you are not allowed to use any shared files.
> It's true you should know about the shared files of the previous checkpoint, but e.g. RocksDB already tracks that.
>
> I mean that the design described by FLIP implies the following (PCIIW):
> 1. treat SST files from the initial checkpoint specially: re-upload or
> send placeholder - depending on those attributes in state handle
> 2. (SST files from newer checkpoints are re-uploaded depending on
> confirmation currently; so yes there is tracking, but it's different)
> 3. SharedStateRegistry must allow replacing state under the existing
> key; otherwise, if a new key is used then other parallel subtasks
> should learn somehow this key and use it; However, allowing
> replacement must be limited to this scenario, otherwise it can lead to
> previous checkpoint corruption in normal cases
>
> Forcing a full checkpoint after completing N checkpoints instead of
> immediately would only require enabling (1) after N checkpoints.
> And with the "poll API until checkpoint released" approach, those
> changes aren't necessary.
>
>
> There is one more fundamental issue with either of your two proposals that has just come to my mind.
> What happens if you have externalized checkpoints and the job fails before the initial checkpoint can be safely removed?
>
> You start the job from the latest created checkpoint and wait for it
> to be allowed for deletion. Then you can delete it, and all previous
> checkpoints (or am I missing something?)
>
>
> With tracking the shared files on JM you cannot say if you can clear the files after a couple of checkpoints or 10s, 100s or 1000s,
> which translates into minutes/hours/days/weeks of processing.
>
> This doesn't necessarily translate into higher cost (because of saved
> RPC etc., as I mentioned above).
> However, I do agree that an infinite or arbitrarily high delay is unacceptable.
>
> The added complexity above doesn't seem negligible to me (especially
> in SharedStateHandle); and should therefore be weighed against those
> operational disadvantages (given that the number of checkpoints to
> wait is bounded in practice).
>
> Regards,
> Roman
>
>
>
>
> On Mon, Nov 22, 2021 at 5:05 PM Dawid Wysakowicz <dw...@apache.org> wrote:
>
> There is one more fundamental issue with either of your two proposals
> that has just come to my mind. What happens if you have externalized
> checkpoints and the job fails before the initial checkpoint can be
> safely removed? You have a situation where you have a retained
> checkpoint that was built on top of the original one. Basically ending
> in a situation we have right now that you never know when it is safe to
> delete a retained checkpoint.
>
> BTW, the intention for the "claim" mode was to support cases when users
> are concerned with the performance of the first checkpoint. In those
> cases they can claim the checkpoint and not pay the additional cost of
> the first checkpoint.
>
> Best,
>
> Dawid
>
> On 22/11/2021 14:09, Roman Khachatryan wrote:
>
> Thanks Dawid,
>
> Regarding clarity,
> I think that all proposals require waiting for some event: re-upload /
> checkpoint completion / api response.
> But with the current one, there is an assumption: "initial checkpoint
> can be deleted once a new one completes" (instead of just "initial
> checkpoint can be deleted once the API says it can be deleted").
> So I think it's actually more clear to offer this explicit API and rely on it.
>
> Regarding delaying the deletion,
> I agree that it can delay deletion, but how important is it?
> Checkpoints are usually stored on relatively cheap storage like S3, so
> some delay shouldn't be an issue (especially taking rounding into
> account); it can even be cheaper or comparable to paying for
> re-upload/duplicate calls.
>
> Infinite delay can be an issue though, I agree.
> Maybe @Yun can clarify the likelihood of never deleting some SST files
> by RocksDB?
> For the changelog backend, old files won't be used once
> materialization succeeds.
>
> Yes, my concern is checkpointing time, but also added complexity:
>
> It would be a bit invasive though, as we would have to somehow keep track of which files should not be reused on TMs.
>
> I think we need this anyway if we choose to re-upload files once the
> job is running.
> The new checkpoint must be formed by re-uploaded old artifacts AND
> uploaded new artifacts.
>
>
> Regards,
> Roman
>
>
> On Mon, Nov 22, 2021 at 12:42 PM Dawid Wysakowicz <dw...@apache.org> wrote:
>
> @Yun
>
> I think it is a good comment, with which I agree in principle. However, we already use --fromSavepoint (cli), savepointPath (REST API), SavepointRestoreSettings for both restoring from a savepoint and an externalized checkpoint. I wanted to voice that concern. Nevertheless, I am fine with changing it to execution.restore-mode; if there are no other comments on that matter, I will change it.
>
> @Roman:
>
> Re 1. Correct, stop-with-savepoint should commit side-effects. Will add that to the doc.
>
> Re. 2 What I don't like about this counter proposal is that it still has no clearly defined point in time when it is safe to delete the original checkpoint. Users would have a hard time reasoning about it and debugging. Even worse, I think in the worst case it might never happen that all the original files are no longer in use (I am not too familiar with RocksDB compaction, but what happens if there are key ranges that are never accessed again?). I agree it is unlikely, but possible, isn't it? It can definitely take a significant time and many checkpoints to do so.
>
> Re. 3 I believe where you are coming from is that you'd like to keep the checkpointing time minimal and re-uploading files may increase it. The proposal so far builds on the assumption we could in most cases use a cheap duplicate API instead of re-upload. I could see this as a follow-up if it becomes a bottleneck. It would be a bit invasive though, as we would have to somehow keep track of which files should not be reused on TMs.
>
> Re. 2 & 3 Neither of the counter proposals works well for taking incremental savepoints. We were thinking of building incremental savepoints on the same concept. I think delaying the completion of an independent savepoint to some undefined point in the future is not a nice property of savepoints.
>
> Re 4. Good point. We should make sure the first completed checkpoint has the independent/full checkpoint property rather than just the first triggered.
>
> Re. 5 & 6 I need a bit more time to look into it.
>
> Best,
>
> Dawid
>
> On 22/11/2021 11:40, Roman Khachatryan wrote:
>
> Hi,
>
> Thanks for the proposal Dawid, I have some questions and remarks:
>
> 1. How will stop-with-savepoint be handled?
> Shouldn't side effects be enforced in this case? (i.e. send
> notifyCheckpointComplete)
>
> 2. Instead of forcing re-upload, can we "inverse control" in no-claim mode?
> Anyways, any external tool will have to poll Flink API waiting for the
> next (full) checkpoint, before deleting the retained checkpoint,
> right?
> Instead, we can provide an API which tells whether the 1st checkpoint
> is still in use (and not force re-upload it).
>
> Under the hood, it can work like this:
> - for the checkpoint Flink recovers from, remember all shared state
> handles it is adding
> - when unregistering shared state handles, remove them from the set above
> - when the set becomes empty the 1st checkpoint can be deleted externally
>
> Besides not requiring re-upload, it seems much simpler and less invasive.
> On the downside, state deletion can be delayed; but I think this is a
> reasonable trade-off.
>
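The bookkeeping from point 2 can be sketched as below. Shared state handles are represented as plain strings, and the class and method names are hypothetical. It illustrates the idea only and is not a proposal for the actual SharedStateRegistry change.

```python
class InitialCheckpointTracker:
    """Tracks which shared handles of the restored checkpoint are still in use."""

    def __init__(self, restored_shared_handles):
        # Remember all shared state handles added by the restored checkpoint.
        self._in_use = set(restored_shared_handles)

    def unregister(self, handle):
        # Called whenever a shared state handle is unregistered.
        self._in_use.discard(handle)

    def can_delete_initial_checkpoint(self):
        # Once the set is empty, nothing references the 1st checkpoint anymore.
        return not self._in_use


tracker = InitialCheckpointTracker(["000001.sst", "000002.sst"])
tracker.unregister("000001.sst")
still_needed = not tracker.can_delete_initial_checkpoint()
```

An external tool would poll something like can_delete_initial_checkpoint() instead of waiting for the next full checkpoint.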
> 3. Alternatively, re-upload not necessarily on 1st checkpoint, but
> after a configured number of checkpoints?
> There is a high chance that after some more checkpoints, initial state
> will not be used (because of compaction),
> so backends won't have to re-upload anything (or small part).
>
> 4. Re-uploaded artifacts must not be deleted on checkpoint abortion
> This should be addressed in https://issues.apache.org/jira/browse/FLINK-24611.
> If not, I think the FLIP should consider this case.
>
> 5. Enforcing re-upload by a single task and Changelog state backend
> With Changelog state backend, a file can be shared by multiple operators.
> Therefore, getIntersection() is irrelevant here, because operators
> might not be sharing any key groups.
> (so we'll have to analyze "raw" file usage I think).
>
> 6. Enforcing re-upload by a single task and skew
> If we use some greedy logic like subtask 0 always re-uploads then it
> might be overloaded.
> So we'll have to obtain a full list of subtasks first (then probably
> choose randomly or round-robin).
> However, that requires rebuilding Task snapshot, which is doable but
> not trivial (which I think supports "reverse API option").
>
> 7. I think it would be helpful to list file systems / object stores
> that support "fast" copy (ideally with latency numbers).
>
> Regards,
> Roman
>
> On Mon, Nov 22, 2021 at 9:24 AM Yun Gao <yu...@aliyun.com.invalid> wrote:
>
> Hi,
>
> Many thanks to Dawid for proposing the FLIP to clarify the ownership of the
> states. +1 for the overall changes since they make the behavior clear and
> provide users a well-defined way to finally clean up savepoints / retained checkpoints.
>
> Regarding the changes to the public interface, currently the changes all seem bound
> to the savepoint, but from the FLIP it seems we might also need to support the claim declaration
> for retained checkpoints, as on the CLI side [1]. If so, might it be better to change the option name
> from `execution.savepoint.restore-mode` to something like `execution.restore-mode`?
>
> Best,
> Yun
>
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint
>
>
> ------------------------------------------------------------------
> From: Konstantin Knauf <kn...@apache.org>
> Send Time: 2021 Nov. 19 (Fri.) 16:00
> To: dev <de...@flink.apache.org>
> Subject: Re: [DISCUSS] FLIP-193: Snapshots ownership
>
> Hi Dawid,
>
> Thanks for working on this FLIP. Clarifying the differences and
> guarantees around savepoints and checkpoints will make it easier and safer
> for users and downstream projects and platforms to work with them.
>
> +1 to changing the current (undefined) behavior when recovering from
> retained checkpoints. Users can now choose between claiming and not
> claiming, which I think will make the current mixed behavior obsolete.
>
> Cheers,
>
> Konstantin
>
> On Fri, Nov 19, 2021 at 8:19 AM Dawid Wysakowicz <dw...@apache.org>
> wrote:
>
> Hi devs,
>
> I'd like to bring up for a discussion a proposal to clean up ownership
> of snapshots, both checkpoints and savepoints.
>
> The goal here is to make it clear who is responsible for deleting
> checkpoint/savepoint files and when that can be done in a safe manner.
>
> Looking forward to your feedback!
>
> Best,
>
> Dawid
>
> [1] https://cwiki.apache.org/confluence/x/bIyqCw
>
>
>
> --
>
> Konstantin Knauf
> https://twitter.com/snntrable
> https://github.com/knaufk
>
>

Re: [DISCUSS] FLIP-193: Snapshots ownership

Posted by Dawid Wysakowicz <dw...@apache.org>.
Ad. 1 What do you mean here? It will be deleted once the new job has created
`state.checkpoints.num-retained` checkpoints and the original one is no
longer needed. This is expected. The way CLAIM works is basically that
the restored checkpoint is subject to Flink's regular cleanup procedure.

Ad. 2 We only ever restore a single checkpoint. We do not restore an entire
checkpoint history. Can you elaborate on the exact situation in
which you'd like to claim multiple checkpoints at a single time? We may
consider that as a future improvement if it turns out to be valuable. It
would be good to understand the use case first though.
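
To illustrate the two points above, here is a minimal sketch of how a single claimed checkpoint ages out under the regular retention rule. It is a simplified model under assumptions, not Flink code: the class `ClaimModeRetention` and its methods are hypothetical, and it ignores shared files that newer incremental checkpoints may still reference.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of checkpoint retention in claim mode (not Flink code). */
class ClaimModeRetention {
    // Models the value of state.checkpoints.num-retained.
    private final int numRetained;
    private final Deque<String> retained = new ArrayDeque<>();
    private final List<String> deleted = new ArrayList<>();

    ClaimModeRetention(int numRetained, String claimedCheckpoint) {
        this.numRetained = numRetained;
        // Only the single checkpoint we restore from is claimed.
        retained.addLast(claimedCheckpoint);
    }

    /** A new checkpoint completed; subsume the oldest ones beyond the limit. */
    void onCheckpointCompleted(String checkpoint) {
        retained.addLast(checkpoint);
        while (retained.size() > numRetained) {
            deleted.add(retained.removeFirst());
        }
    }

    List<String> deleted() {
        return deleted;
    }

    public static void main(String[] args) {
        ClaimModeRetention retention = new ClaimModeRetention(3, "chk-45");
        retention.onCheckpointCompleted("chk-46");
        retention.onCheckpointCompleted("chk-47");
        System.out.println(retention.deleted()); // [] : chk-45 is still retained
        retention.onCheckpointCompleted("chk-48");
        System.out.println(retention.deleted()); // [chk-45]
    }
}
```

Older checkpoints of the previous job (for example chk-43 and chk-44) never enter this queue, which is why they are not cleaned up by the new job.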

Best,

Dawid

On 29/03/2022 11:17, Hangxiang Yu wrote:
> Hi, I found old checkpoints will not be deleted in claim mode in two cases:
> 1. set state.checkpoints.num-retained to num bigger than 1
> 2. restore from legacy mode which produces some retained checkpoints
> In above cases, we only claim the last checkpoint in the claim mode so that
> only the last checkpoint will be deleted and others will not.
> Should we need to claim more previous checkpoints in claim mode?
>
> Best,
> Hangxiang
>
> On Fri, Nov 26, 2021 at 11:11 PM Dawid Wysakowicz <dw...@apache.org>
> wrote:
>
>> If they'd like to use the --no-claim mode that would be the way to go, yes.
>>
>> Two points to be on the same page here:
>>
>>     - all Flink native state backends (RocksDB, HashMap, changelog) would
>>     already support --no-claim
>>     - if in the end we add the --legacy mode, users can also use that mode
>>     instead of --claim.
>>
>> Best,
>>
>> Dawid
>> On 26/11/2021 15:57, Till Rohrmann wrote:
>>
>> Thanks for writing this FLIP Dawid. Just to clarify one thing for the
>> support of forced full snapshots. If a state backend does not support this
>> feature, then the user either has to copy the snapshot manually or resume
>> using --claim mode, create a savepoint in canonical format and then
>> change the state backend if he wants to use --no-claim, right?
>>
>> Cheers,
>> Till
>>
>> On Fri, Nov 26, 2021 at 11:49 AM Dawid Wysakowicz <dw...@apache.org>
>> wrote:
>>
>>
>> - maybe include "checkpoint" in mode names, i.e. --no-claim-checkpoint?
>>
>> I don't think this is a good idea. The modes apply to both savepoints and
>> checkpoints, plus it's much longer to type in. (minor)
>>
>> - add an explicit option to preserve the current behavior (no claim
>> and no duplicate)?
>>
>> We had an offline discussion about it and so far we were leaning towards
>> keeping the set of supported options minimal. However, if we really think
>> the old behaviour is useful we can add a --legacy restore mode. cc
>> @Konstantin @Piotr
>>
>> There seems to be a consensus in the discussion, however, I couldn't
>> find stop-with-savepoint in the document.
>>
>> Sorry, I forgot about this one. I added a note that savepoints generated
>> from stop-with-savepoint should commit side effects.
>>
>> And I still think it would be nice to list object stores which support
>> duplicate operation.
>>
>> I listed a couple of file systems that do have some sort of a COPY API.
>>
>> Best,
>>
>> Dawid
>> On 26/11/2021 11:03, Roman Khachatryan wrote:
>>
>> Hi,
>>
>> Thanks for updating the FLIP Dawid
>>
>> There seems to be a consensus in the discussion, however, I couldn't
>> find stop-with-savepoint in the document.
>>
>> A few minor things:
>> - maybe include "checkpoint" in mode names, i.e. --no-claim-checkpoint?
>> - add an explicit option to preserve the current behavior (no claim
>> and no duplicate)?
>> And I still think it would be nice to list object stores which support
>> duplicate operation.
>>
>> Regards,
>> Roman
>>
>>
>> On Fri, Nov 26, 2021 at 10:37 AM Konstantin Knauf <kn...@apache.org> wrote:
>>
>> Hi Dawid,
>>
>> sounds good, specifically 2., too.
>>
>> Best,
>>
>> Konstantin
>>
>> On Fri, Nov 26, 2021 at 9:25 AM Dawid Wysakowicz <dw...@apache.org>
>> wrote:
>>
>>
>> Hi all,
>>
>> I updated the FLIP with a few clarifications:
>>
>>     1. I added a description of how we would trigger a "full snapshot" in the
>>        changelog state backend:
>>        - (We would go for this option in the 1st version.) Trigger a
>>          snapshot of the base state backend in the 1st checkpoint, which induces
>>          materializing the changelog. In this approach we could duplicate SST files,
>>          but we would not duplicate the diff files.
>>        - Add a hook for logic for computing which task should duplicate the
>>          diff files. We would have to do a pass over all states after the state
>>          assignment in StateAssignmentOperation.
>>     2. I clarified that the "no-claim" mode requires a
>>        completed/successful checkpoint before we can remove the one we are
>>        restoring from. Also added a note that we can assume a checkpoint is
>>        completed if it is confirmed by Flink's API for checkpointing stats or by
>>        checking an entry in HA services. A checkpoint cannot be assumed completed
>>        by just looking at the checkpoint files.
>>
>> I suggest going on with the proposal for "no-claim" as suggested so far,
>> as it is easier to understand by users. They can reliably tell when they
>> can expect the checkpoint to be deletable. If we see that the time to take
>> the 1st checkpoint becomes a problem we can extend the set of restore
>> methods and e.g. add a "claim-temporarily" method.
>>
>> I hope we can reach a consensus and start a vote, some time early next
>> week.
>>
>> Best,
>>
>> Dawid
>>
>> On 23/11/2021 22:39, Roman Khachatryan wrote:
>>
>> I also referred to the "no-claim" mode and I still think neither of them works in that mode, as you'd have to keep lineage of checkpoints externally to be able to delete any checkpoint.
>>
>> I think the lineage is needed in all approaches with arbitrary
>> histories; the difference is whether a running Flink is required or
>> not. Is that what you mean?
>> (If not, could you please explain how the scenario you mentioned above
>> with multiple jobs branching from the same checkpoint is handled?)
>>
>>
>> BTW, the state key for RocksDB is actually: backend UID + key group range + SST file name, so the key would be different (the key group range is different for two tasks) and we would have two separate counters for the same file.
>>
>> You're right. But there is also a collision between old and new entries.
>>
>>
>> To be on the same page here. It is not a problem so far in RocksDB, because we do not reuse any shared files in case of rescaling.
>>
>> As I mentioned above, collision happens not only because of rescaling;
>> and AFAIK, there are some ideas to reuse files on rescaling (probably
>> Yuan could clarify). Anyways, I think it makes sense to not bake in
>> this assumption unless it's hard to implement (or at least state it
>> explicitly in FLIP).
>>
>>
>> It is not suggested as an optimization. It is suggested as a must for state backends that need it. I did not elaborate on it, because it could affect only the changelog state backend at the moment, into which I don't have much insight. I agree it might make sense to look a bit into how we could force full snapshots in the changelog state backend. I will spend some extra time on that.
>>
>> I see. For the Changelog state backend, the easiest way would be to
>> obtain a full snapshot from the underlying backend in snapshot(),
>> ignoring all non-materialized changes. This will effectively
>> materialize all the changes, so only new non-materialized state will
>> be used in subsequent checkpoints.
>>
>>
>> Only the task that gets assigned [1,16] would be responsible for duplicating files of the old range [1, 64].
>>
>> Wouldn't it be likely that the same TM will be responsible for [1, 64]
>> "windowState", [1, 64] "timerState", and so on, for all operators in
>> the chain, and probably other chains? (That's what I mean by skew.)
>> If we want to address this while preserving handle immutability, then we'll
>> probably have to rebuild the whole task state snapshot.
>> (Depending on how we approach RocksDB re-uploading, it might not be
>> relevant though.)
>>
>>
>> Regards,
>> Roman
>>
>>
>> On Tue, Nov 23, 2021 at 4:06 PM Dawid Wysakowicz <dw...@apache.org> wrote:
>>
>> I think I know where the confusion comes from regarding arbitrary
>> recovery histories: Both my counter-proposals were for "no-claim"
>> mode; I didn't mean to replace "claim" mode with them.
>> However, as Yun pointed out, it's impossible to guarantee that all the
>> files will be compacted in a finite number of checkpoints; so let's
>> withdraw those proposals.
>>
>> I also referred to the "no-claim" mode and I still think neither of them works in that mode, as you'd have to keep lineage of checkpoints externally to be able to delete any checkpoint.
>>
>> Let's consider a job running with DoP=1; it created checkpoint C1 with
>> a single file F1 and then stopped.
>> We start a new job from C1 in no-claim mode with DoP=2; so two tasks
>> will receive the same file F1.
>>
>> To be on the same page here. It is not a problem so far in RocksDB, because we do not reuse any shared files in case of rescaling. If we want to change how rescaling in RocksDB works then yes, we would have to consider how we want to make sure we copy/duplicate just once. However we would have to first change a crucial thing about regular incremental checkpoints and reorganize the SharedStateRegistry along the way.
>>
>> BTW, the state key for RocksDB is actually: backend UID + key group range + SST file name, so the key would be different (the key group range is different for two tasks) and we would have two separate counters for the same file.
>>
>> Of course, correct me if I am wrong in the two paragraphs above.
>>
>> Re-upload from one task (proposed in FLIP as optimization)
>>
>> It is not suggested as an optimization. It is suggested as a must for state backends that need it. I did not elaborate on it, because it could affect only the changelog state backend at the moment, into which I don't have much insight. I agree it might make sense to look a bit into how we could force full snapshots in the changelog state backend. I will spend some extra time on that.
>>
>> Lastly, I might be wrong, but I think KeyedStateHandle#getIntersection is a good candidate to distribute the task of duplicating shared files pretty evenly. The idea was that we could specially mark the handles that are assigned the "start of the old key group range". Therefore, if a file belonged to a handle responsible for the key group range [1, 64], which is later split into [1, 16], [17, 32], [33, 48], [49, 64], only the task that gets assigned [1, 16] would be responsible for duplicating files of the old range [1, 64].
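
The "start of the old key group range" rule can be sketched as below. This is only an illustration under assumptions: `DuplicationOwnerSketch`, its nested `Range` class (a simplified stand-in for Flink's KeyGroupRange) and `mustDuplicate` are hypothetical names, and the sketch does not call KeyedStateHandle#getIntersection itself.

```java
import java.util.List;

/** Hypothetical helper (not Flink code) for picking the task that duplicates shared files. */
class DuplicationOwnerSketch {

    /** Inclusive key-group range; a simplified stand-in for Flink's KeyGroupRange. */
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(int keyGroup) {
            return start <= keyGroup && keyGroup <= end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    /** A task duplicates the old files only if it owns the first key group of the old range. */
    static boolean mustDuplicate(Range oldRange, Range assignedRange) {
        return assignedRange.contains(oldRange.start);
    }

    public static void main(String[] args) {
        Range oldRange = new Range(1, 64);
        List<Range> newRanges = List.of(
                new Range(1, 16), new Range(17, 32), new Range(33, 48), new Range(49, 64));
        for (Range assigned : newRanges) {
            // Only [1, 16] reports true.
            System.out.println(assigned + " duplicates: " + mustDuplicate(oldRange, assigned));
        }
    }
}
```

Exactly one new range contains the first key group of the old range, so each old file is duplicated once, regardless of how the range is split.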
>>
>> Best,
>>
>> Dawid
>>
>> On 23/11/2021 14:27, Khachatryan Roman wrote:
>>
>> Thanks Dawid, Yun and Piotr,
>>
>> I think I know where the confusion comes from regarding arbitrary
>> recovery histories: Both my counter-proposals were for "no-claim"
>> mode; I didn't mean to replace "claim" mode with them.
>> However, as Yun pointed out, it's impossible to guarantee that all the
>> files will be compacted in a finite number of checkpoints; so let's
>> withdraw those proposals.
>>
>> And as there are no other alternatives left, the changes to
>> SharedStateRegistry or State Backends are not a decisive factor
>> anymore.
>>
>> However, it probably still makes sense to clarify the details of how
>> re-upload will work in case of rescaling.
>>
>> Let's consider a job running with DoP=1; it created checkpoint C1 with
>> a single file F1 and then stopped.
>> We start a new job from C1 in no-claim mode with DoP=2; so two tasks
>> will receive the same file F1.
>>
>> Let's say both tasks will re-use F1, so it needs to be re-uploaded.
>> Now, we have a choice:
>> 1. Re-upload from both tasks
>> For RocksDB, the state key is: backend UID + SST file name. Both are
>> the same for two tasks, so the key will be the same.
>> Currently, SharedStateRegistry will reject both as duplicates.
>>
>> We can't just replace (to not lose one of the files), so we have to
>> use random keys.
>> However, when we further downscale:
>> - we'll have a conflict on recovery (multiple SST files with the same name)
>> - we'll re-upload the same file multiple times unnecessarily
>> So we have to de-duplicate state on recovery - ideally before sending
>> state snapshots to tasks.
>>
>> 2. Re-upload from one task (proposed in FLIP as optimization)
>> Both tasks must learn the new key. Otherwise, the snapshot of the
>> not-reuploading task will refer to a non-existing entry.
>> We can either re-use the old key (and allow replacement in
>> SharedStateRegistry); or generate the key on JM before sending task
>> state snapshots.
>>
>>
>> P.S.:
>>
>> 2. Instead of forcing re-upload, can we "inverse control" in no-claim mode?
>> This is effectively what we have right now, but with an extra (Async?)
>>
>> Right now, there is absolutely no way to find out when the shared
>> state can be deleted; it can't be inferred from which checkpoints are
>> subsumed, and which are not, as future checkpoints might still be
>> using that state.
>>
>> Regards,
>> Roman
>>
>>
>>
>> On Tue, Nov 23, 2021 at 1:37 PM Piotr Nowojski <pn...@apache.org> wrote:
>>
>> Hi,
>>
>> I'm not entirely sure if I fully understand the raised concerns here. So
>> let me maybe step back in the discussion a bit and address the original
>> points from Roman.
>>
>> 2. Instead of forcing re-upload, can we "inverse control" in no-claim mode?
>>
>> I second the concerns from Dawid. This is effectively what we have right
>> now, but with an extra (Async?) API call. It's not conceptually simple,
>> it's hard to explain to the users, and it might actually take forever to
>> release the artefacts. Furthermore, I don't think the implementation would
>> be trivial.
>>
>> On the other hand, the current proposal of having (a) `--claim` and (b)
>> `--no-claim` modes is conceptually very simple, with (a) being perfectly
>> efficient, without any overheads. If you have concerns that (b) will cause
>> some overheads, slower first checkpoint etc, keep in mind that the user can
>> always pick option (a). Starting a new job from an existing
>> savepoint/externalised checkpoint in general shouldn't be time critical, so
>> users can always even manually copy the files and still use option (a), or
>> just be fine accepting the price of a slower first checkpoint. For other
>> use cases - restarting the same job after a downtime - (b) sounds to me to
>> be an acceptable option.
>>
>> I would also like to point out that the "force full snapshot"/"do not use
>> previous artefacts" option we will need either way for the incremental
>> intermediate savepoints (subject of a next FLIP). From this perspective, we
>> are getting the "--no-claim" option basically for free.
>>
>> 3. Alternatively, re-upload not necessarily on 1st checkpoint, but after a configured number of checkpoints?
>>
>> I don't see a reason why we couldn't provide an option like that at some
>> point in the future. However, as it's more complicated to reason about, more
>> complicated to implement, and I'm not entirely sure how much it is actually needed
>> given the (a) `--claim` mode, I think we can wait for feedback from the
>> users before actually implementing it.
>>
>> 6. Enforcing re-upload by a single task and skew
>> If we use some greedy logic like subtask 0 always re-uploads then it
>> might be overloaded.
>> So we'll have to obtain a full list of subtasks first (then probably
>> choose randomly or round-robin).
>> However, that requires rebuilding Task snapshot, which is doable but
>> not trivial (which I think supports "reverse API option").
>>
>> What do you mean by "rebuilding Task snapshot"?
>>
>> During some early discussions about this point, I've hoped that a state
>> backend like changelog could embed into the state handle information which
>> operator should actually be responsible for duplicating such shared states.
>> However now that I'm thinking about it, indeed there might be an issue if
>> we combine the fact that state handles can be shared across multiple
>> different operators and with a job modification, like dropping an operator.
>> In that case it looks like we would need some extra logic during recovery,
>> that would have an overview of the whole job to make a decision which
>> particular parallel instance of an operator should be responsible for
>> duplicating the underlying file?
>>
>> Best,
>> Piotrek
>>
>> On Tue, Nov 23, 2021 at 12:28 PM Dawid Wysakowicz <dw...@apache.org>
>> wrote:
>>
>> Some users restore different jobs from the same externalized
>> checkpoint. I think this usage would break after introducing this FLIP, and
>> we must tell users explicitly if we choose to make Flink manage the
>> checkpoints by default.
>>
>> Could you elaborate on what you mean? The proposal is to use the
>> "no-claim" mode by default, which should let users start as many jobs as
>> they wish from the same externalized checkpoints, and it should not cause
>> them any harm. Each job will effectively create its own
>> private "copy" of the initial checkpoint along with the 1st taken
>> checkpoint.
>>
>> If the 1st full checkpoint did not complete in the end, the next
>> checkpoints have to try to reupload all artifacts again. I think this
>> problem could be mitigated if task knows some files have been uploaded
>> before.
>>
>> I don't know how we could achieve that easily. Besides, we have the same
>> situation for all checkpoints, even regular ones, don't we? Do we check if e.g.
>> diff files have been successfully uploaded in a previous aborted checkpoint?
>> I am not saying it's a wrong suggestion, just that I feel it is orthogonal
>> and I can't see a straightforward way to implement it.
>>
>> Best,
>>
>> Dawid
>> On 23/11/2021 07:52, Yun Tang wrote:
>>
>> Hi,
>>
>> Regarding the likelihood of RocksDB never deleting some SST files: unfortunately, it could happen, as the current level compaction strategy in RocksDB is triggered when the size of the upper input level reaches the threshold, and the compaction priority cannot guarantee that all files will be chosen over several rounds of compaction.
>>
>> Actually, I am a bit in favor of this FLIP to manage checkpoints within Flink, as we have heard from many users that they cannot delete older checkpoints after several rounds of re-launching Flink jobs. Currently, Flink does not delete older checkpoints automatically when restoring from an older retained checkpoint, which makes the base checkpoint directory grow larger and larger. However, if they decide to delete the older checkpoint directories of other job IDs, they might not be able to recover from the last completed checkpoint, as it might depend on some artifacts in an older checkpoint directory.
>>
>> And I think reuploading would indeed increase the 1st checkpoint duration after restoring. For Aliyun OSS, the developer said that copying files (larger than 32MB) from one location to another within the same bucket on DFS could take hundreds of milliseconds. However, from my experience, copying on HDFS might not be so quick. Maybe some numbers here would be helpful.
>>
>> I have two questions here:
>> 1. If the 1st full checkpoint did not complete in the end, the next checkpoints have to try to reupload all artifacts again. I think this problem could be mitigated if the task knows some files have been uploaded before.
>> 2. Some users restore different jobs from the same externalized checkpoint. I think this usage would break after introducing this FLIP, and we must tell users explicitly if we choose to make Flink manage the checkpoints by default.
>>
>> Best
>> Yun Tang
>>
>>
>> On 2021/11/22 19:49:11 Dawid Wysakowicz wrote:
>>
>>      There is one more fundamental issue with either of your two
>>      proposals that has just come to my mind.
>>      What happens if you have externalized checkpoints and the job fails
>>      before the initial checkpoint can be safely removed?
>>
>>      You start the job from the latest created checkpoint and wait for it
>>      to be allowed for deletion. Then you can delete it, and all previous
>>      checkpoints (or am I missing something?)
>>
>>
>> Let me clarify it with an example. You start with chk-42, Flink takes
>> e.g. three checkpoints chk-43, chk-44, chk-45, which all still reference chk-42
>> files. After that it fails. We have externalized checkpoints enabled,
>> therefore we have retained all checkpoints. The user starts a new program
>> from, let's say, chk-45. At this point your proposal does not give the
>> user any help regarding when chk-42 can be safely removed. (This is
>> also how Flink works right now.)
>>
>> To make it even harder, you can arbitrarily complicate it: 1) start a job
>> from chk-44, 2) start a job from a chk-47 which depends on chk-45, 3)
>> never start a job from chk-44; it is not claimed by any job, thus it is
>> never deleted, and users must remember themselves that chk-44 originated
>> from chk-42, etc. Users would be forced to build a lineage system for
>> checkpoints to track which checkpoints depend on each other.
>>
>>      I mean that the design described by FLIP implies the following (PCIIW):
>>      1. treat SST files from the initial checkpoint specially: re-upload or
>>      send placeholder - depending on those attributes in state handle
>>      2. (SST files from newer checkpoints are re-uploaded depending on
>>      confirmation currently; so yes there is tracking, but it's different)
>>      3. SharedStateRegistry must allow replacing state under the existing
>>      key; otherwise, if a new key is used then other parallel subtasks
>>      should learn somehow this key and use it; However, allowing
>>      replacement must be limited to this scenario, otherwise it can lead to
>>      previous checkpoint corruption in normal cases
>>
>> I might not understand your points, but I don't think FLIP implies any
>> of this. The FLIP suggests to send along with the CheckpointBarrier a
>> flag "force full checkpoint". Then the state backend should respect it
>> and should not use any of the previous shared handles. Now let me
>> explain how that would work for RocksDB incremental checkpoints.
>>
>>   1. Simplest approach: upload all local RocksDB files. This works
>>      exactly the same as the first incremental checkpoint for a fresh start.
>>   2. Improvement on 1): we already know which files were uploaded for
>>      the initial checkpoint. Therefore, instead of uploading the local
>>      files that are the same as files uploaded for the initial checkpoint,
>>      we call duplicate for those files and upload just the diff.
>>
>> It does not require any changes to the SharedStateRegistry nor to state
>> handles, at least for RocksDB.
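
The second approach above (duplicate what the initial checkpoint already holds, upload only the diff) can be sketched as a simple partition of the local files. This is an illustration under assumptions: `ForcedFullCheckpointPlan` and its fields are hypothetical names, files are identified by plain strings, and the actual duplicate/upload calls are left out.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical planner (not Flink code) for a forced full RocksDB checkpoint. */
class ForcedFullCheckpointPlan {
    // Files already uploaded for the initial checkpoint: copy them on the storage side.
    final Set<String> toDuplicate = new TreeSet<>();
    // Files created since the restore: upload them from the task.
    final Set<String> toUpload = new TreeSet<>();

    static ForcedFullCheckpointPlan plan(
            Set<String> localFiles, Set<String> filesOfInitialCheckpoint) {
        ForcedFullCheckpointPlan plan = new ForcedFullCheckpointPlan();
        for (String file : localFiles) {
            if (filesOfInitialCheckpoint.contains(file)) {
                plan.toDuplicate.add(file);
            } else {
                plan.toUpload.add(file);
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        Set<String> local = Set.of("001.sst", "002.sst", "003.sst");
        Set<String> initial = Set.of("001.sst", "002.sst");
        ForcedFullCheckpointPlan plan = plan(local, initial);
        System.out.println("duplicate: " + plan.toDuplicate); // duplicate: [001.sst, 002.sst]
        System.out.println("upload: " + plan.toUpload);       // upload: [003.sst]
    }
}
```

With an empty set of initial files, every local file lands in `toUpload`, which corresponds to the simplest approach of uploading everything.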
>>
>> Best,
>>
>> Dawid
>>
>>
>> On 22/11/2021 19:33, Roman Khachatryan wrote:
>>
>> If you assume the 1st checkpoint needs to be "full" you know you are not allowed to use any shared files.
>> It's true you should know about the shared files of the previous checkpoint, but e.g. RocksDB already tracks that.
>>
>> I mean that the design described by FLIP implies the following (PCIIW):
>> 1. treat SST files from the initial checkpoint specially: re-upload or
>> send placeholder - depending on those attributes in state handle
>> 2. (SST files from newer checkpoints are re-uploaded depending on
>> confirmation currently; so yes there is tracking, but it's different)
>> 3. SharedStateRegistry must allow replacing state under the existing
>> key; otherwise, if a new key is used then other parallel subtasks
>> should learn somehow this key and use it; However, allowing
>> replacement must be limited to this scenario, otherwise it can lead to
>> previous checkpoint corruption in normal cases
>>
>> Forcing a full checkpoint after completing N checkpoints instead of
>> immediately would only require enabling (1) after N checkpoints.
>> And with the "poll API until checkpoint released" approach, those
>> changes aren't necessary.
>>
>>
>> There is one more fundamental issue with either of your two proposals that has just come to my mind.
>> What happens if you have externalized checkpoints and the job fails before the initial checkpoint can be safely removed?
>>
>> You start the job from the latest created checkpoint and wait for it
>> to be allowed for deletion. Then you can delete it, and all previous
>> checkpoints (or am I missing something?)
>>
>>
>> With tracking the shared files on JM you can not say if you can clear the files after a couple of checkpoints or 10s, 100s or 1000s,
>> which translates into minutes/hours/days/weeks of processing.
>>
>> This doesn't necessarily translate into higher cost (because of saved
>> RPC etc., as I mentioned above).
>> However, I do agree that an infinite or arbitrarily high delay is unacceptable.
>>
>> The added complexity above doesn't seem negligible to me (especially
>> in SharedStateHandle); and should therefore be weighed against those
>> operational disadvantages (given that the number of checkpoints to
>> wait is bounded in practice).
>>
>> Regards,
>> Roman
>>
>>
>>
>>
>> On Mon, Nov 22, 2021 at 5:05 PM Dawid Wysakowicz <dw...@apache.org> wrote:
>>
>> There is one more fundamental issue with either of your two proposals
>> that has just come to my mind. What happens if you have externalized
>> checkpoints and the job fails before the initial checkpoint can be
>> safely removed? You have a situation where you have a retained
>> checkpoint that was built on top of the original one. Basically ending
>> in a situation we have right now that you never know when it is safe to
>> delete a retained checkpoint.
>>
>> BTW, the intention for the "claim" mode was to support cases when users
>> are concerned with the performance of the first checkpoint. In those
>> cases they can claim the checkpoint and not pay the additional cost of
>> the first checkpoint.
>>
>> Best,
>>
>> Dawid
>>
>> On 22/11/2021 14:09, Roman Khachatryan wrote:
>>
>> Thanks Dawid,
>>
>> Regarding clarity,
>> I think that all proposals require waiting for some event: re-upload /
>> checkpoint completion / api response.
>> But with the current one, there is an assumption: "initial checkpoint
>> can be deleted once a new one completes" (instead of just "initial
>> checkpoint can be deleted once the API says it can be deleted").
>> So I think it's actually more clear to offer this explicit API and rely on it.
>>
>> Regarding delaying the deletion,
>> I agree that it can delay deletion, but how important is it?
>> Checkpoints are usually stored on relatively cheap storage like S3, so
>> some delay shouldn't be an issue (especially taking rounding into
>> account); it can even be cheaper or comparable to paying for
>> re-upload/duplicate calls.
>>
>> Infinite delay can be an issue though, I agree.
>> Maybe @Yun can clarify the likelihood of never deleting some SST files
>> by RocksDB?
>> For the changelog backend, old files won't be used once
>> materialization succeeds.
>>
>> Yes, my concern is checkpointing time, but also added complexity:
>>
>> It would be a bit invasive though, as we would have to somehow keep track of which files should not be reused on TMs.
>>
>> I think we need this anyway if we choose to re-upload files once the
>> job is running.
>> The new checkpoint must be formed by re-uploaded old artifacts AND
>> uploaded new artifacts.
>>
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Nov 22, 2021 at 12:42 PM Dawid Wysakowicz <dw...@apache.org> wrote:
>>
>> @Yun
>>
>> I think it is a good comment, with which I agree in principle. However, we already use --fromSavepoint (cli), savepointPath (REST API), SavepointRestoreSettings for restoring from both a savepoint and an externalized checkpoint. I wanted to voice that concern. Nevertheless, I am fine with changing it to execution.restore-mode. If there are no other comments on that matter, I will change it.
>>
>> @Roman:
>>
>> Re 1. Correct, stop-with-savepoint should commit side-effects. Will add that to the doc.
>>
>> Re 2. What I don't like about this counter-proposal is that it still has no clearly defined point in time when it is safe to delete the original checkpoint. Users would have a hard time reasoning about it and debugging. Even worse, I think that in the worst case it might never happen that all the original files are no longer in use (I am not too familiar with RocksDB compaction, but what happens if there are key ranges that are never accessed again?). I agree it is unlikely, but it is possible, isn't it? It can definitely take a significant time and many checkpoints to get there.
>>
>> Re 3. I believe where you are coming from is that you'd like to keep the checkpointing time minimal, and re-uploading files may increase it. The proposal so far builds on the assumption that we could in most cases use a cheap duplicate API instead of re-upload. I could see this as a follow-up if it becomes a bottleneck. It would be a bit invasive though, as we would have to somehow keep track of which files should not be reused on TMs.
>>
>> Re 2 & 3. Neither of the counter-proposals works well for taking incremental savepoints. We were thinking of building incremental savepoints on the same concept. I think delaying the completion of an independent savepoint to some undefined point in the future is not a nice property of savepoints.
>>
>> Re 4. Good point. We should make sure the first completed checkpoint has the independent/full checkpoint property rather than just the first triggered.
>>
>> Re 5 & 6. I need a bit more time to look into it.
>>
>> Best,
>>
>> Dawid
>>
>> On 22/11/2021 11:40, Roman Khachatryan wrote:
>>
>> Hi,
>>
>> Thanks for the proposal Dawid, I have some questions and remarks:
>>
>> 1. How will stop-with-savepoint be handled?
>> Shouldn't side effects be enforced in this case? (i.e. send
>> notifyCheckpointComplete)
>>
>> 2. Instead of forcing re-upload, can we "invert control" in no-claim mode?
>> Anyway, any external tool will have to poll the Flink API waiting for the
>> next (full) checkpoint before deleting the retained checkpoint,
>> right?
>> Instead, we can provide an API which tells whether the 1st checkpoint
>> is still in use (and not force re-uploading it).
>>
>> Under the hood, it can work like this:
>> - for the checkpoint Flink recovers from, remember all shared state
>> handles it is adding
>> - when unregistering shared state handles, remove them from the set above
>> - when the set becomes empty the 1st checkpoint can be deleted externally
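
The three steps above can be sketched roughly as follows. This is only an illustration in Python, not Flink code: the class name `InitialCheckpointTracker` and its methods are hypothetical, and shared state handles are represented as plain strings.

```python
class InitialCheckpointTracker:
    """Hypothetical sketch: tracks shared state handles of the restored checkpoint."""

    def __init__(self, restored_handles):
        # Step 1: remember all shared state handles added by the
        # checkpoint the job recovers from.
        self._in_use = set(restored_handles)

    def unregister(self, handle):
        # Step 2: when a shared state handle is unregistered,
        # remove it from the remembered set (no-op if unknown).
        self._in_use.discard(handle)

    def can_delete_initial_checkpoint(self):
        # Step 3: the 1st checkpoint can be deleted externally
        # once the set is empty.
        return not self._in_use
```

An external tool would poll something like `can_delete_initial_checkpoint()` instead of waiting for the next full checkpoint. Note that, as discussed in the thread, the set may stay non-empty for a long time if some files are never dropped by compaction.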
>>
>> Besides not requiring re-upload, it seems much simpler and less invasive.
>> On the downside, state deletion can be delayed; but I think this is a
>> reasonable trade-off.
>>
>> 3. Alternatively, re-upload not necessarily on the 1st checkpoint, but
>> after a configured number of checkpoints?
>> There is a high chance that after some more checkpoints, the initial state
>> will no longer be used (because of compaction),
>> so backends won't have to re-upload anything (or only a small part).
>>
>> 4. Re-uploaded artifacts must not be deleted on checkpoint abortion.
>> This should be addressed in https://issues.apache.org/jira/browse/FLINK-24611.
>> If not, I think the FLIP should consider this case.
>>
>> 5. Enforcing re-upload by a single task and Changelog state backend
>> With Changelog state backend, a file can be shared by multiple operators.
>> Therefore, getIntersection() is irrelevant here, because operators
>> might not be sharing any key groups.
>> (so we'll have to analyze "raw" file usage I think).
>>
>> 6. Enforcing re-upload by a single task and skew
>> If we use some greedy logic like "subtask 0 always re-uploads", then it
>> might be overloaded.
>> So we'll have to obtain a full list of subtasks first (then probably
>> choose randomly or round-robin).
>> However, that requires rebuilding the Task snapshot, which is doable but
>> not trivial (which I think supports the "reverse API" option).
>>
>> 7. I think it would be helpful to list file systems / object stores
>> that support "fast" copy (ideally with latency numbers).
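
To illustrate the skew concern in point 6: a minimal Python sketch of picking the re-uploading subtask round-robin once the full list of subtasks sharing each file is known. The function name `assign_reuploads` and the input shape (file name mapped to the subtask indices referencing it) are assumptions made for this example, not part of the FLIP or of Flink.

```python
def assign_reuploads(file_to_subtasks):
    """Pick one re-uploading subtask per shared file, rotating over candidates.

    file_to_subtasks: dict mapping a file name to a non-empty list of
    subtask indices that reference the file (hypothetical input shape).
    """
    assignment = {}
    # Iterate in a deterministic order so every party computes the same result.
    for index, (file_name, subtasks) in enumerate(sorted(file_to_subtasks.items())):
        candidates = sorted(subtasks)
        # Rotate through the candidates instead of always taking the
        # lowest subtask index, which would overload subtask 0.
        assignment[file_name] = candidates[index % len(candidates)]
    return assignment
```

With three files shared by subtasks 0 and 1, the greedy rule would send all three re-uploads to subtask 0, while this rotation alternates between the two subtasks.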
>>
>> Regards,
>> Roman
>>
>> On Mon, Nov 22, 2021 at 9:24 AM Yun Gao <yu...@aliyun.com.invalid> wrote:
>>
>> Hi,
>>
>> Many thanks, Dawid, for proposing the FLIP to clarify the ownership of the
>> states. +1 for the overall changes, since they make the behavior clear and
>> provide users a well-defined way to finally clean up savepoints / retained checkpoints.
>>
>> Regarding the changes to the public interface, it seems the changes are currently all bound
>> to savepoints, but from the FLIP it seems we might also need to support the claim declaration
>> for retained checkpoints, as on the CLI side [1]. If so, might it be better to change the option name
>> from `execution.savepoint.restore-mode` to something like `execution.restore-mode`?
>>
>> Best,
>> Yun
>>
>>
>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint
>>
>>
>> ------------------------------------------------------------------
>> From:Konstantin Knauf <kn...@apache.org>
>> Send Time:2021 Nov. 19 (Fri.) 16:00
>> To:dev <de...@flink.apache.org>
>> Subject:Re: [DISCUSS] FLIP-193: Snapshots ownership
>>
>> Hi Dawid,
>>
>> Thanks for working on this FLIP. Clarifying the differences and
>> guarantees around savepoints and checkpoints will make it easier and safer
>> for users and downstream projects and platforms to work with them.
>>
>> +1 to changing the current (undefined) behavior when recovering from
>> retained checkpoints. Users can now choose between claiming and not
>> claiming, which I think will make the current mixed behavior obsolete.
>>
>> Cheers,
>>
>> Konstantin
>>
>> On Fri, Nov 19, 2021 at 8:19 AM Dawid Wysakowicz <dw...@apache.org>
>> wrote:
>>
>> Hi devs,
>>
>> I'd like to bring up for discussion a proposal to clean up ownership
>> of snapshots, both checkpoints and savepoints.
>>
>> The goal here is to make it clear who is responsible for deleting
>> checkpoint/savepoint files and when that can be done in a safe manner.
>>
>> Looking forward to your feedback!
>>
>> Best,
>>
>> Dawid
>>
>> [1] https://cwiki.apache.org/confluence/x/bIyqCw
>>
>>
>>
>> --
>>
>> Konstantin Knauf
>> https://twitter.com/snntrable
>> https://github.com/knaufk
>>
>>
>>