Posted to dev@flink.apache.org by Saurabh Kaul <sa...@gmail.com> on 2022/10/20 05:53:08 UTC

[DISCUSS] REST API to suspend & resume checkpointing

Hey everyone,

I will create a FLIP, but wanted to gauge community opinion first. The
motivation is that production Flink applications frequently need to go
through node/image patching to update the software and AMI with latest
security fixes. These patching related restarts do not involve application
jar or parallelism updates and can therefore be done without costly
savepoint completion and restore cycles by relying on the last checkpoint
state in order to achieve minimum downtime. In order to achieve this, we
currently rely on retained checkpoints and the following steps:

   - Create new stand-by Flink cluster and submit application jar
   - Delete Flink TM deployment to stop processing & checkpoints on old
   cluster(reduce duplicates)
   - Query last completed checkpoint from REST API on JM of old cluster
   - Submit new job using last available checkpoint in new cluster, delete
   old cluster
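
The "query last completed checkpoint" step above can be sketched roughly as
follows. This is only an illustration: it assumes the JM's
/jobs/<jobid>/checkpoints REST response carries a "latest.completed" object
with "id" and "external_path" fields, the helper name is hypothetical, and
the sample payload is made up rather than taken from a real cluster.

```python
import json
from typing import Optional, Tuple


def latest_completed_checkpoint(stats: dict) -> Optional[Tuple[int, str]]:
    """Return (checkpoint id, external path) of the newest completed
    checkpoint in a checkpoint-statistics payload, or None if there is none."""
    latest = stats.get("latest") or {}
    completed = latest.get("completed")
    if not completed:
        return None
    return completed["id"], completed["external_path"]


# Made-up sample shaped like the assumed response body. Note the one
# in-progress checkpoint: it may still complete after this query.
sample = json.loads("""
{
  "counts": {"completed": 42, "in_progress": 1},
  "latest": {
    "completed": {"id": 42, "external_path": "s3://bucket/checkpoints/job/chk-42"}
  }
}
""")

print(latest_completed_checkpoint(sample))
```

The in-progress count in the sample is the interesting part: a checkpoint
that is still running at query time can finish afterwards, so the id
returned here is not necessarily the last one the old cluster produces.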

We have observed that this process sometimes fails to select the latest
checkpoint, because partially completed checkpoints race and finish after
the JM has been queried. An alternative is to build another source of
checkpoint info, but this has complications, as discussed in [2]. Waiting
increases downtime, and force deleting task managers doesn't guarantee
TM process termination. To keep downtime and duplicates low and to solve
this race, we can introduce an API to suspend checkpointing. Querying the
latest available checkpoint after suspending checkpointing will guarantee
that we can maintain exactly-once in such a scenario.

This also acts as an extension to [1] where the feature to trigger
checkpoints through a control plane has been discussed and added. It makes
the checkpointing process flexible and gives the user more control in
scenarios like migrating applications and letting data processing catch up
temporarily.
We can implement this similarly to [1] and expose a trigger to suspend and
resume checkpointing via CLI and REST API. We can add a parameter to
suspend in 2 ways.

   1. Suspend the scheduled checkpoint trigger: doesn't cancel any
   in-progress checkpoints/savepoints, only stops future ones.
   2. Suspend the checkpoint coordinator: cancels in-progress
   checkpoints/savepoints. Guarantees no racing checkpoint completion and
   could be used to cancel stuck checkpoints and help data processing
   catch up.
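
To make the difference between the two suspend options concrete, here is a
toy model. It is not Flink code: ToyCoordinator, SuspendMode and demo are
hypothetical names invented for this sketch, and the real checkpoint
coordinator is far more involved.

```python
from enum import Enum


class SuspendMode(Enum):
    TRIGGER_ONLY = 1  # option 1: stop scheduling, let in-flight checkpoints finish
    COORDINATOR = 2   # option 2: additionally cancel in-flight checkpoints


class ToyCoordinator:
    """Tiny stand-in for a checkpoint coordinator (illustration only)."""

    def __init__(self):
        self.suspended = False
        self.next_id = 1
        self.in_flight = set()
        self.completed = []

    def trigger(self):
        # No new checkpoints are started while suspended.
        if self.suspended:
            return None
        cid = self.next_id
        self.next_id += 1
        self.in_flight.add(cid)
        return cid

    def complete(self, cid):
        # A late acknowledgement for a cancelled checkpoint is ignored.
        if cid not in self.in_flight:
            return False
        self.in_flight.remove(cid)
        self.completed.append(cid)
        return True

    def suspend(self, mode):
        self.suspended = True
        if mode is SuspendMode.COORDINATOR:
            self.in_flight.clear()

    def resume(self):
        self.suspended = False

    def latest_completed(self):
        return max(self.completed) if self.completed else None


def demo(mode):
    c = ToyCoordinator()
    c.complete(c.trigger())          # checkpoint 1 completes
    c.trigger()                      # checkpoint 2 is still in flight
    c.suspend(mode)
    queried = c.latest_completed()   # what the migration script would see
    c.complete(2)                    # checkpoint 2 tries to finish late
    return queried, c.latest_completed()


print(demo(SuspendMode.TRIGGER_ONLY))
print(demo(SuspendMode.COORDINATOR))
```

In this model, option 1 still lets checkpoint 2 complete after the query
(the script sees 1, the cluster ends at 2), so a caller would also have to
wait for in-flight checkpoints to drain. Option 2 leaves the queried id and
the final id identical.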

[1] https://issues.apache.org/jira/browse/FLINK-27101
[2] https://issues.apache.org/jira/browse/FLINK-26916

Re: [DISCUSS] REST API to suspend & resume checkpointing

Posted by Jing Ge <ji...@ververica.com>.
Hi,

We should consider very carefully whether we should build something like
stop-with-checkpoint at all. Semantically and conceptually, checkpoints
should be more and more internally managed by Flink [1], and users should
use them very sparingly from the development perspective. Savepoints are
the right tool for users to do things like job migration or upgrade.

Best regards,
Jing

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/checkpoints_vs_savepoints/

On Mon, Oct 24, 2022 at 11:06 AM Piotr Nowojski <pn...@apache.org>
wrote:

> Hi Saurabh and Yun Tang,
>
> I tend to agree with Yun Tang. Exposure of stop-with-checkpoint would
> complicate the system for most of the users a bit too much, with very
> little gain.
>
> > 1. In producing a full snapshot, I see this is noted as follows in the
> > Flip
>
> If you want to recover in CLAIM or NO_CLAIM mode, that's an orthogonal
> question to whether you ended up with a savepoint or checkpoint at the end
> of your last job. So that's not an issue here. As Yun Tang wrote, the only
> difference is that stop-with-savepoint (vs a hypothetical
> stop-with-checkpoint), has to copy out files from previous incremental
> checkpoints that are still being referenced. I don't expect this to be a
> big issue for the majority of use cases (only if you have a really huge
> state, and the async phase of the checkpoint/savepoint is taking a
> very long time, and only assuming that you don't have another issue
> that's causing slow file upload). And if that's an issue, you still have
> the other workaround for your initial problem: manually triggering all of
> the checkpoints as you wish via the REST API.
>
> Best,
> Piotrek

Re: [DISCUSS] REST API to suspend & resume checkpointing

Posted by Piotr Nowojski <pn...@apache.org>.
Hi Saurabh and Yun Tang,

I tend to agree with Yun Tang. Exposure of stop-with-checkpoint would
complicate the system for most of the users a bit too much, with very
little gain.

> 1. In producing a full snapshot, I see this is noted as follows in the
> Flip

If you want to recover in CLAIM or NO_CLAIM mode, that's an orthogonal
question to whether you ended up with a savepoint or checkpoint at the end
of your last job. So that's not an issue here. As Yun Tang wrote, the only
difference is that stop-with-savepoint (vs a hypothetical
stop-with-checkpoint), has to copy out files from previous incremental
checkpoints that are still being referenced. I don't expect this to be a
big issue for the majority of use cases (only if you have a really huge
state, and the async phase of the checkpoint/savepoint is taking a
very long time, and only assuming that you don't have another issue
that's causing slow file upload). And if that's an issue, you still have
the other workaround for your initial problem: manually triggering all of
the checkpoints as you wish via the REST API.

Best,
Piotrek
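
For reference, the "manually trigger checkpoints via the REST API"
workaround could look roughly like the sketch below. It assumes the
endpoint added under FLINK-27101 is a POST to /jobs/<jobid>/checkpoints
that accepts an empty JSON body; the host, port, job id and helper name
are placeholders, and the request is only built here, not sent.

```python
import json
import urllib.request


def build_trigger_request(base_url: str, job_id: str) -> urllib.request.Request:
    """Build (but do not send) a POST that asks the JM to trigger a checkpoint.

    Assumption: the trigger endpoint is POST /jobs/<jobid>/checkpoints and an
    empty JSON object is an acceptable body. Check the REST docs of your
    Flink version before relying on this.
    """
    url = f"{base_url.rstrip('/')}/jobs/{job_id}/checkpoints"
    body = json.dumps({}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


# Placeholder JM address and job id.
req = build_trigger_request(
    "http://localhost:8081/", "0123456789abcdef0123456789abcdef"
)
print(req.get_method(), req.full_url)
# Sending would be urllib.request.urlopen(req); omitted so the sketch runs
# without a cluster.
```

With periodic checkpointing effectively disabled and every checkpoint
triggered this way, the control plane decides when the last checkpoint
happens, which sidesteps the race described at the start of the thread.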





On Mon, 24 Oct 2022 at 10:13, Yun Tang <my...@live.com> wrote:

> Hi Saurabh,
>
> From the scope of implementation, I think stopping with native savepoint
> is very close to stopping with checkpoint. The only different part is the
> fast duplication over distributed file systems, which could be mitigated
> via distributed file system shallow copy. Thus, I don't think we should
> introduce another concept called stopping with checkpoint to make the total
> system more complex.
>
>
> Best
> Yun Tang

Re: [DISCUSS] REST API to suspend & resume checkpointing

Posted by Yun Tang <my...@live.com>.
Hi Saurabh,

From the scope of implementation, I think stopping with native savepoint is very close to stopping with checkpoint. The only different part is the fast duplication over distributed file systems, which could be mitigated via distributed file system shallow copy. Thus, I don't think we should introduce another concept called stopping with checkpoint to make the total system more complex.


Best
Yun Tang
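
For readers following along, the stop-with-native-savepoint route discussed
in this thread could be scripted roughly as below. The flag spellings
(--type native, --savepointPath, -s, -restoreMode) follow my reading of the
Flink 1.15 CLI docs and may differ in other versions; the job id, paths and
jar name are placeholders, and the commands are only assembled, not run.

```python
import shlex


def stop_with_native_savepoint_cmd(job_id: str, target_dir: str) -> list:
    """Assemble a 'flink stop' command that takes a native-format savepoint.

    Assumption: the CLI accepts '--type native' and '--savepointPath'.
    """
    return ["flink", "stop", "--type", "native",
            "--savepointPath", target_dir, job_id]


def restore_cmd(savepoint_path: str, jar: str, restore_mode: str = "claim") -> list:
    """Assemble a 'flink run' command restoring from a savepoint.

    Assumption: the restore mode is selected with '-restoreMode'.
    """
    return ["flink", "run", "-s", savepoint_path,
            "-restoreMode", restore_mode, jar]


# Placeholder values for illustration only.
stop = stop_with_native_savepoint_cmd(
    "0123456789abcdef0123456789abcdef", "s3://bucket/savepoints"
)
run = restore_cmd("s3://bucket/savepoints/savepoint-012345-abcdef", "app.jar")

print(shlex.join(stop))
print(shlex.join(run))
```

Passing restore_mode="no_claim" instead would match the NO_CLAIM preference
raised elsewhere in the thread, at the cost of the first checkpoint after
restore having to be a full one.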
________________________________
From: Saurabh Kaul <sa...@gmail.com>
Sent: Saturday, October 22, 2022 2:45
To: dev@flink.apache.org <de...@flink.apache.org>
Subject: Re: [DISCUSS] REST API to suspend & resume checkpointing

Hi Piotr,

Sorry for the confusion. I'm referring to latency concerns at 2 points -

1. In producing a full snapshot, I see this is noted as follows in the FLIP
[1]:
> This means that if the underlying FileSystem doesn’t support fast
> duplication, incremental savepoints will most likely be still slower
> compared to incremental checkpoints.
In our use case for example, we use S3 file system so we have some
additional overhead of duplication.
2. I mixed up the savepoint recovery modes. I think the default NO_CLAIM
mode would generally be more appropriate for production use, since CLAIM
mode carries the risk of accidental savepoint deletion when future
checkpoints can build on top of it. Manual checkpoint deletion is not a
concern. NO_CLAIM would require the first successful checkpoint to be a
full checkpoint.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-203

On Fri, Oct 21, 2022 at 1:31 AM Piotr Nowojski <pn...@apache.org> wrote:

> Hi Saurabh,
>
> > 1. Stop with native savepoint does solve any races and produces a
> > predictable restoration point, but producing a self-contained snapshot
> > and using CLAIM mode in re-running is not necessary here and adds
> > latency.
>
> Why do you think it adds latency? CLAIM mode is actually the fastest one
> with zero additional overheads. Native savepoints are almost an equivalent
> of checkpoints.
>
> Best,
> Piotrek
>
>
> On Fri, 21 Oct 2022 at 00:51, Saurabh Kaul <sa...@gmail.com> wrote:
>
> > Hi, thanks for the quick responses,
> >
> > I think a stop-with-checkpoint idea is overlapping well with the
> > requirements.
> >
> > 1. Stop with native savepoint does solve any races and produces a
> > predictable restoration point, but producing a self-contained snapshot
> > and using CLAIM mode in re-running is not necessary here and adds
> > latency. Stop-with-checkpoint doesn't have these issues. It adds some
> > downtime in waiting for a checkpoint to be completed but reduces replay
> > time in the new cluster which is a good trade-off. Since in this
> > scenario of job migration the job and/or job configuration is not
> > changing; it should ideally be as fast as a regular failover scenario
> > (like a TM going down).
> > 2. Taking complete ownership of triggering checkpoints and making them
> > more configurable could be feasible but are less effective comparatively
> > in terms of stopping the job for the primary purpose of low-downtime
> > migration of the job. Stop-with-checkpoint solves it more directly.
> >
> > Looking forward to hearing thoughts on this.
> >
> > On Thu, Oct 20, 2022 at 3:31 AM Piotr Nowojski <pn...@apache.org>
> > wrote:
> >
> > > Hi Saurabh,
> > >
> > > Thanks for reaching out with the proposal. I have some mixed feelings
> > > about this for a couple of reasons:
> > >
> > > 1. It sounds like the core problem that you are describing is the race
> > > condition between shutting down the cluster and completion of new
> > > checkpoints. My first thought would be as Jing's, why don't you use
> > > stop-with-savepoint? Especially the native savepoint? You can recover
> > > from it using --claim mode, so the whole process should be quite fast
> > > actually.
> > > 2. The same issue, not knowing the latest completed checkpoint id,
> > > plagued us with some internal tests for quite a bit, so maybe this
> > > would also be worth considering to address instead? Like leaving in
> > > some text file the last completed checkpoint id? Or providing a way to
> > > read this from some existing metadata files? However in our tests we
> > > actually fixed/worked around that with manually triggering of
> > > checkpoints. The predecessor of FLINK-27101 [1], FLINK-24280 [2], was
> > > implemented to address this exact issue. Which brings me to...
> > > 3. You could actually just use the REST API to trigger all checkpoints
> > > manually. The idea behind FLINK-27101 [1] was to add full flexibility
> > > to the users, without adding much complexity to the system. If we
> > > start adding more REST calls to control checkpointing behaviour it
> > > would complicate the system.
> > > 4. If at all, I would think more towards a more generic idea of
> > > dynamically reconfiguring the system. We could provide a generic way
> > > to dynamically change configuration options. We wouldn't be able to
> > > support all configurations, and furthermore, each "dynamic" option
> > > would have to be handled/passed down to and through the system
> > > differently, BUT we wouldn't have to do all of that at once. We could
> > > start with a very limited set of dynamic options, for example just
> > > with the checkpointing interval. This must have been
> > > considered/discussed before, so I might be missing lots of things.
> > > 5. Another direction, if 1. is not an option for some reason, is to
> > > provide a stop-with-checkpoint feature?
> > >
> > > Best Piotrek
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-27101
> > > [2] https://issues.apache.org/jira/browse/FLINK-24280
> > >
> > > On Thu, 20 Oct 2022 at 11:53, Jing Ge <ji...@ververica.com> wrote:
> > >
> > > > Hi Saurabh,
> > > >
> > > > In general, it is always good to add new features. I am not really
> sure
> > > if
> > > > I understood your requirement. I guess it will be too long for you to
> > > > resume the job with a created savepoint in the new stand-by Flink
> > > cluster.
> > > > But if it would be acceptable to you, you should not have the issue
> you
> > > > mentioned with the checkpoint. Speaking of checkpoint, if the
> > checkpoint
> > > > interval were set properly, it should be fine even if in some rare
> > cases
> > > > the last checkpoint was partially completed and is not selected.
> > Another
> > > > option could be to trigger a manual checkpoint and then use that one
> to
> > > > resume the job to maintain the low downtime.
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Thu, Oct 20, 2022 at 7:53 AM Saurabh Kaul <sa...@gmail.com>
> > wrote:
> > > >
> > > > > Hey everyone,
> > > > >
> > > > > I will create a FLIP, but wanted to gauge community opinion first.
> > The
> > > > > motivation is that production Flink applications frequently need to
> > go
> > > > > through node/image patching to update the software and AMI with
> > latest
> > > > > security fixes. These patching related restarts do not involve
> > > > application
> > > > > jar or parallelism updates and can therefore be done without costly
> > > > > savepoint completion and restore cycles by relying on the last
> > > checkpoint
> > > > > state in order to achieve minimum downtime. In order to achieve
> this,
> > > we
> > > > > currently rely on retained checkpoints and the following steps:
> > > > >
> > > > >    - Create new stand-by Flink cluster and submit application jar
> > > > >    - Delete Flink TM deployment to stop processing & checkpoints on
> > old
> > > > >    cluster(reduce duplicates)
> > > > >    - Query last completed checkpoint from REST API on JM of old
> > cluster
> > > > >    - Submit new job using last available checkpoint in new cluster,
> > > > delete
> > > > >    old cluster
> > > > >
> > > > > We have observed that this process will sometimes not select the
> > latest
> > > > > checkpoint as partially completed checkpoints race and finish after
> > > > > querying the JM. Alternatives are to rely on creating other sources
> > for
> > > > > checkpoint info but this has complications, as discussed in [2].
> > > Waiting
> > > > > and force deleting task managers increases downtimes and doesn't
> > > > guarantee
> > > > > TM process termination respectively. In order to maintain low
> > downtime,
> > > > > duplicates and solve this race we can introduce an API to suspend
> > > > > checkpointing. Querying the latest available checkpoint after
> having
> > > > > suspending checkpointing will guarantee that we can maintain
> exactly
> > > once
> > > > > in such a scenario.
> > > > >
> > > > > This also acts as an extension to [1] where the feature to trigger
> > > > > checkpoints through a control plane has been discussed and added.
> It
> > > > makes
> > > > > the checkpointing process flexible and gives the user more control
> in
> > > > > scenarios like migrating applications and letting data processing
> > catch
> > > > up
> > > > > temporarily.
> > > > > We can implement this similar to [1] and expose a trigger to
> suspend
> > > and
> > > > > resume checkpointing via CLI and REST API. We can add a parameter
> to
> > > > > suspend in 2 ways.
> > > > >
> > > > >    1. Suspend scheduled checkpoint trigger, doesn’t cancel any
> still
> > in
> > > > >    progress checkpoints/savepoints but stops only future ones
> > > > >    2. Suspend checkpoint coordinator, cancels in progress
> > > > >    checkpoints/savepoints. Guarantees no racing checkpoint
> completion
> > > and
> > > > >    could be used for canceling stuck checkpoints and help data
> > > processing
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-27101
> > > > > [2] https://issues.apache.org/jira/browse/FLINK-26916
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] REST API to suspend & resume checkpointing

Posted by Saurabh Kaul <sa...@gmail.com>.
Hi Piotr,

Sorry for the confusion. I'm referring to latency concerns at two points:

1. Producing a full snapshot. This is noted as follows in the FLIP [1]:
> This means that if the underlying FileSystem doesn’t support fast
> duplication, incremental savepoints will most likely be still slower
> compared to incremental checkpoints.
In our use case, for example, we use the S3 file system, so we have some
additional overhead from duplication.
2. I mixed up the savepoint recovery modes. I think the default NO_CLAIM
mode would generally be more appropriate for production use, since CLAIM
mode carries the risk of accidental savepoint deletion when future
checkpoints can build on top of it. Manual checkpoint deletion is not a
concern. NO_CLAIM would require the first successful checkpoint to be a
full checkpoint.
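
To make the two restore modes concrete, here is a minimal sketch that
builds the configuration for resuming a job from a retained snapshot. The
option names `execution.savepoint.path` and
`execution.savepoint-restore-mode` are how I understand the configuration
keys in Flink releases from around the time of this thread; later versions
may name them differently, so please verify them against your version. The
helpers `restore_options` and `as_cli_args` and the example paths are
purely illustrative.

```python
from enum import Enum


class RestoreMode(Enum):
    # Flink takes ownership of the snapshot and may delete it once subsumed.
    CLAIM = "CLAIM"
    # Flink never modifies the snapshot; the first checkpoint after restore is full.
    NO_CLAIM = "NO_CLAIM"


def restore_options(snapshot_path: str,
                    mode: RestoreMode = RestoreMode.NO_CLAIM) -> dict:
    """Return config entries for restoring from `snapshot_path` (hypothetical helper)."""
    if not snapshot_path:
        raise ValueError("snapshot_path must not be empty")
    return {
        # Assumed option names; check the documentation of your Flink version.
        "execution.savepoint.path": snapshot_path,
        "execution.savepoint-restore-mode": mode.value,
    }


def as_cli_args(options: dict) -> list:
    """Render the options as generic -Dkey=value arguments, sorted by key."""
    return [f"-D{key}={value}" for key, value in sorted(options.items())]
```

With NO_CLAIM the old snapshot stays untouched, at the price of a full
first checkpoint; with CLAIM the restore is cheapest, but the snapshot's
lifecycle is then owned by the new job.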

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-203


Re: [DISCUSS] REST API to suspend & resume checkpointing

Posted by Piotr Nowojski <pn...@apache.org>.
Hi Saurabh,

> 1. Stop with native savepoint does solve any races and produces a
> predictable restoration point, but producing a self-contained snapshot and
> using CLAIM mode in re-running is not necessary here and adds latency.

Why do you think it adds latency? CLAIM mode is actually the fastest one,
with zero additional overhead. Native savepoints are almost equivalent
to checkpoints.

Best,
Piotrek



Re: [DISCUSS] REST API to suspend & resume checkpointing

Posted by Saurabh Kaul <sa...@gmail.com>.
Hi, thanks for the quick responses,

I think the stop-with-checkpoint idea overlaps well with the
requirements.

1. Stop with native savepoint does solve any races and produces a
predictable restoration point, but producing a self-contained snapshot and
using CLAIM mode in re-running is not necessary here and adds latency.
Stop-with-checkpoint doesn't have these issues. It adds some downtime while
waiting for a checkpoint to complete, but it reduces replay time in the new
cluster, which is a good trade-off. Since neither the job nor its
configuration changes in this job-migration scenario, it should ideally be
as fast as a regular failover (like a TM going down).
2. Taking complete ownership of triggering checkpoints and making them more
configurable could be feasible, but it is comparatively less effective at
stopping the job for the primary purpose of low-downtime migration.
Stop-with-checkpoint solves that more directly.

Looking forward to hearing thoughts on this.


Re: [DISCUSS] REST API to suspend & resume checkpointing

Posted by Piotr Nowojski <pn...@apache.org>.
Hi Saurabh,

Thanks for reaching out with the proposal. I have some mixed feelings about
this for a couple of reasons:

1. It sounds like the core problem that you are describing is the race
condition between shutting down the cluster and completion of new
checkpoints. My first thought is the same as Jing's: why don't you use
stop-with-savepoint, especially with a native savepoint? You can recover
from it using the CLAIM restore mode, so the whole process should actually
be quite fast.
2. The same issue, not knowing the latest completed checkpoint id, plagued
us in some internal tests for quite a while, so maybe this would also be
worth addressing instead? For example, by leaving the last completed
checkpoint id in some text file, or by providing a way to read it from
some existing metadata files? However, in our tests we actually fixed/worked
around that by manually triggering checkpoints. The predecessor of
FLINK-27101 [1], FLINK-24280 [2], was implemented to address this exact
issue. Which brings me to...
3. You could actually just use the REST API to trigger all checkpoints
manually. The idea behind FLINK-27101 [1] was to give users full
flexibility without adding much complexity to the system. If we start adding
more REST calls to control checkpointing behaviour, it would complicate the
system.
4. If anything, I would lean towards a more generic idea of dynamically
reconfiguring the system. We could provide a generic way to dynamically
change configuration options. We wouldn't be able to support all
configurations, and furthermore, each "dynamic" option would have to be
handled/passed down to and through the system differently, BUT we wouldn't
have to do all of that at once. We could start with a very limited set of
dynamic options, for example just the checkpointing interval. This
must have been considered/discussed before, so I might be missing lots of
things.
5. Another direction, if 1. is not an option for some reason, is to provide
a stop-with-checkpoint feature?
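
As a rough illustration of point 2, the last completed checkpoint can be
read from the JobManager's checkpoint statistics. The sketch below assumes
that `GET /jobs/<jobid>/checkpoints` returns a JSON body containing
`latest.completed.id` and `latest.completed.external_path`, which is how I
understand the REST API; the helper names, base URL and job id are
placeholders. It does not close the race discussed in this thread: a
checkpoint that completes after the query is still missed.

```python
import json
from typing import Optional, Tuple
from urllib.request import urlopen


def latest_completed(stats: dict) -> Optional[Tuple[int, Optional[str]]]:
    """Extract (checkpoint id, external path) from a checkpoint statistics body."""
    completed = (stats.get("latest") or {}).get("completed")
    if not completed:
        return None  # no checkpoint has completed yet
    return completed["id"], completed.get("external_path")


def fetch_checkpoint_stats(base_url: str, job_id: str,
                           timeout: float = 10.0) -> dict:
    """Query the JobManager REST API (assumed endpoint: /jobs/<jobid>/checkpoints)."""
    with urlopen(f"{base_url}/jobs/{job_id}/checkpoints", timeout=timeout) as resp:
        return json.load(resp)


# Example usage (placeholder address and job id, not executed here):
#   stats = fetch_checkpoint_stats("http://localhost:8081", "<job-id>")
#   print(latest_completed(stats))
```

Keeping the parsing separate from the HTTP call makes the selection logic
easy to test against a recorded response.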

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-27101
[2] https://issues.apache.org/jira/browse/FLINK-24280

On Thu, 20 Oct 2022 at 11:53, Jing Ge <ji...@ververica.com> wrote:

> Hi Saurabh,
>
> In general, it is always good to add new features. I am not really sure
> I understood your requirement. I guess it would take too long for you to
> resume the job from a newly created savepoint in the new stand-by Flink
> cluster. But if that were acceptable to you, you would not have the issue
> you mentioned with the checkpoint. Speaking of checkpoints, if the
> checkpoint interval is set properly, it should be fine even if in some rare
> cases the last checkpoint was only partially completed and is not selected.
> Another option could be to trigger a manual checkpoint and then use that
> one to resume the job, which keeps the downtime low.
>
> Best regards,
> Jing
>
> On Thu, Oct 20, 2022 at 7:53 AM Saurabh Kaul <sa...@gmail.com> wrote:
>
> > Hey everyone,
> >
> > I will create a FLIP, but wanted to gauge community opinion first. The
> > motivation is that production Flink applications frequently need to go
> > through node/image patching to update the software and AMI with latest
> > security fixes. These patching related restarts do not involve application
> > jar or parallelism updates and can therefore be done without costly
> > savepoint completion and restore cycles by relying on the last checkpoint
> > state in order to achieve minimum downtime. In order to achieve this, we
> > currently rely on retained checkpoints and the following steps:
> >
> >    - Create new stand-by Flink cluster and submit application jar
> >    - Delete Flink TM deployment to stop processing & checkpoints on old
> >    cluster(reduce duplicates)
> >    - Query last completed checkpoint from REST API on JM of old cluster
> >    - Submit new job using last available checkpoint in new cluster, delete
> >    old cluster
> >
> > We have observed that this process will sometimes not select the latest
> > checkpoint as partially completed checkpoints race and finish after
> > querying the JM. Alternatives are to rely on creating other sources for
> > checkpoint info but this has complications, as discussed in [2]. Waiting
> > and force deleting task managers increases downtimes and doesn't guarantee
> > TM process termination respectively. In order to keep downtime and
> > duplicates low and solve this race we can introduce an API to suspend
> > checkpointing. Querying the latest available checkpoint after having
> > suspended checkpointing will guarantee that we can maintain exactly once
> > in such a scenario.
> >
> > This also acts as an extension to [1] where the feature to trigger
> > checkpoints through a control plane has been discussed and added. It makes
> > the checkpointing process flexible and gives the user more control in
> > scenarios like migrating applications and letting data processing catch up
> > temporarily.
> > We can implement this similar to [1] and expose a trigger to suspend and
> > resume checkpointing via CLI and REST API. We can add a parameter to
> > suspend in 2 ways.
> >
> >    1. Suspend scheduled checkpoint trigger, doesn’t cancel any still in
> >    progress checkpoints/savepoints but stops only future ones
> >    2. Suspend checkpoint coordinator, cancels in progress
> >    checkpoints/savepoints. Guarantees no racing checkpoint completion and
> >    could be used for canceling stuck checkpoints and help data processing
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-27101
> > [2] https://issues.apache.org/jira/browse/FLINK-26916
> >
>

Re: [DISCUSS] REST API to suspend & resume checkpointing

Posted by Jing Ge <ji...@ververica.com>.
Hi Saurabh,

In general, it is always good to add new features. I am not really sure
I understood your requirement. I guess it would take too long for you to
resume the job from a newly created savepoint in the new stand-by Flink
cluster. But if that were acceptable to you, you would not have the issue
you mentioned with the checkpoint. Speaking of checkpoints, if the
checkpoint interval is set properly, it should be fine even if in some rare
cases the last checkpoint was only partially completed and is not selected.
Another option could be to trigger a manual checkpoint and then use that
one to resume the job, which keeps the downtime low.
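
The race in question can be sketched against the JSON returned by the
JobManager's `GET /jobs/<jobid>/checkpoints` endpoint. This is only a
minimal sketch, assuming the response carries `latest.completed` (with `id`
and `external_path`) and a `history` list whose entries have `id` and
`status`. The function name and the sample payload are made up for
illustration.

```python
from typing import Any, Dict, List, Optional, Tuple


def pick_restore_checkpoint(
    stats: Dict[str, Any],
) -> Tuple[Optional[Dict[str, Any]], List[int]]:
    """Return (latest completed checkpoint or None, ids of newer in-progress checkpoints)."""
    latest = stats.get("latest") or {}
    completed = latest.get("completed")
    completed_id = completed["id"] if completed else -1
    # Any in-progress checkpoint newer than the chosen one may still finish later.
    racing = sorted(
        entry["id"]
        for entry in stats.get("history", [])
        if entry.get("status") == "IN_PROGRESS" and entry["id"] > completed_id
    )
    return completed, racing


# Made-up sample payload, trimmed to the fields used above.
sample = {
    "latest": {
        "completed": {"id": 41, "external_path": "s3://bucket/checkpoints/chk-41"}
    },
    "history": [
        {"id": 42, "status": "IN_PROGRESS"},
        {"id": 41, "status": "COMPLETED"},
    ],
}

completed, racing = pick_restore_checkpoint(sample)
if completed and racing:
    print(f"checkpoint {completed['id']} may be superseded by {racing}")
```

A non-empty second element means the selected checkpoint may not be the last
one to complete, which is the situation described in the original mail.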

Best regards,
Jing

On Thu, Oct 20, 2022 at 7:53 AM Saurabh Kaul <sa...@gmail.com> wrote:

> Hey everyone,
>
> I will create a FLIP, but wanted to gauge community opinion first. The
> motivation is that production Flink applications frequently need to go
> through node/image patching to update the software and AMI with latest
> security fixes. These patching related restarts do not involve application
> jar or parallelism updates and can therefore be done without costly
> savepoint completion and restore cycles by relying on the last checkpoint
> state in order to achieve minimum downtime. In order to achieve this, we
> currently rely on retained checkpoints and the following steps:
>
>    - Create new stand-by Flink cluster and submit application jar
>    - Delete Flink TM deployment to stop processing & checkpoints on old
>    cluster(reduce duplicates)
>    - Query last completed checkpoint from REST API on JM of old cluster
>    - Submit new job using last available checkpoint in new cluster, delete
>    old cluster
>
> We have observed that this process will sometimes not select the latest
> checkpoint as partially completed checkpoints race and finish after
> querying the JM. Alternatives are to rely on creating other sources for
> checkpoint info but this has complications, as discussed in [2]. Waiting
> and force deleting task managers increases downtimes and doesn't guarantee
> TM process termination respectively. In order to keep downtime and
> duplicates low and solve this race we can introduce an API to suspend
> checkpointing. Querying the latest available checkpoint after having
> suspended checkpointing will guarantee that we can maintain exactly once
> in such a scenario.
>
> This also acts as an extension to [1] where the feature to trigger
> checkpoints through a control plane has been discussed and added. It makes
> the checkpointing process flexible and gives the user more control in
> scenarios like migrating applications and letting data processing catch up
> temporarily.
> We can implement this similar to [1] and expose a trigger to suspend and
> resume checkpointing via CLI and REST API. We can add a parameter to
> suspend in 2 ways.
>
>    1. Suspend scheduled checkpoint trigger, doesn’t cancel any still in
>    progress checkpoints/savepoints but stops only future ones
>    2. Suspend checkpoint coordinator, cancels in progress
>    checkpoints/savepoints. Guarantees no racing checkpoint completion and
>    could be used for canceling stuck checkpoints and help data processing
>
> [1] https://issues.apache.org/jira/browse/FLINK-27101
> [2] https://issues.apache.org/jira/browse/FLINK-26916
>