Posted to dev@flink.apache.org by Gyula Fóra <gy...@apache.org> on 2022/02/06 21:22:40 UTC

[DISCUSS] Checkpointing (partially) failing jobs

Hi all!

At the moment checkpointing only works for healthy jobs with all running
(or some finished) tasks. This sounds reasonable in most cases but there
are a few applications where it would make sense to checkpoint failing jobs
as well.

Due to how the checkpointing mechanism works, subgraphs that have a failing
task cannot be checkpointed without violating the exactly-once semantics.
However if the job has multiple independent subgraphs (that are not
connected to each other), even if one subgraph is failing, the other
completely running one could be checkpointed. In these cases the tasks of
the failing subgraph could simply inherit the last successful checkpoint
metadata (before they started failing). This logic would produce a
consistent checkpoint.
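
To make the inheritance idea above concrete, here is a minimal sketch in Python. It is not Flink code: `SubgraphSnapshot` and `compose_checkpoint` are hypothetical names, and the sketch assumes the subgraphs are fully disconnected, so each one's state can be swapped independently.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Set


@dataclass(frozen=True)
class SubgraphSnapshot:
    """State handle for one independent subgraph (hypothetical type)."""
    subgraph: str
    taken_at_checkpoint: int  # id of the checkpoint that produced this state


def compose_checkpoint(
    fresh: Dict[str, SubgraphSnapshot],
    previous: Dict[str, SubgraphSnapshot],
    all_subgraphs: Set[str],
) -> Optional[Dict[str, SubgraphSnapshot]]:
    """Compose checkpoint metadata for all subgraphs.

    Subgraphs that acknowledged the current checkpoint contribute their
    fresh snapshot. Subgraphs that are failing inherit their entry from the
    previous completed checkpoint. Returns None if some subgraph has neither.
    """
    composed: Dict[str, SubgraphSnapshot] = {}
    for name in sorted(all_subgraphs):
        snapshot = fresh.get(name)
        if snapshot is None:
            # Failing subgraph: fall back to its last successful snapshot.
            snapshot = previous.get(name)
        if snapshot is None:
            return None
        composed[name] = snapshot
    return composed
```

For example, if subgraph A acknowledges checkpoint 9 while subgraph B is failing, the composed metadata holds A's state from checkpoint 9 and B's state from checkpoint 8.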

The job as a whole could now make stateful progress even if some subgraphs
are constantly failing. This can be very valuable if for some reason the
job has a larger number of independent subgraphs that are expected to fail
every once in a while, or if some subgraphs can have longer downtimes that
would now cause the whole job to stall.

What do you think?

Cheers,
Gyula

Re: [DISCUSS] Checkpointing (partially) failing jobs

Posted by Gen Luo <lu...@gmail.com>.
Hi Chesnay and Piotr,

I have seen some jobs with tens of independent vertices that process data
for the same business. The sub jobs should be started or stopped together.
Splitting them into separate jobs means the user has to manage them
separately. But in fact the jobs were running in per-job mode, and maybe
there's now a better choice. Let's see if others have some more valuable
cases.

By the way, I'd like to point out that if we can checkpoint pipeline
regions individually, even a job with only one job graph, if it has no
all-to-all edges connecting all vertices into one pipeline region, may
benefit from this effort, since any failure, long-time pause or
backpressure in a pipeline region will not block the checkpointing of other
regions.
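
As a rough illustration of why the edge pattern matters, the sketch below groups tasks into regions by treating every edge as pipelined and taking connected components. This is a simplification and not Flink's actual region computation; `pipeline_regions` and the task names are made up for the example.

```python
from typing import Dict, Iterable, List, Tuple


def pipeline_regions(
    tasks: Iterable[str], edges: Iterable[Tuple[str, str]]
) -> List[List[str]]:
    """Group tasks into regions: connected components over the given edges.

    Assumption: every edge is a pipelined edge, so two connected tasks
    always end up in the same region.
    """
    parent: Dict[str, str] = {t: t for t in tasks}

    def find(t: str) -> str:
        # Union-find lookup with path halving.
        while parent[t] != t:
            parent[t] = parent[parent[t]]
            t = parent[t]
        return t

    for a, b in edges:
        parent[find(a)] = find(b)

    groups: Dict[str, List[str]] = {}
    for t in parent:
        groups.setdefault(find(t), []).append(t)
    return sorted(sorted(g) for g in groups.values())
```

With pointwise edges (`source-1 -> sink-1`, `source-2 -> sink-2`) this yields two regions that could be checkpointed separately. With all-to-all edges between the same tasks it yields a single region, so there is nothing to checkpoint individually.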

And @Piotr, this is why I think this discussion may relate to task-local
checkpoints. Both require checkpointing parts of a job individually and
restoring only a part of the job without breaking consistency. The main
difference is that, to maintain consistency, task-local checkpoints have to
handle the channel data. Approximate task-local recovery omits this because
it does not guarantee consistency, which is why it can use a part of the
global snapshot rather than checkpointing each subtask individually. With
pipeline region checkpoints, however, consistency is guaranteed naturally,
so we can focus on how to checkpoint individually. That effort is probably
necessary anyway if we want to implement task-local checkpointing with a
consistency guarantee.

On Tue, Feb 8, 2022 at 7:41 PM 丛鹏 <co...@gmail.com> wrote:

> Hi guys, if I understand it correctly, will only some checkpoints be
> recovered when there is an error in the Flink batch?
>
> On Tue, Feb 8, 2022 at 19:05, Piotr Nowojski <pn...@apache.org> wrote:
>
>> Hi,
>>
>> I second Chesnay's comment and would like to better understand the
>> motivation behind this. At the surface it sounds to me like this might
>> require quite a bit of work for a very narrow use case.
>>
>> At the same time I have a feeling that Yuan, you are mixing this feature
>> request (checkpointing subgraphs/pipeline regions independently) and a
>> very, very different issue of "task local checkpoints"? Those problems are
>> kind of similar, but not quite.
>>
>> Best,
>> Piotrek
>>
>> On Tue, Feb 8, 2022 at 11:44, Chesnay Schepler <ch...@apache.org> wrote:
>>
>> > Could someone expand on these operational issues you're facing when
>> > achieving this via separate jobs?
>> >
>> > I feel like we're skipping a step, arguing about solutions without even
>> > having discussed the underlying problem.
>> >
>> > On 08/02/2022 11:25, Gen Luo wrote:
>> > > Hi,
>> > >
>> > > @Yuan
>> > > Do you mean that there should be no shared state between source
>> subtasks?
>> > > Sharing state between checkpoints of a specific subtask should be
>> fine.
>> > >
>> > > Sharing state between subtasks of a task can be an issue, no matter
>> > whether
>> > > it's a source. That's also what I was afraid of in the previous
>> replies.
>> > In
>> > > one word, if the behavior of a pipeline region can somehow influence
>> the
>> > > state of other pipeline regions, their checkpoints have to be aligned
>> > > before rescaling.
>> > >
>> > > On Tue, Feb 8, 2022 at 5:27 PM Yuan Mei <yu...@gmail.com>
>> wrote:
>> > >
>> > >> Hey Folks,
>> > >>
>> > >> Thanks for the discussion!
>> > >>
>> > >> *Motivation and use cases*
>> > >> I think the motivation and use cases are very clear, and I do not
>> > >> have doubts on this part.
>> > >> A typical use case is ETL with two-phase commit: hundreds of
>> > >> partitions can be blocked by a single straggler (a single task's
>> > >> checkpoint abort can affect all of them, not necessarily a failure).
>> > >>
>> > >> *Source offset redistribution*
>> > >> As for the known sources and implementations for Flink, I cannot find
>> > >> a case that does not work, *for now*.
>> > >> I need to dig a bit more into how splits are tracked: assigned, not
>> > >> successfully processed, successfully processed, etc.
>> > >> I guess it is a single shared source OperatorCoordinator. And how is
>> > >> this *shared* state (between tasks) preserved?
>> > >>
>> > >> *Input partitions/splits treated as completely independent from each
>> > >> other*
>> > >> I am still not sure about this part, given the possible shared source
>> > >> state mentioned in the section above.
>> > >>
>> > >> To Thomas:
>> > >>> In Yuan's example, is there a reason why CP8 could not be promoted
>> to
>> > >>> CP10 by the coordinator for PR2 once it receives the notification
>> that
>> > >>> CP10 did not complete? It appears that should be possible since in
>> its
>> > >>> effect it should be no different than no data processed between CP8
>> > >>>   and CP10?
>> > >> Not sure what "promoted" means here, but
>> > >> 1. I guess it does not matter whether it is CP8 or CP10 any more,
>> > >> if no shared state in source, as exactly what you meantinoed,
>> > >> "it should be no different than no data processed between CP8 and
>> CP10"
>> > >>
>> > >> 2. I've noticed that from this question there is a gap between
>> > >> "*allow aborted/failed checkpoint in independent sub-graph*" and
>> > >> my intention: "*independent sub-graph checkpointing indepently*"
>> > >>
>> > >> Best
>> > >> Yuan
>> > >>
>> > >>
>> > >> On Tue, Feb 8, 2022 at 11:34 AM Gen Luo <lu...@gmail.com> wrote:
>> > >>
>> > >>> Hi,
>> > >>>
>> > >>> I'm thinking about Yuan's case. Let's assume that the case is
>> > >>> running in current Flink:
>> > >>> 1. CP8 finishes.
>> > >>> 2. For some reason, PR2 stops consuming records from the source (but
>> > >>> is not stuck), and PR1 continues consuming new records.
>> > >>> 3. CP9 and CP10 finish.
>> > >>> 4. PR2 starts to consume quickly to catch up with PR1, and reaches
>> > >>> the same final status as in Yuan's case before CP11 starts.
>> > >>>
>> > >>> I suppose that in this case, the status of the job can be the same as
>> > >>> in Yuan's case, and the snapshot (including source states) taken at
>> > >>> CP10 should be the same as the composed global snapshot in Yuan's
>> > >>> case, which combines CP10 of PR1 and CP8 of PR2. This should be true
>> > >>> if neither failed checkpointing nor uncommitted consuming has side
>> > >>> effects, both of which can break the exactly-once semantics when
>> > >>> replaying. So I think there should be no difference between rescaling
>> > >>> the combined global snapshot and the globally taken one, i.e. if the
>> > >>> input partitions are not independent, we are probably not able to
>> > >>> rescale the source state in current Flink either.
>> > >>>
>> > >>> And @Thomas, I do agree that the operational burden is
>> > >>> significantly reduced, but I'm a little afraid that checkpointing
>> > >>> the subgraphs individually may bring back most of the runtime
>> > >>> overhead. Maybe we can find a better way to implement this.
>> > >>>
>> > >>> On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise <th...@apache.org> wrote:
>> > >>>
>> > >>>> Hi,
>> > >>>>
>> > >>>> Thanks for opening this discussion! The proposed enhancement would
>> be
>> > >>>> interesting for use cases in our infrastructure as well.
>> > >>>>
>> > >>>> There are scenarios where it makes sense to have multiple
>> disconnected
>> > >>>> subgraphs in a single job because it can significantly reduce the
>> > >>>> operational burden as well as the runtime overhead. Since we allow
>> > >>>> subgraphs to recover independently, then why not allow them to make
>> > >>>> progress independently also, which would imply that checkpointing
>> must
>> > >>>> succeed for non affected subgraphs as certain behavior is tied to
>> > >>>> checkpoint completion, like Kafka offset commit, file output etc.
>> > >>>>
>> > >>>> As for source offset redistribution, offset/position needs to be
>> tied
>> > >>>> to splits (in FLIP-27) and legacy sources. (It applies to both
>> Kafka
>> > >>>> and Kinesis legacy sources and FLIP-27 Kafka source.). With the new
>> > >>>> source framework, it would be hard to implement a source with
>> correct
>> > >>>> behavior that does not track the position along with the split.
>> > >>>>
>> > >>>> In Yuan's example, is there a reason why CP8 could not be promoted
>> to
>> > >>>> CP10 by the coordinator for PR2 once it receives the notification
>> that
>> > >>>> CP10 did not complete? It appears that should be possible since in
>> its
>> > >>>> effect it should be no different than no data processed between CP8
>> > >>>> and CP10?
>> > >>>>
>> > >>>> Thanks,
>> > >>>> Thomas
>> > >>>>
>> > >>>> On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann <trohrmann@apache.org
>> >
>> > >>> wrote:
>> > >>>>> Thanks for the clarification Yuan and Gen,
>> > >>>>>
>> > >>>>> I agree that the checkpointing of the sources needs to support the
>> > >>>>> rescaling case, otherwise it does not work. Is there currently a
>> > >> source
>> > >>>>> implementation where this wouldn't work? For Kafka it should work
>> > >>> because
>> > >>>>> we store the offset per assigned partition. For Kinesis it is
>> > >> probably
>> > >>>> the
>> > >>>>> same. For the Filesource we store the set of unread input splits
>> in
>> > >> the
>> > >>>>> source coordinator and the state of the assigned splits in the
>> > >> sources.
>> > >>>>> This should probably also work since new splits are only handed
>> out
>> > >> to
>> > >>>>> running tasks.
>> > >>>>>
>> > >>>>> Cheers,
>> > >>>>> Till
>> > >>>>>
>> > >>>>> On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei <yu...@gmail.com>
>> > >>> wrote:
>> > >>>>>> Hey Till,
>> > >>>>>>
>> > >>>>>>> Why rescaling is a problem for pipelined regions/independent
>> > >>>> execution
>> > >>>>>> subgraphs:
>> > >>>>>>
>> > >>>>>> Take a simplified example :
>> > >>>>>> job graph : source  (2 instances) -> sink (2 instances)
>> > >>>>>> execution graph:
>> > >>>>>> source (1/2)  -> sink (1/2)   [pipelined region 1]
>> > >>>>>> source (2/2)  -> sink (2/2)   [pipelined region 2]
>> > >>>>>>
>> > >>>>>> Let's assume checkpoints are still triggered globally, meaning
>> > >>>> different
>> > >>>>>> pipelined regions share the global checkpoint id (PR1 CP1 matches
>> > >>> with
>> > >>>> PR2
>> > >>>>>> CP1).
>> > >>>>>>
>> > >>>>>> Now let's assume PR1 completes CP10 and PR2 completes CP8.
>> > >>>>>>
>> > >>>>>> Let's say we want to rescale to parallelism 3 due to increased
>> > >> input.
>> > >>>>>> - Notice that we can not simply rescale based on the latest
>> > >> completed
>> > >>>>>> checkpoint (CP8), because PR1 has already had data (CP8 -> CP10)
>> > >>> output
>> > >>>>>> externally.
>> > >>>>>> - Can we take CP10 from PR1 and CP8 from PR2? I think it depends
>> on
>> > >>>> how the
>> > >>>>>> source's offset redistribution is implemented.
>> > >>>>>>     The answer is yes if we treat each input partition as
>> > >> independent
>> > >>>> from
>> > >>>>>> each other, *but I am not sure whether we can make that
>> > >> assumption*.
>> > >>>>>> If not, the rescaling cannot happen until PR1 and PR2 are aligned
>> > >>> with
>> > >>>> CPs.
>> > >>>>>> Best
>> > >>>>>> -Yuan
>> > >>>>>>
>> > >>>>>>
>> > >>>>>>
>> > >>>>>>
>> > >>>>>>
>> > >>>>>>
>> > >>>>>>
>> > >>>>>> On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann <
>> trohrmann@apache.org
>> > >>>> wrote:
>> > >>>>>>> Hi everyone,
>> > >>>>>>>
>> > >>>>>>> Yuan and Gen could you elaborate why rescaling is a problem if
>> we
>> > >>> say
>> > >>>>>> that
>> > >>>>>>> separate pipelined regions can take checkpoints independently?
>> > >>>>>>> Conceptually, I somehow think that a pipelined region that is
>> > >>> failed
>> > >>>> and
>> > >>>>>>> cannot create a new checkpoint is more or less the same as a
>> > >>>> pipelined
>> > >>>>>>> region that didn't get new input or a very very slow pipelined
>> > >>> region
>> > >>>>>> which
>> > >>>>>>> couldn't read new records since the last checkpoint (assuming
>> > >> that
>> > >>>> the
>> > >>>>>>> checkpoint coordinator can create a global checkpoint by
>> > >> combining
>> > >>>>>>> individual checkpoints (e.g. taking the last completed
>> checkpoint
>> > >>>> from
>> > >>>>>> each
>> > >>>>>>> pipelined region)). If this comparison is correct, then this
>> > >> would
>> > >>>> mean
>> > >>>>>>> that we have rescaling problems under the latter two cases.
>> > >>>>>>>
>> > >>>>>>> Cheers,
>> > >>>>>>> Till
>> > >>>>>>>
>> > >>>>>>> On Mon, Feb 7, 2022 at 8:55 AM Gen Luo <lu...@gmail.com>
>> > >>> wrote:
>> > >>>>>>>> Hi Gyula,
>> > >>>>>>>>
>> > >>>>>>>> Thanks for sharing the idea. As Yuan mentioned, I think we can
>> > >>>> discuss
>> > >>>>>>> this
>> > >>>>>>>> within two scopes. One is the job subgraph, the other is the
>> > >>>> execution
>> > >>>>>>>> subgraph, which I suppose is the same as PipelineRegion.
>> > >>>>>>>>
>> > >>>>>>>> An idea is to individually checkpoint the PipelineRegions, for
>> > >>> the
>> > >>>>>>>> recovering in a single run.
>> > >>>>>>>>
>> > >>>>>>>> Flink has now supported PipelineRegion based failover, with a
>> > >>>> subset
>> > >>>>>> of a
>> > >>>>>>>> global checkpoint snapshot. The checkpoint barriers are spread
>> > >>>> within a
>> > >>>>>>>> PipelineRegion, so the checkpointing of individual
>> > >>> PipelineRegions
>> > >>>> is
>> > >>>>>>>> actually independent. Since in a single run of a job, the
>> > >>>>>> PipelineRegions
>> > >>>>>>>> are fixed, we can individually checkpoint separated
>> > >>>> PipelineRegions,
>> > >>>>>>>> despite what status the other PipelineRegions are, and use a
>> > >>>> snapshot
>> > >>>>>> of
>> > >>>>>>> a
>> > >>>>>>>> failing region to recover, instead of the subset of a global
>> > >>>> snapshot.
>> > >>>>>>> This
>> > >>>>>>>> can support separated job subgraphs as well, since they will
>> > >> also
>> > >>>> be
>> > >>>>>>>> separated into different PipelineRegions. I think this can
>> > >>> fulfill
>> > >>>> your
>> > >>>>>>>> needs.
>> > >>>>>>>>
>> > >>>>>>>> In fact the individual snapshots of all PipelineRegions can
>> > >> form
>> > >>> a
>> > >>>>>> global
>> > >>>>>>>> snapshot, and the alignment of snapshots of individual regions
>> > >> is
>> > >>>> not
>> > >>>>>>>> necessary. But rescaling this global snapshot can be
>> > >> potentially
>> > >>>>>>> complex. I
>> > >>>>>>>> think it's better to use the individual snapshots in a single
>> > >>> run,
>> > >>>> and
>> > >>>>>>> take
>> > >>>>>>>> a global checkpoint/savepoint before restarting the job,
>> > >>> rescaling
>> > >>>> it
>> > >>>>>> or
>> > >>>>>>>> not.
>> > >>>>>>>>
>> > >>>>>>>> A major issue of this plan is that it breaks the checkpoint
>> > >>>> mechanism
>> > >>>>>> of
>> > >>>>>>>> Flink. As far as I know, even in the approximate recovery, the
>> > >>>> snapshot
>> > >>>>>>>> used to recover a single task is still a part of a global
>> > >>>> snapshot. To
>> > >>>>>>>> implement the individual checkpointing of PipelineRegions,
>> > >> there
>> > >>>> may
>> > >>>>>> need
>> > >>>>>>>> to be a checkpoint coordinator for each PipelineRegion, and a
>> > >> new
>> > >>>>>> global
>> > >>>>>>>> checkpoint coordinator. When the scale goes up, there can be
>> > >> many
>> > >>>>>>>> individual regions, which can be a big burden to the job
>> > >> manager.
>> > >>>> The
>> > >>>>>>>> meaning of the checkpoint id will also be changed, which can
>> > >>> affect
>> > >>>>>> many
>> > >>>>>>>> aspects. There can be lots of work and risks, and the risks
>> > >> still
>> > >>>> exist
>> > >>>>>>> if
>> > >>>>>>>> we only individually checkpoint separated job subgraphs, since
>> > >>> the
>> > >>>>>>>> mechanism is still broken. If that is what you need, maybe
>> > >>>> separating
>> > >>>>>>> them
>> > >>>>>>>> into different jobs is an easier and better choice, as Caizhi
>> > >> and
>> > >>>> Yuan
>> > >>>>>>>> mentioned.
>> > >>>>>>>>
>> > >>>>>>>> On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <
>> > >> yuanmei.work@gmail.com
>> > >>>>>> wrote:
>> > >>>>>>>>> Hey Gyula,
>> > >>>>>>>>>
>> > >>>>>>>>> That's a very interesting idea. The discussion about the
>> > >>>> `Individual`
>> > >>>>>>> vs
>> > >>>>>>>>> `Global` checkpoint was raised before, but the main concern
>> > >> was
>> > >>>> from
>> > >>>>>>> two
>> > >>>>>>>>> aspects:
>> > >>>>>>>>>
>> > >>>>>>>>> - Non-deterministic replaying may lead to an inconsistent
>> > >> view
>> > >>> of
>> > >>>>>>>>> checkpoint
>> > >>>>>>>>> - It is not easy to form a clear cut of past and future and
>> > >>>> hence no
>> > >>>>>>>> clear
>> > >>>>>>>>> cut of where the start point of the source should begin to
>> > >>> replay
>> > >>>>>> from.
>> > >>>>>>>>> Starting from independent subgraphs as you proposed may be a
>> > >>> good
>> > >>>>>>>> starting
>> > >>>>>>>>> point. However, when we talk about subgraph, do we mention it
>> > >>> as
>> > >>>> a
>> > >>>>>> job
>> > >>>>>>>>> subgraph (each vertex is one or more operators) or execution
>> > >>>> subgraph
>> > >>>>>>>> (each
>> > >>>>>>>>> vertex is a task instance)?
>> > >>>>>>>>>
>> > >>>>>>>>> If it is a job subgraph, then indeed, why not separate it
>> > >> into
>> > >>>>>> multiple
>> > >>>>>>>>> jobs as Caizhi mentioned.
>> > >>>>>>>>> If it is an execution subgraph, then it is difficult to
>> > >> handle
>> > >>>>>>> rescaling
>> > >>>>>>>>> due to inconsistent views of checkpoints between tasks of the
>> > >>>> same
>> > >>>>>>>>> operator.
>> > >>>>>>>>>
>> > >>>>>>>>> `Individual/Subgraph Checkpointing` is definitely an
>> > >>> interesting
>> > >>>>>>>> direction
>> > >>>>>>>>> to think of, and I'd love to hear more from you!
>> > >>>>>>>>>
>> > >>>>>>>>> Best,
>> > >>>>>>>>>
>> > >>>>>>>>> Yuan
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>>>> On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <
>> > >>>> tsreaper96@gmail.com>
>> > >>>>>>>> wrote:
>> > >>>>>>>>>> Hi Gyula!
>> > >>>>>>>>>>
>> > >>>>>>>>>> Thanks for raising this discussion. I agree that this will
>> > >> be
>> > >>>> an
>> > >>>>>>>>>> interesting feature but I actually have some doubts about
>> > >> the
>> > >>>>>>>> motivation
>> > >>>>>>>>>> and use case. If there are multiple individual subgraphs in
>> > >>> the
>> > >>>>>> same
>> > >>>>>>>> job,
>> > >>>>>>>>>> why not just distribute them to multiple jobs so that each
>> > >>> job
>> > >>>>>>> contains
>> > >>>>>>>>>> only one individual graph and can now fail without
>> > >> disturbing
>> > >>>> the
>> > >>>>>>>> others?
>> > >>>>>>>>>>
>> > >>>>>>>>>> On Mon, Feb 7, 2022 at 05:22, Gyula Fóra <gy...@apache.org> wrote:
>> > >>>>>>>>>>
>> > >>>>>>>>>>> Hi all!
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> At the moment checkpointing only works for healthy jobs
>> > >>> with
>> > >>>> all
>> > >>>>>>>>> running
>> > >>>>>>>>>>> (or some finished) tasks. This sounds reasonable in most
>> > >>>> cases
>> > >>>>>> but
>> > >>>>>>>>> there
>> > >>>>>>>>>>> are a few applications where it would make sense to
>> > >>>> checkpoint
>> > >>>>>>>> failing
>> > >>>>>>>>>> jobs
>> > >>>>>>>>>>> as well.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Due to how the checkpointing mechanism works, subgraphs
>> > >>> that
>> > >>>>>> have a
>> > >>>>>>>>>> failing
>> > >>>>>>>>>>> task cannot be checkpointed without violating the
>> > >>>> exactly-once
>> > >>>>>>>>> semantics.
>> > >>>>>>>>>>> However if the job has multiple independent subgraphs
>> > >> (that
>> > >>>> are
>> > >>>>>> not
>> > >>>>>>>>>>> connected to each other), even if one subgraph is
>> > >> failing,
>> > >>>> the
>> > >>>>>>> other
>> > >>>>>>>>>>> completely running one could be checkpointed. In these
>> > >>> cases
>> > >>>> the
>> > >>>>>>>> tasks
>> > >>>>>>>>> of
>> > >>>>>>>>>>> the failing subgraph could simply inherit the last
>> > >>> successful
>> > >>>>>>>>> checkpoint
>> > >>>>>>>>>>> metadata (before they started failing). This logic would
>> > >>>> produce
>> > >>>>>> a
>> > >>>>>>>>>>> consistent checkpoint.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> The job as a whole could now make stateful progress even
>> > >> if
>> > >>>> some
>> > >>>>>>>>>> subgraphs
>> > >>>>>>>>>>> are constantly failing. This can be very valuable if for
>> > >>> some
>> > >>>>>>> reason
>> > >>>>>>>>> the
>> > >>>>>>>>>>> job has a larger number of independent subgraphs that are
>> > >>>>>> expected
>> > >>>>>>> to
>> > >>>>>>>>>> fail
>> > >>>>>>>>>>> every once in a while, or if some subgraphs can have
>> > >> longer
>> > >>>>>>> downtimes
>> > >>>>>>>>>> that
>> > >>>>>>>>>>> would now cause the whole job to stall.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> What do you think?
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Cheers,
>> > >>>>>>>>>>> Gyula
>> > >>>>>>>>>>>
>> >
>> >
>>
>

Re: [DISCUSS] Checkpointing (partially) failing jobs

Posted by 丛鹏 <co...@gmail.com>.
Hi guys, if I understand it correctly, will only some checkpoints be
recovered when there is an error in the Flink batch?

On Tue, Feb 8, 2022 at 19:05, Piotr Nowojski <pn...@apache.org> wrote:

> Hi,
>
> I second Chesnay's comment and would like to better understand the
> motivation behind this. At the surface it sounds to me like this might
> require quite a bit of work for a very narrow use case.
>
> At the same time I have a feeling that Yuan, you are mixing this feature
> request (checkpointing subgraphs/pipeline regions independently) and a very
> very different issue of "task local checkpoints"? Those problems are kind
> of similar, but not quite.
>
> Best,
> Piotrek
>
> On Tue, Feb 8, 2022 at 11:44, Chesnay Schepler <ch...@apache.org> wrote:
>
> > Could someone expand on these operational issues you're facing when
> > achieving this via separate jobs?
> >
> > I feel like we're skipping a step, arguing about solutions without even
> > having discussed the underlying problem.
> >
> > On 08/02/2022 11:25, Gen Luo wrote:
> > > Hi,
> > >
> > > @Yuan
> > > Do you mean that there should be no shared state between source
> subtasks?
> > > Sharing state between checkpoints of a specific subtask should be fine.
> > >
> > > Sharing state between subtasks of a task can be an issue, no matter
> > whether
> > > it's a source. That's also what I was afraid of in the previous
> replies.
> > In
> > > one word, if the behavior of a pipeline region can somehow influence
> the
> > > state of other pipeline regions, their checkpoints have to be aligned
> > > before rescaling.
> > >
> > > On Tue, Feb 8, 2022 at 5:27 PM Yuan Mei <yu...@gmail.com>
> wrote:
> > >
> > >> Hey Folks,
> > >>
> > >> Thanks for the discussion!
> > >>
> > > >> *Motivation and use cases*
> > > >> I think the motivation and use cases are very clear, and I do not
> > > >> have doubts on this part.
> > > >> A typical use case is ETL with two-phase commit: hundreds of
> > > >> partitions can be blocked by a single straggler (a single task's
> > > >> checkpoint abort can affect all of them, not necessarily a failure).
> > >>
> > > >> *Source offset redistribution*
> > > >> As for the known sources and implementations for Flink, I cannot find
> > > >> a case that does not work, *for now*.
> > > >> I need to dig a bit more into how splits are tracked: assigned, not
> > > >> successfully processed, successfully processed, etc.
> > > >> I guess it is a single shared source OperatorCoordinator. And how is
> > > >> this *shared* state (between tasks) preserved?
> > > >>
> > > >> *Input partitions/splits treated as completely independent from each
> > > >> other*
> > > >> I am still not sure about this part, given the possible shared source
> > > >> state mentioned in the section above.
> > >>
> > > >> To Thomas:
> > > >>> In Yuan's example, is there a reason why CP8 could not be promoted
> > > >>> to CP10 by the coordinator for PR2 once it receives the notification
> > > >>> that CP10 did not complete? It appears that should be possible since
> > > >>> in its effect it should be no different than no data processed
> > > >>> between CP8 and CP10?
> > > >> Not sure what "promoted" means here, but
> > > >> 1. I guess it no longer matters whether it is CP8 or CP10 if there is
> > > >> no shared state in the source; exactly as you mentioned,
> > > >> "it should be no different than no data processed between CP8 and
> > > >> CP10"
> > > >>
> > > >> 2. I've noticed from this question that there is a gap between
> > > >> "*allow aborted/failed checkpoint in independent sub-graph*" and
> > > >> my intention: "*independent sub-graph checkpointing independently*"
> > >>
> > >> Best
> > >> Yuan
> > >>
> > >>
> > >> On Tue, Feb 8, 2022 at 11:34 AM Gen Luo <lu...@gmail.com> wrote:
> > >>
> > >>> Hi,
> > >>>
> > > >>> I'm thinking about Yuan's case. Let's assume that the case is
> > > >>> running in current Flink:
> > > >>> 1. CP8 finishes.
> > > >>> 2. For some reason, PR2 stops consuming records from the source (but
> > > >>> is not stuck), and PR1 continues consuming new records.
> > > >>> 3. CP9 and CP10 finish.
> > > >>> 4. PR2 starts to consume quickly to catch up with PR1, and reaches
> > > >>> the same final status as in Yuan's case before CP11 starts.
> > > >>>
> > > >>> I suppose that in this case, the status of the job can be the same as
> > > >>> in Yuan's case, and the snapshot (including source states) taken at
> > > >>> CP10 should be the same as the composed global snapshot in Yuan's
> > > >>> case, which combines CP10 of PR1 and CP8 of PR2. This should be true
> > > >>> if neither failed checkpointing nor uncommitted consuming has side
> > > >>> effects, both of which can break the exactly-once semantics when
> > > >>> replaying. So I think there should be no difference between rescaling
> > > >>> the combined global snapshot and the globally taken one, i.e. if the
> > > >>> input partitions are not independent, we are probably not able to
> > > >>> rescale the source state in current Flink either.
> > >>>
> > > >>> And @Thomas, I do agree that the operational burden is
> > > >>> significantly reduced, but I'm a little afraid that checkpointing
> > > >>> the subgraphs individually may bring back most of the runtime
> > > >>> overhead. Maybe we can find a better way to implement this.
> > >>>
> > >>> On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise <th...@apache.org> wrote:
> > >>>
> > >>>> Hi,
> > >>>>
> > >>>> Thanks for opening this discussion! The proposed enhancement would
> be
> > >>>> interesting for use cases in our infrastructure as well.
> > >>>>
> > >>>> There are scenarios where it makes sense to have multiple
> disconnected
> > >>>> subgraphs in a single job because it can significantly reduce the
> > >>>> operational burden as well as the runtime overhead. Since we allow
> > >>>> subgraphs to recover independently, then why not allow them to make
> > >>>> progress independently also, which would imply that checkpointing
> must
> > >>>> succeed for non affected subgraphs as certain behavior is tied to
> > >>>> checkpoint completion, like Kafka offset commit, file output etc.
> > >>>>
> > >>>> As for source offset redistribution, offset/position needs to be
> tied
> > >>>> to splits (in FLIP-27) and legacy sources. (It applies to both Kafka
> > >>>> and Kinesis legacy sources and FLIP-27 Kafka source.). With the new
> > >>>> source framework, it would be hard to implement a source with
> correct
> > >>>> behavior that does not track the position along with the split.
> > >>>>
> > >>>> In Yuan's example, is there a reason why CP8 could not be promoted
> to
> > >>>> CP10 by the coordinator for PR2 once it receives the notification
> that
> > >>>> CP10 did not complete? It appears that should be possible since in
> its
> > >>>> effect it should be no different than no data processed between CP8
> > >>>> and CP10?
> > >>>>
> > >>>> Thanks,
> > >>>> Thomas
> > >>>>
> > >>>> On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann <tr...@apache.org>
> > >>> wrote:
> > >>>>> Thanks for the clarification Yuan and Gen,
> > >>>>>
> > >>>>> I agree that the checkpointing of the sources needs to support the
> > >>>>> rescaling case, otherwise it does not work. Is there currently a
> > >> source
> > >>>>> implementation where this wouldn't work? For Kafka it should work
> > >>> because
> > >>>>> we store the offset per assigned partition. For Kinesis it is
> > >> probably
> > >>>> the
> > >>>>> same. For the Filesource we store the set of unread input splits in
> > >> the
> > >>>>> source coordinator and the state of the assigned splits in the
> > >> sources.
> > >>>>> This should probably also work since new splits are only handed out
> > >> to
> > >>>>> running tasks.
> > >>>>>
> > >>>>> Cheers,
> > >>>>> Till
> > >>>>>
> > >>>>> On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei <yu...@gmail.com>
> > >>> wrote:
> > >>>>>> Hey Till,
> > >>>>>>
> > >>>>>>> Why rescaling is a problem for pipelined regions/independent
> > >>>> execution
> > >>>>>> subgraphs:
> > >>>>>>
> > >>>>>> Take a simplified example :
> > >>>>>> job graph : source  (2 instances) -> sink (2 instances)
> > >>>>>> execution graph:
> > > >>>>>> source (1/2)  -> sink (1/2)   [pipelined region 1]
> > > >>>>>> source (2/2)  -> sink (2/2)   [pipelined region 2]
> > >>>>>>
> > >>>>>> Let's assume checkpoints are still triggered globally, meaning
> > >>>> different
> > >>>>>> pipelined regions share the global checkpoint id (PR1 CP1 matches
> > >>> with
> > >>>> PR2
> > >>>>>> CP1).
> > >>>>>>
> > >>>>>> Now let's assume PR1 completes CP10 and PR2 completes CP8.
> > >>>>>>
> > >>>>>> Let's say we want to rescale to parallelism 3 due to increased
> > >> input.
> > >>>>>> - Notice that we can not simply rescale based on the latest
> > >> completed
> > >>>>>> checkpoint (CP8), because PR1 has already had data (CP8 -> CP10)
> > >>> output
> > >>>>>> externally.
> > >>>>>> - Can we take CP10 from PR1 and CP8 from PR2? I think it depends
> on
> > >>>> how the
> > >>>>>> source's offset redistribution is implemented.
> > >>>>>>     The answer is yes if we treat each input partition as
> > >> independent
> > >>>> from
> > >>>>>> each other, *but I am not sure whether we can make that
> > >> assumption*.
> > >>>>>> If not, the rescaling cannot happen until PR1 and PR2 are aligned
> > >>> with
> > >>>> CPs.
> > >>>>>> Best
> > >>>>>> -Yuan
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann <
> trohrmann@apache.org
> > >>>> wrote:
> > >>>>>>> Hi everyone,
> > >>>>>>>
> > >>>>>>> Yuan and Gen could you elaborate why rescaling is a problem if we
> > >>> say
> > >>>>>> that
> > >>>>>>> separate pipelined regions can take checkpoints independently?
> > >>>>>>> Conceptually, I somehow think that a pipelined region that is
> > >>> failed
> > >>>> and
> > >>>>>>> cannot create a new checkpoint is more or less the same as a
> > >>>> pipelined
> > >>>>>>> region that didn't get new input or a very very slow pipelined
> > >>> region
> > >>>>>> which
> > >>>>>>> couldn't read new records since the last checkpoint (assuming
> > >> that
> > >>>> the
> > >>>>>>> checkpoint coordinator can create a global checkpoint by
> > >> combining
> > >>>>>>> individual checkpoints (e.g. taking the last completed checkpoint
> > >>>> from
> > >>>>>> each
> > >>>>>>> pipelined region)). If this comparison is correct, then this
> > >> would
> > >>>> mean
> > >>>>>>> that we have rescaling problems under the latter two cases.
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>> Till
> > >>>>>>>
> > >>>>>>> On Mon, Feb 7, 2022 at 8:55 AM Gen Luo <lu...@gmail.com>
> > >>> wrote:
> > >>>>>>>> Hi Gyula,
> > >>>>>>>>
> > >>>>>>>> Thanks for sharing the idea. As Yuan mentioned, I think we can
> > >>>> discuss
> > >>>>>>> this
> > >>>>>>>> within two scopes. One is the job subgraph, the other is the
> > >>>> execution
> > >>>>>>>> subgraph, which I suppose is the same as PipelineRegion.
> > >>>>>>>>
> > >>>>>>>> An idea is to individually checkpoint the PipelineRegions, for
> > >>> the
> > >>>>>>>> recovering in a single run.
> > >>>>>>>>
> > >>>>>>>> Flink has now supported PipelineRegion based failover, with a
> > >>>> subset
> > >>>>>> of a
> > >>>>>>>> global checkpoint snapshot. The checkpoint barriers are spread
> > >>>> within a
> > >>>>>>>> PipelineRegion, so the checkpointing of individual
> > >>> PipelineRegions
> > >>>> is
> > >>>>>>>> actually independent. Since in a single run of a job, the
> > >>>>>> PipelineRegions
> > >>>>>>>> are fixed, we can individually checkpoint separated
> > >>>> PipelineRegions,
> > >>>>>>>> despite what status the other PipelineRegions are, and use a
> > >>>> snapshot
> > >>>>>> of
> > >>>>>>> a
> > >>>>>>>> failing region to recover, instead of the subset of a global
> > >>>> snapshot.
> > >>>>>>> This
> > >>>>>>>> can support separated job subgraphs as well, since they will
> > >> also
> > >>>> be
> > >>>>>>>> separated into different PipelineRegions. I think this can
> > >>> fulfill
> > >>>> your
> > >>>>>>>> needs.
> > >>>>>>>>
> > >>>>>>>> In fact the individual snapshots of all PipelineRegions can
> > >> form
> > >>> a
> > >>>>>> global
> > >>>>>>>> snapshot, and the alignment of snapshots of individual regions
> > >> is
> > >>>> not
> > >>>>>>>> necessary. But rescaling this global snapshot can be
> > >> potentially
> > >>>>>>> complex. I
> > >>>>>>>> think it's better to use the individual snapshots in a single
> > >>> run,
> > >>>> and
> > >>>>>>> take
> > >>>>>>>> a global checkpoint/savepoint before restarting the job,
> > >>> rescaling
> > >>>> it
> > >>>>>> or
> > >>>>>>>> not.
> > >>>>>>>>
> > >>>>>>>> A major issue of this plan is that it breaks the checkpoint
> > >>>> mechanism
> > >>>>>> of
> > >>>>>>>> Flink. As far as I know, even in the approximate recovery, the
> > >>>> snapshot
> > >>>>>>>> used to recover a single task is still a part of a global
> > >>>> snapshot. To
> > >>>>>>>> implement the individual checkpointing of PipelineRegions,
> > >> there
> > >>>> may
> > >>>>>> need
> > >>>>>>>> to be a checkpoint coordinator for each PipelineRegion, and a
> > >> new
> > >>>>>> global
> > >>>>>>>> checkpoint coordinator. When the scale goes up, there can be
> > >> many
> > >>>>>>>> individual regions, which can be a big burden to the job
> > >> manager.
> > >>>> The
> > >>>>>>>> meaning of the checkpoint id will also be changed, which can
> > >>> affect
> > >>>>>> many
> > >>>>>>>> aspects. There can be lots of work and risks, and the risks
> > >> still
> > >>>> exist
> > >>>>>>> if
> > >>>>>>>> we only individually checkpoint separated job subgraphs, since
> > >>> the
> > >>>>>>>> mechanism is still broken. If that is what you need, maybe
> > >>>> separating
> > >>>>>>> them
> > >>>>>>>> into different jobs is an easier and better choice, as Caizhi
> > >> and
> > >>>> Yuan
> > >>>>>>>> mentioned.
> > >>>>>>>>
> > >>>>>>>> On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <
> > >> yuanmei.work@gmail.com
> > >>>>>> wrote:
> > >>>>>>>>> Hey Gyula,
> > >>>>>>>>>
> > >>>>>>>>> That's a very interesting idea. The discussion about the
> > >>>> `Individual`
> > >>>>>>> vs
> > >>>>>>>>> `Global` checkpoint was raised before, but the main concern
> > >> was
> > >>>> from
> > >>>>>>> two
> > >>>>>>>>> aspects:
> > >>>>>>>>>
> > >>>>>>>>> - Non-deterministic replaying may lead to an inconsistent
> > >> view
> > >>> of
> > >>>>>>>>> checkpoint
> > >>>>>>>>> - It is not easy to form a clear cut of past and future and
> > >>>> hence no
> > >>>>>>>> clear
> > >>>>>>>>> cut of where the start point of the source should begin to
> > >>> replay
> > >>>>>> from.
> > >>>>>>>>> Starting from independent subgraphs as you proposed may be a
> > >>> good
> > >>>>>>>> starting
> > >>>>>>>>> point. However, when we talk about subgraph, do we mention it
> > >>> as
> > >>>> a
> > >>>>>> job
> > >>>>>>>>> subgraph (each vertex is one or more operators) or execution
> > >>>> subgraph
> > >>>>>>>> (each
> > >>>>>>>>> vertex is a task instance)?
> > >>>>>>>>>
> > >>>>>>>>> If it is a job subgraph, then indeed, why not separate it
> > >> into
> > >>>>>> multiple
> > >>>>>>>>> jobs as Caizhi mentioned.
> > >>>>>>>>> If it is an execution subgraph, then it is difficult to
> > >> handle
> > >>>>>>> rescaling
> > >>>>>>>>> due to inconsistent views of checkpoints between tasks of the
> > >>>> same
> > >>>>>>>>> operator.
> > >>>>>>>>>
> > >>>>>>>>> `Individual/Subgraph Checkpointing` is definitely an
> > >>> interesting
> > >>>>>>>> direction
> > >>>>>>>>> to think of, and I'd love to hear more from you!
> > >>>>>>>>>
> > >>>>>>>>> Best,
> > >>>>>>>>>
> > >>>>>>>>> Yuan
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <
> > >>>> tsreaper96@gmail.com>
> > >>>>>>>> wrote:
> > >>>>>>>>>> Hi Gyula!
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks for raising this discussion. I agree that this will
> > >> be
> > >>>> an
> > >>>>>>>>>> interesting feature but I actually have some doubts about
> > >> the
> > >>>>>>>> motivation
> > >>>>>>>>>> and use case. If there are multiple individual subgraphs in
> > >>> the
> > >>>>>> same
> > >>>>>>>> job,
> > >>>>>>>>>> why not just distribute them to multiple jobs so that each
> > >>> job
> > >>>>>>> contains
> > >>>>>>>>>> only one individual graph and can now fail without
> > >> disturbing
> > >>>> the
> > >>>>>>>> others?
> > >>>>>>>>>>
> > >>>>>>>>>> Gyula Fóra <gy...@apache.org> wrote on Mon, Feb 7, 2022 at 05:22:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi all!
> > >>>>>>>>>>>
> > >>>>>>>>>>> At the moment checkpointing only works for healthy jobs
> > >>> with
> > >>>> all
> > >>>>>>>>> running
> > >>>>>>>>>>> (or some finished) tasks. This sounds reasonable in most
> > >>>> cases
> > >>>>>> but
> > >>>>>>>>> there
> > >>>>>>>>>>> are a few applications where it would make sense to
> > >>>> checkpoint
> > >>>>>>>> failing
> > >>>>>>>>>> jobs
> > >>>>>>>>>>> as well.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Due to how the checkpointing mechanism works, subgraphs
> > >>> that
> > >>>>>> have a
> > >>>>>>>>>> failing
> > >>>>>>>>>>> task cannot be checkpointed without violating the
> > >>>> exactly-once
> > >>>>>>>>> semantics.
> > >>>>>>>>>>> However if the job has multiple independent subgraphs
> > >> (that
> > >>>> are
> > >>>>>> not
> > >>>>>>>>>>> connected to each other), even if one subgraph is
> > >> failing,
> > >>>> the
> > >>>>>>> other
> > >>>>>>>>>>> completely running one could be checkpointed. In these
> > >>> cases
> > >>>> the
> > >>>>>>>> tasks
> > >>>>>>>>> of
> > >>>>>>>>>>> the failing subgraph could simply inherit the last
> > >>> successful
> > >>>>>>>>> checkpoint
> > >>>>>>>>>>> metadata (before they started failing). This logic would
> > >>>> produce
> > >>>>>> a
> > >>>>>>>>>>> consistent checkpoint.
> > >>>>>>>>>>>
> > >>>>>>>>>>> The job as a whole could now make stateful progress even
> > >> if
> > >>>> some
> > >>>>>>>>>> subgraphs
> > >>>>>>>>>>> are constantly failing. This can be very valuable if for
> > >>> some
> > >>>>>>> reason
> > >>>>>>>>> the
> > >>>>>>>>>>> job has a larger number of independent subgraphs that are
> > >>>>>> expected
> > >>>>>>> to
> > >>>>>>>>>> fail
> > >>>>>>>>>>> every once in a while, or if some subgraphs can have
> > >> longer
> > >>>>>>> downtimes
> > >>>>>>>>>> that
> > >>>>>>>>>>> would now cause the whole job to stall.
> > >>>>>>>>>>>
> > >>>>>>>>>>> What do you think?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Cheers,
> > >>>>>>>>>>> Gyula
> > >>>>>>>>>>>
> >
> >
>
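
As a rough illustration of Yuan's example quoted above (PR1 completed CP10,
PR2 completed CP8), the two candidate restore points can be sketched as
follows. This is only a sketch under assumptions: the function names and the
dictionary layout are hypothetical and do not correspond to any Flink API.
"Combined" follows Till's suggestion of taking the last completed checkpoint
from each pipelined region; "aligned" is the latest checkpoint id that every
region completed.

```python
from typing import Dict, Optional, Set


def compose_global_snapshot(completed: Dict[str, Set[int]]) -> Dict[str, int]:
    """Pick the latest completed checkpoint id for every region.

    Regions without any completed checkpoint are left out.
    """
    return {region: max(ids) for region, ids in completed.items() if ids}


def latest_aligned_checkpoint(completed: Dict[str, Set[int]]) -> Optional[int]:
    """Return the latest checkpoint id completed by all regions, if any."""
    if not completed:
        return None
    common = set.intersection(*[set(ids) for ids in completed.values()])
    return max(common) if common else None


# Hypothetical bookkeeping for the example: PR1 finished CP1..CP10,
# PR2 finished CP1..CP8.
completed = {"PR1": set(range(1, 11)), "PR2": set(range(1, 9))}

print(compose_global_snapshot(completed))    # {'PR1': 10, 'PR2': 8}
print(latest_aligned_checkpoint(completed))  # 8
```

Restoring from the aligned id (CP8) would replay data that PR1 already
emitted between CP8 and CP10, which is why the thread focuses on whether the
combined snapshot can be rescaled.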

Re: [DISCUSS] Checkpointing (partially) failing jobs

Posted by Piotr Nowojski <pn...@apache.org>.
Hi,

I second Chesnay's comment and would like to better understand the
motivation behind this. On the surface it sounds to me like this might
require quite a bit of work for a very narrow use case.

At the same time, Yuan, I have a feeling that you are mixing this feature
request (checkpointing subgraphs/pipeline regions independently) with a
very different issue, "task local checkpoints". Those problems are
somewhat similar, but not the same.

Best,
Piotrek

On Tue, Feb 8, 2022 at 11:44 AM Chesnay Schepler <ch...@apache.org> wrote:

> Could someone expand on these operational issues you're facing when
> achieving this via separate jobs?
>
> I feel like we're skipping a step, arguing about solutions without even
> having discussed the underlying problem.
>
> On 08/02/2022 11:25, Gen Luo wrote:
> > Hi,
> >
> > @Yuan
> > Do you mean that there should be no shared state between source subtasks?
> > Sharing state between checkpoints of a specific subtask should be fine.
> >
> > Sharing state between subtasks of a task can be an issue, no matter
> whether
> > it's a source. That's also what I was afraid of in the previous replies.
> In
> > one word, if the behavior of a pipeline region can somehow influence the
> > state of other pipeline regions, their checkpoints have to be aligned
> > before rescaling.
> >
> > On Tue, Feb 8, 2022 at 5:27 PM Yuan Mei <yu...@gmail.com> wrote:
> >
> >> Hey Folks,
> >>
> >> Thanks for the discussion!
> >>
> >> *Motivation and use cases*
> >> I think the motivation and use cases are very clear and I do not have
> >> doubts on this part.
> >> A typical use case is ETL with two-phase commit, where hundreds of
> >> partitions can be blocked by a single straggler (a single task's
> >> checkpoint abortion can affect all, not necessarily a failure).
> >>
> >> *Source offset redistribution*
> >> As for the known sources & implementation for Flink, I can not find a
> case
> >> that does not work, *for now*.
> >> I need to dig a bit more into how splits are tracked (assigned, not
> >> successfully processed, successfully processed, etc.).
> >> I guess it is a single shared source OPCoordinator. And how is this
> >> *shared* state (between tasks) preserved?
> >>
> >> *Input partitions/splits treated completely independently from each other*
> >> I am still not sure about this part, given the possible shared source
> >> state mentioned in the section above.
> >>
> >> To Thomas:
> >>> In Yuan's example, is there a reason why CP8 could not be promoted to
> >>> CP10 by the coordinator for PR2 once it receives the notification that
> >>> CP10 did not complete? It appears that should be possible since in its
> >>> effect it should be no different than no data processed between CP8
> >>>   and CP10?
> >> Not sure what "promoted" means here, but
> >> 1. I guess it does not matter whether it is CP8 or CP10 any more
> >> if there is no shared state in the source, exactly as you mentioned:
> >> "it should be no different than no data processed between CP8 and CP10"
> >>
> >> 2. I've noticed that from this question there is a gap between
> >> "*allow aborted/failed checkpoint in independent sub-graph*" and
> >> my intention: "*independent sub-graph checkpointing independently*"
> >>
> >> Best
> >> Yuan
> >>
> >>
> >> On Tue, Feb 8, 2022 at 11:34 AM Gen Luo <lu...@gmail.com> wrote:
> >>
> >>> Hi,
> >>>
> >>> I'm thinking about Yuan's case. Let's assume that the case is running
> in
> >>> current Flink:
> >>> 1. CP8 finishes
> >>> 2. For some reason, PR2 stops consuming records from the source (but is
> >> not
> >>> stuck), and PR1 continues consuming new records.
> >>> 3. CP9 and CP10 finish
> >>> 4. PR2 starts to consume quickly to catch up with PR1, and reaches the
> >> same
> >>> final status as that in Yuan's case before CP11 starts.
> >>>
> >>> I suppose that in this case, the status of the job can be the same as
> in
> >>> Yuan's case, and the snapshot (including source states) taken at CP10
> >>> should be the same as the composed global snapshot in Yuan's case,
> which
> >> is
> >>> combining CP10 of PR1 and CP8 of PR2. This should be true if neither
> >> failed
> >>> checkpointing nor uncommitted consuming have side effects, both of
> which
> >>> can break the exactly-once semantics when replaying. So I think there
> >>> should be no difference between rescaling the combined global snapshot
> or
> >>> the globally taken one, i.e. if the input partitions are not
> independent,
> >>> we are probably not able to rescale the source state in the current
> Flink
> >>> either.
> >>>
> >>> And @Thomas, I do agree that the operational burden is
> >>> significantly reduced, but I'm a little afraid that checkpointing the
> >>> subgraphs individually may bring back most of the runtime overhead.
> >>> Maybe we can find a better way to implement this.
> >>>
> >>> On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise <th...@apache.org> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> Thanks for opening this discussion! The proposed enhancement would be
> >>>> interesting for use cases in our infrastructure as well.
> >>>>
> >>>> There are scenarios where it makes sense to have multiple disconnected
> >>>> subgraphs in a single job because it can significantly reduce the
> >>>> operational burden as well as the runtime overhead. Since we allow
> >>>> subgraphs to recover independently, then why not allow them to make
> >>>> progress independently also, which would imply that checkpointing must
> >>>> succeed for non affected subgraphs as certain behavior is tied to
> >>>> checkpoint completion, like Kafka offset commit, file output etc.
> >>>>
> >>>> As for source offset redistribution, offset/position needs to be tied
> >>>> to splits (in FLIP-27) and legacy sources. (It applies to both Kafka
> >>>> and Kinesis legacy sources and FLIP-27 Kafka source.) With the new
> >>>> source framework, it would be hard to implement a source with correct
> >>>> behavior that does not track the position along with the split.
> >>>>
> >>>> In Yuan's example, is there a reason why CP8 could not be promoted to
> >>>> CP10 by the coordinator for PR2 once it receives the notification that
> >>>> CP10 did not complete? It appears that should be possible since in its
> >>>> effect it should be no different than no data processed between CP8
> >>>> and CP10?
> >>>>
> >>>> Thanks,
> >>>> Thomas
> >>>>
> >>>> On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann <tr...@apache.org>
> >>> wrote:
> >>>>> Thanks for the clarification Yuan and Gen,
> >>>>>
> >>>>> I agree that the checkpointing of the sources needs to support the
> >>>>> rescaling case, otherwise it does not work. Is there currently a
> >> source
> >>>>> implementation where this wouldn't work? For Kafka it should work
> >>> because
> >>>>> we store the offset per assigned partition. For Kinesis it is
> >> probably
> >>>> the
> >>>>> same. For the FileSource we store the set of unread input splits in
> >> the
> >>>>> source coordinator and the state of the assigned splits in the
> >> sources.
> >>>>> This should probably also work since new splits are only handed out
> >> to
> >>>>> running tasks.
> >>>>>
> >>>>> Cheers,
> >>>>> Till
> >>>>>
> >>>>> On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei <yu...@gmail.com>
> >>> wrote:
> >>>>>> Hey Till,
> >>>>>>
> >>>>>>> Why rescaling is a problem for pipelined regions/independent
> >>>> execution
> >>>>>> subgraphs:
> >>>>>>
> >>>>>> Take a simplified example :
> >>>>>> job graph : source  (2 instances) -> sink (2 instances)
> >>>>>> execution graph:
> >>>>>> source (1/2)  -> sink (1/2)   [pipelined region 1]
> >>>>>> source (2/2)  -> sink (2/2)   [pipelined region 2]
> >>>>>>
> >>>>>> Let's assume checkpoints are still triggered globally, meaning
> >>>> different
> >>>>>> pipelined regions share the global checkpoint id (PR1 CP1 matches
> >>> with
> >>>> PR2
> >>>>>> CP1).
> >>>>>>
> >>>>>> Now let's assume PR1 completes CP10 and PR2 completes CP8.
> >>>>>>
> >>>>>> Let's say we want to rescale to parallelism 3 due to increased
> >> input.
> >>>>>> - Notice that we can not simply rescale based on the latest
> >> completed
> >>>>>> checkpoint (CP8), because PR1 has already had data (CP8 -> CP10)
> >>> output
> >>>>>> externally.
> >>>>>> - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on
> >>>> how the
> >>>>>> source's offset redistribution is implemented.
> >>>>>>     The answer is yes if we treat each input partition as
> >> independent
> >>>> from
> >>>>>> each other, *but I am not sure whether we can make that
> >> assumption*.
> >>>>>> If not, the rescaling cannot happen until PR1 and PR2 are aligned
> >>> with
> >>>> CPs.
> >>>>>> Best
> >>>>>> -Yuan
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann <trohrmann@apache.org
> >>>> wrote:
> >>>>>>> Hi everyone,
> >>>>>>>
> >>>>>>> Yuan and Gen could you elaborate why rescaling is a problem if we
> >>> say
> >>>>>> that
> >>>>>>> separate pipelined regions can take checkpoints independently?
> >>>>>>> Conceptually, I somehow think that a pipelined region that is
> >>> failed
> >>>> and
> >>>>>>> cannot create a new checkpoint is more or less the same as a
> >>>> pipelined
> >>>>>>> region that didn't get new input or a very very slow pipelined
> >>> region
> >>>>>> which
> >>>>>>> couldn't read new records since the last checkpoint (assuming
> >> that
> >>>> the
> >>>>>>> checkpoint coordinator can create a global checkpoint by
> >> combining
> >>>>>>> individual checkpoints (e.g. taking the last completed checkpoint
> >>>> from
> >>>>>> each
> >>>>>>> pipelined region)). If this comparison is correct, then this
> >> would
> >>>> mean
> >>>>>>> that we have rescaling problems under the latter two cases.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Till
> >>>>>>>
> >>>>>>> On Mon, Feb 7, 2022 at 8:55 AM Gen Luo <lu...@gmail.com>
> >>> wrote:
> >>>>>>>> Hi Gyula,
> >>>>>>>>
> >>>>>>>> Thanks for sharing the idea. As Yuan mentioned, I think we can
> >>>> discuss
> >>>>>>> this
> >>>>>>>> within two scopes. One is the job subgraph, the other is the
> >>>> execution
> >>>>>>>> subgraph, which I suppose is the same as PipelineRegion.
> >>>>>>>>
> >>>>>>>> An idea is to individually checkpoint the PipelineRegions, for
> >>> the
> >>>>>>>> recovering in a single run.
> >>>>>>>>
> >>>>>>>> Flink has now supported PipelineRegion based failover, with a
> >>>> subset
> >>>>>> of a
> >>>>>>>> global checkpoint snapshot. The checkpoint barriers are spread
> >>>> within a
> >>>>>>>> PipelineRegion, so the checkpointing of individual
> >>> PipelineRegions
> >>>> is
> >>>>>>>> actually independent. Since in a single run of a job, the
> >>>>>> PipelineRegions
> >>>>>>>> are fixed, we can individually checkpoint separated
> >>>> PipelineRegions,
> >>>>>>>> despite what status the other PipelineRegions are, and use a
> >>>> snapshot
> >>>>>> of
> >>>>>>> a
> >>>>>>>> failing region to recover, instead of the subset of a global
> >>>> snapshot.
> >>>>>>> This
> >>>>>>>> can support separated job subgraphs as well, since they will
> >> also
> >>>> be
> >>>>>>>> separated into different PipelineRegions. I think this can
> >>> fulfill
> >>>> your
> >>>>>>>> needs.
> >>>>>>>>
> >>>>>>>> In fact the individual snapshots of all PipelineRegions can
> >> form
> >>> a
> >>>>>> global
> >>>>>>>> snapshot, and the alignment of snapshots of individual regions
> >> is
> >>>> not
> >>>>>>>> necessary. But rescaling this global snapshot can be
> >> potentially
> >>>>>>> complex. I
> >>>>>>>> think it's better to use the individual snapshots in a single
> >>> run,
> >>>> and
> >>>>>>> take
> >>>>>>>> a global checkpoint/savepoint before restarting the job,
> >>> rescaling
> >>>> it
> >>>>>> or
> >>>>>>>> not.
> >>>>>>>>
> >>>>>>>> A major issue of this plan is that it breaks the checkpoint
> >>>> mechanism
> >>>>>> of
> >>>>>>>> Flink. As far as I know, even in the approximate recovery, the
> >>>> snapshot
> >>>>>>>> used to recover a single task is still a part of a global
> >>>> snapshot. To
> >>>>>>>> implement the individual checkpointing of PipelineRegions,
> >> there
> >>>> may
> >>>>>> need
> >>>>>>>> to be a checkpoint coordinator for each PipelineRegion, and a
> >> new
> >>>>>> global
> >>>>>>>> checkpoint coordinator. When the scale goes up, there can be
> >> many
> >>>>>>>> individual regions, which can be a big burden to the job
> >> manager.
> >>>> The
> >>>>>>>> meaning of the checkpoint id will also be changed, which can
> >>> affect
> >>>>>> many
> >>>>>>>> aspects. There can be lots of work and risks, and the risks
> >> still
> >>>> exist
> >>>>>>> if
> >>>>>>>> we only individually checkpoint separated job subgraphs, since
> >>> the
> >>>>>>>> mechanism is still broken. If that is what you need, maybe
> >>>> separating
> >>>>>>> them
> >>>>>>>> into different jobs is an easier and better choice, as Caizhi
> >> and
> >>>> Yuan
> >>>>>>>> mentioned.
> >>>>>>>>
> >>>>>>>> On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <
> >> yuanmei.work@gmail.com
> >>>>>> wrote:
> >>>>>>>>> Hey Gyula,
> >>>>>>>>>
> >>>>>>>>> That's a very interesting idea. The discussion about the
> >>>> `Individual`
> >>>>>>> vs
> >>>>>>>>> `Global` checkpoint was raised before, but the main concern
> >> was
> >>>> from
> >>>>>>> two
> >>>>>>>>> aspects:
> >>>>>>>>>
> >>>>>>>>> - Non-deterministic replaying may lead to an inconsistent
> >> view
> >>> of
> >>>>>>>>> checkpoint
> >>>>>>>>> - It is not easy to form a clear cut of past and future and
> >>>> hence no
> >>>>>>>> clear
> >>>>>>>>> cut of where the start point of the source should begin to
> >>> replay
> >>>>>> from.
> >>>>>>>>> Starting from independent subgraphs as you proposed may be a
> >>> good
> >>>>>>>> starting
> >>>>>>>>> point. However, when we talk about subgraph, do we mention it
> >>> as
> >>>> a
> >>>>>> job
> >>>>>>>>> subgraph (each vertex is one or more operators) or execution
> >>>> subgraph
> >>>>>>>> (each
> >>>>>>>>> vertex is a task instance)?
> >>>>>>>>>
> >>>>>>>>> If it is a job subgraph, then indeed, why not separate it
> >> into
> >>>>>> multiple
> >>>>>>>>> jobs as Caizhi mentioned.
> >>>>>>>>> If it is an execution subgraph, then it is difficult to
> >> handle
> >>>>>>> rescaling
> >>>>>>>>> due to inconsistent views of checkpoints between tasks of the
> >>>> same
> >>>>>>>>> operator.
> >>>>>>>>>
> >>>>>>>>> `Individual/Subgraph Checkpointing` is definitely an
> >>> interesting
> >>>>>>>> direction
> >>>>>>>>> to think of, and I'd love to hear more from you!
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>>
> >>>>>>>>> Yuan
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <
> >>>> tsreaper96@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>>> Hi Gyula!
> >>>>>>>>>>
> >>>>>>>>>> Thanks for raising this discussion. I agree that this will
> >> be
> >>>> an
> >>>>>>>>>> interesting feature but I actually have some doubts about
> >> the
> >>>>>>>> motivation
> >>>>>>>>>> and use case. If there are multiple individual subgraphs in
> >>> the
> >>>>>> same
> >>>>>>>> job,
> >>>>>>>>>> why not just distribute them to multiple jobs so that each
> >>> job
> >>>>>>> contains
> >>>>>>>>>> only one individual graph and can now fail without
> >> disturbing
> >>>> the
> >>>>>>>> others?
> >>>>>>>>>>
> >>>>>>>>>> Gyula Fóra <gy...@apache.org> wrote on Mon, Feb 7, 2022 at 05:22:
> >>>>>>>>>>
> >>>>>>>>>>> Hi all!
> >>>>>>>>>>>
> >>>>>>>>>>> At the moment checkpointing only works for healthy jobs
> >>> with
> >>>> all
> >>>>>>>>> running
> >>>>>>>>>>> (or some finished) tasks. This sounds reasonable in most
> >>>> cases
> >>>>>> but
> >>>>>>>>> there
> >>>>>>>>>>> are a few applications where it would make sense to
> >>>> checkpoint
> >>>>>>>> failing
> >>>>>>>>>> jobs
> >>>>>>>>>>> as well.
> >>>>>>>>>>>
> >>>>>>>>>>> Due to how the checkpointing mechanism works, subgraphs
> >>> that
> >>>>>> have a
> >>>>>>>>>> failing
> >>>>>>>>>>> task cannot be checkpointed without violating the
> >>>> exactly-once
> >>>>>>>>> semantics.
> >>>>>>>>>>> However if the job has multiple independent subgraphs
> >> (that
> >>>> are
> >>>>>> not
> >>>>>>>>>>> connected to each other), even if one subgraph is
> >> failing,
> >>>> the
> >>>>>>> other
> >>>>>>>>>>> completely running one could be checkpointed. In these
> >>> cases
> >>>> the
> >>>>>>>> tasks
> >>>>>>>>> of
> >>>>>>>>>>> the failing subgraph could simply inherit the last
> >>> successful
> >>>>>>>>> checkpoint
> >>>>>>>>>>> metadata (before they started failing). This logic would
> >>>> produce
> >>>>>> a
> >>>>>>>>>>> consistent checkpoint.
> >>>>>>>>>>>
> >>>>>>>>>>> The job as a whole could now make stateful progress even
> >> if
> >>>> some
> >>>>>>>>>> subgraphs
> >>>>>>>>>>> are constantly failing. This can be very valuable if for
> >>> some
> >>>>>>> reason
> >>>>>>>>> the
> >>>>>>>>>>> job has a larger number of independent subgraphs that are
> >>>>>> expected
> >>>>>>> to
> >>>>>>>>>> fail
> >>>>>>>>>>> every once in a while, or if some subgraphs can have
> >> longer
> >>>>>>> downtimes
> >>>>>>>>>> that
> >>>>>>>>>>> would now cause the whole job to stall.
> >>>>>>>>>>>
> >>>>>>>>>>> What do you think?
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Gyula
> >>>>>>>>>>>
>
>
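
To make the source offset redistribution argument quoted above more
concrete, here is a minimal sketch of why per-partition offsets make the
combined snapshot (PR1 at CP10, PR2 at CP8) rescalable. It assumes, as Till
and Thomas describe for Kafka, that every offset is stored together with its
partition. The function name, the partition names, the offsets and the
round-robin assignment are all hypothetical and only illustrative; they are
not Flink's actual assignment logic.

```python
from typing import Dict, List


def redistribute(offsets_by_subtask: List[Dict[str, int]],
                 new_parallelism: int) -> List[Dict[str, int]]:
    """Reassign per-partition offsets to a new number of source subtasks.

    Because each offset travels with its partition, it does not matter
    which checkpoint a given subtask's state was taken from.
    """
    # Merge the state of all old subtasks into one partition -> offset map.
    merged: Dict[str, int] = {}
    for offsets in offsets_by_subtask:
        merged.update(offsets)

    # Illustrative round-robin assignment over the sorted partition names.
    assignment: List[Dict[str, int]] = [{} for _ in range(new_parallelism)]
    for index, partition in enumerate(sorted(merged)):
        assignment[index % new_parallelism][partition] = merged[partition]
    return assignment


# source (1/2) restored from CP10 of PR1, source (2/2) from CP8 of PR2.
old_state = [
    {"p0": 100, "p1": 120},  # offsets as of CP10
    {"p2": 80, "p3": 75},    # offsets as of CP8
]

# Rescale from parallelism 2 to 3, as in the quoted example.
print(redistribute(old_state, 3))
# [{'p0': 100, 'p3': 75}, {'p1': 120}, {'p2': 80}]
```

If the source kept state shared across subtasks instead (the case Gen and
Yuan are worried about), a merge like this would not be enough and the
regions would need aligned checkpoints before rescaling.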

Re: [DISCUSS] Checkpointing (partially) failing jobs

Posted by Chesnay Schepler <ch...@apache.org>.
Could someone expand on these operational issues you're facing when 
achieving this via separate jobs?

I feel like we're skipping a step, arguing about solutions without even 
having discussed the underlying problem.

On 08/02/2022 11:25, Gen Luo wrote:
> Hi,
>
> @Yuan
> Do you mean that there should be no shared state between source subtasks?
> Sharing state between checkpoints of a specific subtask should be fine.
>
> Sharing state between subtasks of a task can be an issue, no matter whether
> it's a source. That's also what I was afraid of in the previous replies. In
> one word, if the behavior of a pipeline region can somehow influence the
> state of other pipeline regions, their checkpoints have to be aligned
> before rescaling.
>
> On Tue, Feb 8, 2022 at 5:27 PM Yuan Mei <yu...@gmail.com> wrote:
>
>> Hey Folks,
>>
>> Thanks for the discussion!
>>
>> *Motivation and use cases*
>> I think the motivation and use cases are very clear and I do not have
>> doubts on this part.
>> A typical use case is ETL with two-phase commit, where hundreds of
>> partitions can be blocked by a single straggler (a single task's
>> checkpoint abortion can affect all, not necessarily a failure).
>>
>> *Source offset redistribution*
>> As for the known sources & implementation for Flink, I can not find a case
>> that does not work, *for now*.
>> I need to dig a bit more into how splits are tracked (assigned, not
>> successfully processed, successfully processed, etc.).
>> I guess it is a single shared source OPCoordinator. And how is this
>> *shared* state (between tasks) preserved?
>>
>> *Input partitions/splits treated completely independently from each other*
>> I am still not sure about this part, given the possible shared source
>> state mentioned in the section above.
>>
>> To Thomas:
>>> In Yuan's example, is there a reason why CP8 could not be promoted to
>>> CP10 by the coordinator for PR2 once it receives the notification that
>>> CP10 did not complete? It appears that should be possible since in its
>>> effect it should be no different than no data processed between CP8
>>>   and CP10?
>> Not sure what "promoted" means here, but
>> 1. I guess it does not matter whether it is CP8 or CP10 any more
>> if there is no shared state in the source, exactly as you mentioned:
>> "it should be no different than no data processed between CP8 and CP10"
>>
>> 2. I've noticed that from this question there is a gap between
>> "*allow aborted/failed checkpoint in independent sub-graph*" and
>> my intention: "*independent sub-graph checkpointing independently*"
>>
>> Best
>> Yuan
>>
>>
>> On Tue, Feb 8, 2022 at 11:34 AM Gen Luo <lu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm thinking about Yuan's case. Let's assume that the case is running in
>>> current Flink:
>>> 1. CP8 finishes
>>> 2. For some reason, PR2 stops consuming records from the source (but is
>> not
>>> stuck), and PR1 continues consuming new records.
>>> 3. CP9 and CP10 finish
>>> 4. PR2 starts to consume quickly to catch up with PR1, and reaches the
>> same
>>> final status as that in Yuan's case before CP11 starts.
>>>
>>> I suppose that in this case, the status of the job can be the same as in
>>> Yuan's case, and the snapshot (including source states) taken at CP10
>>> should be the same as the composed global snapshot in Yuan's case, which
>> is
>>> combining CP10 of PR1 and CP8 of PR2. This should be true if neither
>> failed
>>> checkpointing nor uncommitted consuming have side effects, both of which
>>> can break the exactly-once semantics when replaying. So I think there
>>> should be no difference between rescaling the combined global snapshot or
>>> the globally taken one, i.e. if the input partitions are not independent,
>>> we are probably not able to rescale the source state in the current Flink
>>> either.
>>>
>>> And @Thomas, I do agree that the operational burden is
>>> significantly reduced, but I'm a little afraid that checkpointing the
>>> subgraphs individually may bring back most of the runtime overhead.
>>> Maybe we can find a better way to implement this.
>>>
>>> On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise <th...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> Thanks for opening this discussion! The proposed enhancement would be
>>>> interesting for use cases in our infrastructure as well.
>>>>
>>>> There are scenarios where it makes sense to have multiple disconnected
>>>> subgraphs in a single job because it can significantly reduce the
>>>> operational burden as well as the runtime overhead. Since we allow
>>>> subgraphs to recover independently, then why not allow them to make
>>>> progress independently also, which would imply that checkpointing must
>>>> succeed for non affected subgraphs as certain behavior is tied to
>>>> checkpoint completion, like Kafka offset commit, file output etc.
>>>>
>>>> As for source offset redistribution, offset/position needs to be tied
>>>> to splits (in FLIP-27) and legacy sources. (It applies to both Kafka
>>>> and Kinesis legacy sources and FLIP-27 Kafka source.) With the new
>>>> source framework, it would be hard to implement a source with correct
>>>> behavior that does not track the position along with the split.
>>>>
>>>> In Yuan's example, is there a reason why CP8 could not be promoted to
>>>> CP10 by the coordinator for PR2 once it receives the notification that
>>>> CP10 did not complete? It appears that should be possible since in its
>>>> effect it should be no different than no data processed between CP8
>>>> and CP10?
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>> On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>>> Thanks for the clarification Yuan and Gen,
>>>>>
>>>>> I agree that the checkpointing of the sources needs to support the
>>>>> rescaling case, otherwise it does not work. Is there currently a
>> source
>>>>> implementation where this wouldn't work? For Kafka it should work
>>> because
>>>>> we store the offset per assigned partition. For Kinesis it is
>> probably
>>>> the
>>>>> same. For the Filesource we store the set of unread input splits in
>> the
>>>>> source coordinator and the state of the assigned splits in the
>> sources.
>>>>> This should probably also work since new splits are only handed out
>> to
>>>>> running tasks.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei <yu...@gmail.com>
>>> wrote:
>>>>>> Hey Till,
>>>>>>
>>>>>>> Why rescaling is a problem for pipelined regions/independent
>>>> execution
>>>>>> subgraphs:
>>>>>>
>>>>>> Take a simplified example :
>>>>>> job graph : source  (2 instances) -> sink (2 instances)
>>>>>> execution graph:
>>>>>> source (1/2)  -> sink (1/2)   [pieplined region 1]
>>>>>> source (2/2)  -> sink (2/2)   [pieplined region 2]
>>>>>>
>>>>>> Let's assume checkpoints are still triggered globally, meaning
>>>> different
>>>>>> pipelined regions share the global checkpoint id (PR1 CP1 matches
>>> with
>>>> PR2
>>>>>> CP1).
>>>>>>
>>>>>> Now let's assume PR1 completes CP10 and PR2 completes CP8.
>>>>>>
>>>>>> Let's say we want to rescale to parallelism 3 due to increased
>> input.
>>>>>> - Notice that we can not simply rescale based on the latest
>> completed
>>>>>> checkpoint (CP8), because PR1 has already had data (CP8 -> CP10)
>>> output
>>>>>> externally.
>>>>>> - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on
>>>> how the
>>>>>> source's offset redistribution is implemented.
>>>>>>     The answer is yes if we treat each input partition as
>> independent
>>>> from
>>>>>> each other, *but I am not sure whether we can make that
>> assumption*.
>>>>>> If not, the rescaling cannot happen until PR1 and PR2 are aligned
>>> with
>>>> CPs.
>>>>>> Best
>>>>>> -Yuan
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann <trohrmann@apache.org
>>>> wrote:
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> Yuan and Gen could you elaborate why rescaling is a problem if we
>>> say
>>>>>> that
>>>>>>> separate pipelined regions can take checkpoints independently?
>>>>>>> Conceptually, I somehow think that a pipelined region that is
>>> failed
>>>> and
>>>>>>> cannot create a new checkpoint is more or less the same as a
>>>> pipelined
>>>>>>> region that didn't get new input or a very very slow pipelined
>>> region
>>>>>> which
>>>>>>> couldn't read new records since the last checkpoint (assuming
>> that
>>>> the
>>>>>>> checkpoint coordinator can create a global checkpoint by
>> combining
>>>>>>> individual checkpoints (e.g. taking the last completed checkpoint
>>>> from
>>>>>> each
>>>>>>> pipelined region)). If this comparison is correct, then this
>> would
>>>> mean
>>>>>>> that we have rescaling problems under the latter two cases.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Mon, Feb 7, 2022 at 8:55 AM Gen Luo <lu...@gmail.com>
>>> wrote:
>>>>>>>> Hi Gyula,
>>>>>>>>
>>>>>>>> Thanks for sharing the idea. As Yuan mentioned, I think we can
>>>> discuss
>>>>>>> this
>>>>>>>> within two scopes. One is the job subgraph, the other is the
>>>> execution
>>>>>>>> subgraph, which I suppose is the same as PipelineRegion.
>>>>>>>>
>>>>>>>> An idea is to individually checkpoint the PipelineRegions, for
>>> the
>>>>>>>> recovering in a single run.
>>>>>>>>
>>>>>>>> Flink has now supported PipelineRegion based failover, with a
>>>> subset
>>>>>> of a
>>>>>>>> global checkpoint snapshot. The checkpoint barriers are spread
>>>> within a
>>>>>>>> PipelineRegion, so the checkpointing of individual
>>> PipelineRegions
>>>> is
>>>>>>>> actually independent. Since in a single run of a job, the
>>>>>> PipelineRegions
>>>>>>>> are fixed, we can individually checkpoint separated
>>>> PipelineRegions,
>>>>>>>> despite what status the other PipelineRegions are, and use a
>>>> snapshot
>>>>>> of
>>>>>>> a
>>>>>>>> failing region to recover, instead of the subset of a global
>>>> snapshot.
>>>>>>> This
>>>>>>>> can support separated job subgraphs as well, since they will
>> also
>>>> be
>>>>>>>> separated into different PipelineRegions. I think this can
>>> fulfill
>>>> your
>>>>>>>> needs.
>>>>>>>>
>>>>>>>> In fact the individual snapshots of all PipelineRegions can
>> form
>>> a
>>>>>> global
>>>>>>>> snapshot, and the alignment of snapshots of individual regions
>> is
>>>> not
>>>>>>>> necessary. But rescaling this global snapshot can be
>> potentially
>>>>>>> complex. I
>>>>>>>> think it's better to use the individual snapshots in a single
>>> run,
>>>> and
>>>>>>> take
>>>>>>>> a global checkpoint/savepoint before restarting the job,
>>> rescaling
>>>> it
>>>>>> or
>>>>>>>> not.
>>>>>>>>
>>>>>>>> A major issue of this plan is that it breaks the checkpoint
>>>> mechanism
>>>>>> of
>>>>>>>> Flink. As far as I know, even in the approximate recovery, the
>>>> snapshot
>>>>>>>> used to recover a single task is still a part of a global
>>>> snapshot. To
>>>>>>>> implement the individual checkpointing of PipelineRegions,
>> there
>>>> may
>>>>>> need
>>>>>>>> to be a checkpoint coordinator for each PipelineRegion, and a
>> new
>>>>>> global
>>>>>>>> checkpoint coordinator. When the scale goes up, there can be
>> many
>>>>>>>> individual regions, which can be a big burden to the job
>> manager.
>>>> The
>>>>>>>> meaning of the checkpoint id will also be changed, which can
>>> affect
>>>>>> many
>>>>>>>> aspects. There can be lots of work and risks, and the risks
>> still
>>>> exist
>>>>>>> if
>>>>>>>> we only individually checkpoint separated job subgraphs, since
>>> the
>>>>>>>> mechanism is still broken. If that is what you need, maybe
>>>> separating
>>>>>>> them
>>>>>>>> into different jobs is an easier and better choice, as Caizhi
>> and
>>>> Yuan
>>>>>>>> mentioned.
>>>>>>>>
>>>>>>>> On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <
>> yuanmei.work@gmail.com
>>>>>> wrote:
>>>>>>>>> Hey Gyula,
>>>>>>>>>
>>>>>>>>> That's a very interesting idea. The discussion about the
>>>> `Individual`
>>>>>>> vs
>>>>>>>>> `Global` checkpoint was raised before, but the main concern
>> was
>>>> from
>>>>>>> two
>>>>>>>>> aspects:
>>>>>>>>>
>>>>>>>>> - Non-deterministic replaying may lead to an inconsistent
>> view
>>> of
>>>>>>>>> checkpoint
>>>>>>>>> - It is not easy to form a clear cut of past and future and
>>>> hence no
>>>>>>>> clear
>>>>>>>>> cut of where the start point of the source should begin to
>>> replay
>>>>>> from.
>>>>>>>>> Starting from independent subgraphs as you proposed may be a
>>> good
>>>>>>>> starting
>>>>>>>>> point. However, when we talk about subgraph, do we mention it
>>> as
>>>> a
>>>>>> job
>>>>>>>>> subgraph (each vertex is one or more operators) or execution
>>>> subgraph
>>>>>>>> (each
>>>>>>>>> vertex is a task instance)?
>>>>>>>>>
>>>>>>>>> If it is a job subgraph, then indeed, why not separate it
>> into
>>>>>> multiple
>>>>>>>>> jobs as Caizhi mentioned.
>>>>>>>>> If it is an execution subgraph, then it is difficult to
>> handle
>>>>>>> rescaling
>>>>>>>>> due to inconsistent views of checkpoints between tasks of the
>>>> same
>>>>>>>>> operator.
>>>>>>>>>
>>>>>>>>> `Individual/Subgraph Checkpointing` is definitely an
>>> interesting
>>>>>>>> direction
>>>>>>>>> to think of, and I'd love to hear more from you!
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Yuan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <
>>>> tsreaper96@gmail.com>
>>>>>>>> wrote:
>>>>>>>>>> Hi Gyula!
>>>>>>>>>>
>>>>>>>>>> Thanks for raising this discussion. I agree that this will
>> be
>>>> an
>>>>>>>>>> interesting feature but I actually have some doubts about
>> the
>>>>>>>> motivation
>>>>>>>>>> and use case. If there are multiple individual subgraphs in
>>> the
>>>>>> same
>>>>>>>> job,
>>>>>>>>>> why not just distribute them to multiple jobs so that each
>>> job
>>>>>>> contains
>>>>>>>>>> only one individual graph and can now fail without
>> disturbing
>>>> the
>>>>>>>> others?
>>>>>>>>>>
>>>>>>>>>> Gyula Fóra <gy...@apache.org> 于2022年2月7日周一 05:22写道:
>>>>>>>>>>
>>>>>>>>>>> Hi all!
>>>>>>>>>>>
>>>>>>>>>>> At the moment checkpointing only works for healthy jobs
>>> with
>>>> all
>>>>>>>>> running
>>>>>>>>>>> (or some finished) tasks. This sounds reasonable in most
>>>> cases
>>>>>> but
>>>>>>>>> there
>>>>>>>>>>> are a few applications where it would make sense to
>>>> checkpoint
>>>>>>>> failing
>>>>>>>>>> jobs
>>>>>>>>>>> as well.
>>>>>>>>>>>
>>>>>>>>>>> Due to how the checkpointing mechanism works, subgraphs
>>> that
>>>>>> have a
>>>>>>>>>> failing
>>>>>>>>>>> task cannot be checkpointed without violating the
>>>> exactly-once
>>>>>>>>> semantics.
>>>>>>>>>>> However if the job has multiple independent subgraphs
>> (that
>>>> are
>>>>>> not
>>>>>>>>>>> connected to each other), even if one subgraph is
>> failing,
>>>> the
>>>>>>> other
>>>>>>>>>>> completely running one could be checkpointed. In these
>>> cases
>>>> the
>>>>>>>> tasks
>>>>>>>>> of
>>>>>>>>>>> the failing subgraph could simply inherit the last
>>> successful
>>>>>>>>> checkpoint
>>>>>>>>>>> metadata (before they started failing). This logic would
>>>> produce
>>>>>> a
>>>>>>>>>>> consistent checkpoint.
>>>>>>>>>>>
>>>>>>>>>>> The job as a whole could now make stateful progress even
>> if
>>>> some
>>>>>>>>>> subgraphs
>>>>>>>>>>> are constantly failing. This can be very valuable if for
>>> some
>>>>>>> reason
>>>>>>>>> the
>>>>>>>>>>> job has a larger number of independent subgraphs that are
>>>>>> expected
>>>>>>> to
>>>>>>>>>> fail
>>>>>>>>>>> every once in a while, or if some subgraphs can have
>> longer
>>>>>>> downtimes
>>>>>>>>>> that
>>>>>>>>>>> would now cause the whole job to stall.
>>>>>>>>>>>
>>>>>>>>>>> What do you think?
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Gyula
>>>>>>>>>>>


Re: [DISCUSS] Checkpointing (partially) failing jobs

Posted by Gen Luo <lu...@gmail.com>.
Hi,

@Yuan
Do you mean that there should be no shared state between source subtasks?
Sharing state between checkpoints of a specific subtask should be fine.

Sharing state between subtasks of a task can be an issue, whether or not
the task is a source. That is also what I was afraid of in my previous
replies. In short, if the behavior of one pipeline region can somehow
influence the state of other pipeline regions, their checkpoints have to be
aligned before rescaling.
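
To make the rule above concrete, here is a minimal sketch in Python. It is not Flink code: `RegionSnapshot`, its fields and `can_rescale` are hypothetical names used only for illustration. It encodes the condition as stated: a composed snapshot can be rescaled if every region sits on the same checkpoint id, or if no region shares state with another one.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RegionSnapshot:
    """Hypothetical record of the latest completed checkpoint of one pipeline region."""
    region: str
    checkpoint_id: int
    shares_state_with_other_regions: bool


def can_rescale(snapshots):
    """Return True if the composed snapshot may be rescaled under the rule above."""
    # All regions completed the same checkpoint id: this is an ordinary global checkpoint.
    aligned = len({s.checkpoint_id for s in snapshots}) == 1
    # No region can influence the state of another region.
    independent = not any(s.shares_state_with_other_regions for s in snapshots)
    return aligned or independent


# Yuan's example: PR1 completed CP10, PR2 only completed CP8.
independent_regions = [
    RegionSnapshot("PR1", 10, False),
    RegionSnapshot("PR2", 8, False),
]
coupled_regions = [
    RegionSnapshot("PR1", 10, True),
    RegionSnapshot("PR2", 8, True),
]
coupled_but_aligned = [
    RegionSnapshot("PR1", 10, True),
    RegionSnapshot("PR2", 10, True),
]
```

With these inputs, only `coupled_regions` has to wait for PR1 and PR2 to be aligned before rescaling.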

On Tue, Feb 8, 2022 at 5:27 PM Yuan Mei <yu...@gmail.com> wrote:

> Hey Folks,
>
> Thanks for the discussion!
>
> *Motivation and use cases*
> I think the motivation and use cases are very clear, and I have no doubts
> about this part.
> A typical use case is ETL with two-phase commit: hundreds of partitions can
> be blocked by a single straggler (a single task's aborted checkpoint can
> affect all of them; it does not have to be a failure).
>
> *Source offset redistribution*
> For the known sources and implementations in Flink, I cannot find a case
> that does not work, *for now*.
> I need to dig a bit more into how splits are tracked: assigned, not yet
> successfully processed, successfully processed, etc.
> I guess this is tracked by a single shared source OperatorCoordinator. How
> is this *shared* state (between tasks) preserved?
>
> *Input partitions/splits treated as completely independent of each other*
> I am still not sure about this part if the source has shared state, as
> mentioned in the section above.
>
> To Thomas:
> > In Yuan's example, is there a reason why CP8 could not be promoted to
> > CP10 by the coordinator for PR2 once it receives the notification that
> > CP10 did not complete? It appears that should be possible since in its
> > effect it should be no different than no data processed between CP8
> >  and CP10?
>
> I am not sure what "promoted" means here, but:
> 1. I guess it no longer matters whether it is CP8 or CP10
> if there is no shared state in the source; exactly as you mentioned,
> "it should be no different than no data processed between CP8 and CP10".
>
> 2. This question made me notice a gap between
> "*allow aborted/failed checkpoints in independent sub-graphs*" and
> my intention: "*independent sub-graphs checkpointing independently*".
>
> Best
> Yuan
>

Re: [DISCUSS] Checkpointing (partially) failing jobs

Posted by Yuan Mei <yu...@gmail.com>.
Hey Folks,

Thanks for the discussion!

*Motivation and use cases*
I think the motivation and use cases are very clear, and I have no doubts
about this part.
A typical use case is ETL with two-phase commit: hundreds of partitions can
be blocked by a single straggler (a single task's aborted checkpoint can
affect all of them; it does not have to be a failure).

*Source offset redistribution*
For the known sources and implementations in Flink, I cannot find a case
that does not work, *for now*.
I need to dig a bit more into how splits are tracked: assigned, not yet
successfully processed, successfully processed, etc.
I guess this is tracked by a single shared source OperatorCoordinator. How
is this *shared* state (between tasks) preserved?

*Input partitions/splits treated as completely independent of each other*
I am still not sure about this part if the source has shared state, as
mentioned in the section above.
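
As an illustration of the independence assumption, the following Python sketch merges per-partition offsets taken from region snapshots at different checkpoint ids and spreads them over a new parallelism. It is not how any Flink connector is implemented: the function names, the partition names, the offsets and the round-robin assignment are all assumptions made for the example. It only works because each offset travels with its own partition and nothing else is shared.

```python
def merge_region_offsets(region_states):
    """Union the per-partition offsets of several region snapshots."""
    merged = {}
    for state in region_states:
        for partition, offset in state.items():
            if partition in merged:
                # Two regions claiming one partition would mean shared state.
                raise ValueError(f"partition {partition!r} is owned by two regions")
            merged[partition] = offset
    return merged


def redistribute(merged, new_parallelism):
    """Assign partitions round-robin, in sorted order, to the new subtasks."""
    assignments = [{} for _ in range(new_parallelism)]
    for index, partition in enumerate(sorted(merged)):
        assignments[index % new_parallelism][partition] = merged[partition]
    return assignments


# Hypothetical source state: PR1 is taken from CP10, PR2 from CP8.
pr1_cp10 = {"partition-0": 1500, "partition-1": 1420}
pr2_cp8 = {"partition-2": 900, "partition-3": 870}

# Rescale from parallelism 2 to parallelism 3.
new_assignments = redistribute(merge_region_offsets([pr1_cp10, pr2_cp8]), 3)
```

If the source kept any state that spans partitions (for example in a shared coordinator), this merge would no longer be sufficient, which is exactly the open question above.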

To Thomas:
> In Yuan's example, is there a reason why CP8 could not be promoted to
> CP10 by the coordinator for PR2 once it receives the notification that
> CP10 did not complete? It appears that should be possible since in its
> effect it should be no different than no data processed between CP8
>  and CP10?

I am not sure what "promoted" means here, but:
1. I guess it no longer matters whether it is CP8 or CP10
if there is no shared state in the source; exactly as you mentioned,
"it should be no different than no data processed between CP8 and CP10".

2. This question made me notice a gap between
"*allow aborted/failed checkpoints in independent sub-graphs*" and
my intention: "*independent sub-graphs checkpointing independently*".
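
One possible reading of "promoted", sketched in Python under stated assumptions: when a region does not complete a global checkpoint, its last completed snapshot is re-registered under the new id, as if the region had processed no data in between. `RegionCheckpointStore` and its methods are hypothetical and do not exist in Flink.

```python
class RegionCheckpointStore:
    """Hypothetical per-region bookkeeping of completed checkpoints."""

    def __init__(self):
        self._completed = {}    # checkpoint id -> region state
        self._latest_id = None  # id of the most recent usable checkpoint

    def complete(self, checkpoint_id, state):
        """Record a checkpoint that the region really completed."""
        self._completed[checkpoint_id] = state
        self._latest_id = checkpoint_id

    def promote_latest(self, checkpoint_id):
        """Reuse the last completed state under a newer checkpoint id."""
        if self._latest_id is None:
            raise RuntimeError("no completed checkpoint to promote")
        self._completed[checkpoint_id] = self._completed[self._latest_id]
        self._latest_id = checkpoint_id

    def latest(self):
        """Return (checkpoint id, state) of the most recent usable checkpoint."""
        return self._latest_id, self._completed[self._latest_id]


# PR2 completed CP8, then is told that CP10 did not complete for it.
pr2 = RegionCheckpointStore()
pr2.complete(8, {"partition-2": 900})
pr2.promote_latest(10)
```

After the promotion both regions report CP10, so they line up under one global id while PR2 still restores the state it had at CP8. This matches the "allow aborted/failed checkpoints" reading rather than fully independent checkpointing.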

Best
Yuan


On Tue, Feb 8, 2022 at 11:34 AM Gen Luo <lu...@gmail.com> wrote:

> Hi,
>
> I'm thinking about Yuan's case. Let's assume that the case is running in
> current Flink:
> 1. CP8 finishes
> 2. For some reason, PR2 stops consuming records from the source (but is not
> stuck), and PR1 continues consuming new records.
> 3. CP9 and CP10 finish
> 4. PR2 starts to consume quickly to catch up with PR1, and reaches the same
> final status as in Yuan's case before CP11 starts.
>
> I suppose that in this case, the status of the job can be the same as in
> Yuan's case, and the snapshot (including source states) taken at CP10
> should be the same as the composed global snapshot in Yuan's case, which
> combines CP10 of PR1 and CP8 of PR2. This should be true if neither failed
> checkpointing nor uncommitted consumption has side effects, either of which
> could break the exactly-once semantics when replaying. So I think there
> should be no difference between rescaling the combined global snapshot and
> the globally taken one, i.e. if the input partitions are not independent,
> we are probably not able to rescale the source state in current Flink
> either.
>
> And @Thomas, I do agree that the operational burden is
> significantly reduced, but I'm a little afraid that checkpointing the
> subgraphs individually may bring back most of the runtime overhead.
> Maybe we can find a better way to implement this.
>
> On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise <th...@apache.org> wrote:
>
> > Hi,
> >
> > Thanks for opening this discussion! The proposed enhancement would be
> > interesting for use cases in our infrastructure as well.
> >
> > There are scenarios where it makes sense to have multiple disconnected
> > subgraphs in a single job because it can significantly reduce the
> > operational burden as well as the runtime overhead. Since we allow
> > subgraphs to recover independently, then why not allow them to make
> > progress independently also, which would imply that checkpointing must
> > succeed for unaffected subgraphs, as certain behavior is tied to
> > checkpoint completion, like Kafka offset commit, file output etc.
> >
> > As for source offset redistribution, the offset/position needs to be
> > tied to splits, in FLIP-27 sources as well as in legacy sources. (This
> > applies to the Kafka and Kinesis legacy sources and to the FLIP-27
> > Kafka source.) With the new
> > source framework, it would be hard to implement a source with correct
> > behavior that does not track the position along with the split.
> >
> > In Yuan's example, is there a reason why CP8 could not be promoted to
> > CP10 by the coordinator for PR2 once it receives the notification that
> > CP10 did not complete? It appears that should be possible since in its
> > effect it should be no different than no data processed between CP8
> > and CP10?
> >
> > Thanks,
> > Thomas
> >
> > On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann <tr...@apache.org> wrote:
> > >
> > > Thanks for the clarification Yuan and Gen,
> > >
> > > I agree that the checkpointing of the sources needs to support the
> > > rescaling case, otherwise it does not work. Is there currently a source
> > > implementation where this wouldn't work? For Kafka it should work
> > > because we store the offset per assigned partition. For Kinesis it is
> > > probably the same. For the Filesource we store the set of unread input
> > > splits in the source coordinator and the state of the assigned splits
> > > in the sources. This should probably also work since new splits are
> > > only handed out to running tasks.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei <yu...@gmail.com>
> wrote:
> > >
> > > > Hey Till,
> > > >
> > > > > Why rescaling is a problem for pipelined regions/independent
> > execution
> > > > subgraphs:
> > > >
> > > > Take a simplified example :
> > > > job graph : source  (2 instances) -> sink (2 instances)
> > > > execution graph:
> > > > source (1/2)  -> sink (1/2)   [pieplined region 1]
> > > > source (2/2)  -> sink (2/2)   [pieplined region 2]
> > > >
> > > > Let's assume checkpoints are still triggered globally, meaning
> > different
> > > > pipelined regions share the global checkpoint id (PR1 CP1 matches
> with
> > PR2
> > > > CP1).
> > > >
> > > > Now let's assume PR1 completes CP10 and PR2 completes CP8.
> > > >
> > > > Let's say we want to rescale to parallelism 3 due to increased input.
> > > >
> > > > - Notice that we can not simply rescale based on the latest completed
> > > > checkpoint (CP8), because PR1 has already had data (CP8 -> CP10)
> output
> > > > externally.
> > > > - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on
> > how the
> > > > source's offset redistribution is implemented.
> > > >    The answer is yes if we treat each input partition as independent
> > from
> > > > each other, *but I am not sure whether we can make that assumption*.
> > > >
> > > > If not, the rescaling cannot happen until PR1 and PR2 are aligned
> with
> > CPs.
> > > >
> > > > Best
> > > > -Yuan
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann <tr...@apache.org>
> > wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > Yuan and Gen could you elaborate why rescaling is a problem if we
> say
> > > > that
> > > > > separate pipelined regions can take checkpoints independently?
> > > > > Conceptually, I somehow think that a pipelined region that is
> failed
> > and
> > > > > cannot create a new checkpoint is more or less the same as a
> > pipelined
> > > > > region that didn't get new input or a very very slow pipelined
> region
> > > > which
> > > > > couldn't read new records since the last checkpoint (assuming that
> > the
> > > > > checkpoint coordinator can create a global checkpoint by combining
> > > > > individual checkpoints (e.g. taking the last completed checkpoint
> > from
> > > > each
> > > > > pipelined region)). If this comparison is correct, then this would
> > mean
> > > > > that we have rescaling problems under the latter two cases.
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Mon, Feb 7, 2022 at 8:55 AM Gen Luo <lu...@gmail.com>
> wrote:
> > > > >
> > > > > > Hi Gyula,
> > > > > >
> > > > > > Thanks for sharing the idea. As Yuan mentioned, I think we can
> > discuss
> > > > > this
> > > > > > within two scopes. One is the job subgraph, the other is the
> > execution
> > > > > > subgraph, which I suppose is the same as PipelineRegion.
> > > > > >
> > > > > > An idea is to individually checkpoint the PipelineRegions, for
> the
> > > > > > recovering in a single run.
> > > > > >
> > > > > > Flink has now supported PipelineRegion based failover, with a
> > subset
> > > > of a
> > > > > > global checkpoint snapshot. The checkpoint barriers are spread
> > within a
> > > > > > PipelineRegion, so the checkpointing of individual
> PipelineRegions
> > is
> > > > > > actually independent. Since in a single run of a job, the
> > > > PipelineRegions
> > > > > > are fixed, we can individually checkpoint separated
> > PipelineRegions,
> > > > > > despite what status the other PipelineRegions are, and use a
> > snapshot
> > > > of
> > > > > a
> > > > > > failing region to recover, instead of the subset of a global
> > snapshot.
> > > > > This
> > > > > > can support separated job subgraphs as well, since they will also
> > be
> > > > > > separated into different PipelineRegions. I think this can
> fulfill
> > your
> > > > > > needs.
> > > > > >
> > > > > > In fact the individual snapshots of all PipelineRegions can form
> a
> > > > global
> > > > > > snapshot, and the alignment of snapshots of individual regions is
> > not
> > > > > > necessary. But rescaling this global snapshot can be potentially
> > > > > complex. I
> > > > > > think it's better to use the individual snapshots in a single
> run,
> > and
> > > > > take
> > > > > > a global checkpoint/savepoint before restarting the job,
> rescaling
> > it
> > > > or
> > > > > > not.
> > > > > >
> > > > > > A major issue of this plan is that it breaks the checkpoint
> > mechanism
> > > > of
> > > > > > Flink. As far as I know, even in the approximate recovery, the
> > snapshot
> > > > > > used to recover a single task is still a part of a global
> > snapshot. To
> > > > > > implement the individual checkpointing of PipelineRegions, there
> > may
> > > > need
> > > > > > to be a checkpoint coordinator for each PipelineRegion, and a new
> > > > global
> > > > > > checkpoint coordinator. When the scale goes up, there can be many
> > > > > > individual regions, which can be a big burden to the job manager.
> > The
> > > > > > meaning of the checkpoint id will also be changed, which can
> affect
> > > > many
> > > > > > aspects. There can be lots of work and risks, and the risks still
> > exist
> > > > > if
> > > > > > we only individually checkpoint separated job subgraphs, since
> the
> > > > > > mechanism is still broken. If that is what you need, maybe
> > separating
> > > > > them
> > > > > > into different jobs is an easier and better choice, as Caizhi and
> > Yuan
> > > > > > mentioned.
> > > > > >
> > > > > > On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <yuanmei.work@gmail.com
> >
> > > > wrote:
> > > > > >
> > > > > > > Hey Gyula,
> > > > > > >
> > > > > > > That's a very interesting idea. The discussion about the
> > `Individual`
> > > > > vs
> > > > > > > `Global` checkpoint was raised before, but the main concern was
> > from
> > > > > two
> > > > > > > aspects:
> > > > > > >
> > > > > > > - Non-deterministic replaying may lead to an inconsistent view
> of
> > > > > > > checkpoint
> > > > > > > - It is not easy to form a clear cut of past and future and
> > hence no
> > > > > > clear
> > > > > > > cut of where the start point of the source should begin to
> replay
> > > > from.
> > > > > > >
> > > > > > > Starting from independent subgraphs as you proposed may be a
> good
> > > > > > starting
> > > > > > > point. However, when we talk about subgraph, do we mention it
> as
> > a
> > > > job
> > > > > > > subgraph (each vertex is one or more operators) or execution
> > subgraph
> > > > > > (each
> > > > > > > vertex is a task instance)?
> > > > > > >
> > > > > > > If it is a job subgraph, then indeed, why not separate it into
> > > > multiple
> > > > > > > jobs as Caizhi mentioned.
> > > > > > > If it is an execution subgraph, then it is difficult to handle
> > > > > rescaling
> > > > > > > due to inconsistent views of checkpoints between tasks of the
> > same
> > > > > > > operator.
> > > > > > >
> > > > > > > `Individual/Subgraph Checkpointing` is definitely an
> interesting
> > > > > > direction
> > > > > > > to think of, and I'd love to hear more from you!
> > > > > > >
> > > > > > > Best,
> > > > > > >
> > > > > > > Yuan
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <
> > tsreaper96@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Gyula!
> > > > > > > >
> > > > > > > > Thanks for raising this discussion. I agree that this will be
> > an
> > > > > > > > interesting feature but I actually have some doubts about the
> > > > > > motivation
> > > > > > > > and use case. If there are multiple individual subgraphs in
> the
> > > > same
> > > > > > job,
> > > > > > > > why not just distribute them to multiple jobs so that each
> job
> > > > > contains
> > > > > > > > only one individual graph and can now fail without disturbing
> > the
> > > > > > others?
> > > > > > > >
> > > > > > > >
> > > > > > > > Gyula Fóra <gy...@apache.org> 于2022年2月7日周一 05:22写道:
> > > > > > > >
> > > > > > > > > Hi all!
> > > > > > > > >
> > > > > > > > > At the moment checkpointing only works for healthy jobs
> with
> > all
> > > > > > > running
> > > > > > > > > (or some finished) tasks. This sounds reasonable in most
> > cases
> > > > but
> > > > > > > there
> > > > > > > > > are a few applications where it would make sense to
> > checkpoint
> > > > > > failing
> > > > > > > > jobs
> > > > > > > > > as well.
> > > > > > > > >
> > > > > > > > > Due to how the checkpointing mechanism works, subgraphs
> that
> > > > have a
> > > > > > > > failing
> > > > > > > > > task cannot be checkpointed without violating the
> > exactly-once
> > > > > > > semantics.
> > > > > > > > > However if the job has multiple independent subgraphs (that
> > are
> > > > not
> > > > > > > > > connected to each other), even if one subgraph is failing,
> > the
> > > > > other
> > > > > > > > > completely running one could be checkpointed. In these
> cases
> > the
> > > > > > tasks
> > > > > > > of
> > > > > > > > > the failing subgraph could simply inherit the last
> successful
> > > > > > > checkpoint
> > > > > > > > > metadata (before they started failing). This logic would
> > produce
> > > > a
> > > > > > > > > consistent checkpoint.
> > > > > > > > >
> > > > > > > > > The job as a whole could now make stateful progress even if
> > some
> > > > > > > > subgraphs
> > > > > > > > > are constantly failing. This can be very valuable if for
> some
> > > > > reason
> > > > > > > the
> > > > > > > > > job has a larger number of independent subgraphs that are
> > > > expected
> > > > > to
> > > > > > > > fail
> > > > > > > > > every once in a while, or if some subgraphs can have longer
> > > > > downtimes
> > > > > > > > that
> > > > > > > > > would now cause the whole job to stall.
> > > > > > > > >
> > > > > > > > > What do you think?
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Gyula
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
>

Re: [DISCUSS] Checkpointing (partially) failing jobs

Posted by Gen Luo <lu...@gmail.com>.
Hi,

I'm thinking about Yuan's case. Let's assume that the case is running in
current Flink:
1. CP8 finishes
2. For some reason, PR2 stops consuming records from the source (but is not
stuck), and PR1 continues consuming new records.
3. CP9 and CP10 finish
4. PR2 starts to consume quickly to catch up with PR1, and reaches the same
final state as in Yuan's case before CP11 starts.

I suppose that in this case the state of the job is the same as in
Yuan's case, and the snapshot (including source states) taken at CP10
should be the same as the composed global snapshot in Yuan's case, which
combines CP10 of PR1 and CP8 of PR2. This should be true as long as neither
failed checkpointing nor uncommitted consuming has side effects, either of
which could break the exactly-once semantics when replaying. So I think there
should be no difference between rescaling the combined global snapshot and
rescaling the globally taken one. In other words, if the input partitions are
not independent, we are probably not able to rescale the source state in
current Flink either.
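
The comparison above can be made concrete with a small sketch. It is only a toy model, not Flink code: a region's checkpoint is assumed to be a plain map from source partition to offset, and the names `latest_completed`, `compose_global_snapshot`, `partition-0` and `partition-1` are made up for illustration.

```python
from typing import Dict

# Hypothetical model: checkpoint id -> {partition: offset}, kept per pipelined region.
RegionCheckpoints = Dict[int, Dict[str, int]]


def latest_completed(checkpoints: RegionCheckpoints) -> Dict[str, int]:
    """Return the state of the newest completed checkpoint of one region."""
    return checkpoints[max(checkpoints)]


def compose_global_snapshot(regions: Dict[str, RegionCheckpoints]) -> Dict[str, int]:
    """Merge the newest completed checkpoint of every region into one snapshot.

    Assumes that regions own disjoint sets of partitions.
    """
    snapshot: Dict[str, int] = {}
    for checkpoints in regions.values():
        snapshot.update(latest_completed(checkpoints))
    return snapshot


# Yuan's case: PR1 completed CP10, PR2 only completed CP8.
yuan_case = {
    "PR1": {8: {"partition-0": 800}, 9: {"partition-0": 900}, 10: {"partition-0": 1000}},
    "PR2": {8: {"partition-1": 800}},
}
composed = compose_global_snapshot(yuan_case)

# Current Flink with PR2 idle after CP8: the global CP10 records PR2's unchanged offset.
global_cp10 = {"partition-0": 1000, "partition-1": 800}

print(composed == global_cp10)
```

Under these assumptions both snapshots contain the same offsets, so rescaling either of them poses the same problem.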

And @Thomas, I do agree that the operational burden is
significantly reduced, but I'm a little afraid that checkpointing the
subgraphs individually may bring back most of the runtime overhead.
Maybe we can find a better way to implement this.

On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise <th...@apache.org> wrote:

> Hi,
>
> Thanks for opening this discussion! The proposed enhancement would be
> interesting for use cases in our infrastructure as well.
>
> There are scenarios where it makes sense to have multiple disconnected
> subgraphs in a single job because it can significantly reduce the
> operational burden as well as the runtime overhead. Since we allow
> subgraphs to recover independently, then why not allow them to make
> progress independently also, which would imply that checkpointing must
> succeed for non affected subgraphs as certain behavior is tied to
> checkpoint completion, like Kafka offset commit, file output etc.
>
> As for source offset redistribution, offset/position needs to be tied
> to splits (in FLIP-27) and legacy sources. (It applies to both Kafka
> and Kinesis legacy sources and FLIP-27 Kafka source.). With the new
> source framework, it would be hard to implement a source with correct
> behavior that does not track the position along with the split.
>
> In Yuan's example, is there a reason why CP8 could not be promoted to
> CP10 by the coordinator for PR2 once it receives the notification that
> CP10 did not complete? It appears that should be possible since in its
> effect it should be no different than no data processed between CP8
> and CP10?
>
> Thanks,
> Thomas
>
> On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann <tr...@apache.org> wrote:
> >
> > Thanks for the clarification Yuan and Gen,
> >
> > I agree that the checkpointing of the sources needs to support the
> > rescaling case, otherwise it does not work. Is there currently a source
> > implementation where this wouldn't work? For Kafka it should work because
> > we store the offset per assigned partition. For Kinesis it is probably
> the
> > same. For the Filesource we store the set of unread input splits in the
> > source coordinator and the state of the assigned splits in the sources.
> > This should probably also work since new splits are only handed out to
> > running tasks.
> >
> > Cheers,
> > Till
> >
> > On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei <yu...@gmail.com> wrote:
> >
> > > Hey Till,
> > >
> > > > Why rescaling is a problem for pipelined regions/independent
> execution
> > > subgraphs:
> > >
> > > Take a simplified example :
> > > job graph : source  (2 instances) -> sink (2 instances)
> > > execution graph:
> > > source (1/2)  -> sink (1/2)   [pipelined region 1]
> > > source (2/2)  -> sink (2/2)   [pipelined region 2]
> > >
> > > Let's assume checkpoints are still triggered globally, meaning
> different
> > > pipelined regions share the global checkpoint id (PR1 CP1 matches with
> PR2
> > > CP1).
> > >
> > > Now let's assume PR1 completes CP10 and PR2 completes CP8.
> > >
> > > Let's say we want to rescale to parallelism 3 due to increased input.
> > >
> > > - Notice that we can not simply rescale based on the latest completed
> > > checkpoint (CP8), because PR1 has already had data (CP8 -> CP10) output
> > > externally.
> > > - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on
> how the
> > > source's offset redistribution is implemented.
> > >    The answer is yes if we treat each input partition as independent
> from
> > > each other, *but I am not sure whether we can make that assumption*.
> > >
> > > If not, the rescaling cannot happen until PR1 and PR2 are aligned with
> CPs.
> > >
> > > Best
> > > -Yuan
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann <tr...@apache.org>
> wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Yuan and Gen could you elaborate why rescaling is a problem if we say
> > > that
> > > > separate pipelined regions can take checkpoints independently?
> > > > Conceptually, I somehow think that a pipelined region that is failed
> and
> > > > cannot create a new checkpoint is more or less the same as a
> pipelined
> > > > region that didn't get new input or a very very slow pipelined region
> > > which
> > > > couldn't read new records since the last checkpoint (assuming that
> the
> > > > checkpoint coordinator can create a global checkpoint by combining
> > > > individual checkpoints (e.g. taking the last completed checkpoint
> from
> > > each
> > > > pipelined region)). If this comparison is correct, then this would
> mean
> > > > that we have rescaling problems under the latter two cases.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Mon, Feb 7, 2022 at 8:55 AM Gen Luo <lu...@gmail.com> wrote:
> > > >
> > > > > Hi Gyula,
> > > > >
> > > > > Thanks for sharing the idea. As Yuan mentioned, I think we can
> discuss
> > > > this
> > > > > within two scopes. One is the job subgraph, the other is the
> execution
> > > > > subgraph, which I suppose is the same as PipelineRegion.
> > > > >
> > > > > An idea is to individually checkpoint the PipelineRegions, for the
> > > > > recovering in a single run.
> > > > >
> > > > > Flink has now supported PipelineRegion based failover, with a
> subset
> > > of a
> > > > > global checkpoint snapshot. The checkpoint barriers are spread
> within a
> > > > > PipelineRegion, so the checkpointing of individual PipelineRegions
> is
> > > > > actually independent. Since in a single run of a job, the
> > > PipelineRegions
> > > > > are fixed, we can individually checkpoint separated
> PipelineRegions,
> > > > > despite what status the other PipelineRegions are, and use a
> snapshot
> > > of
> > > > a
> > > > > failing region to recover, instead of the subset of a global
> snapshot.
> > > > This
> > > > > can support separated job subgraphs as well, since they will also
> be
> > > > > separated into different PipelineRegions. I think this can fulfill
> your
> > > > > needs.
> > > > >
> > > > > In fact the individual snapshots of all PipelineRegions can form a
> > > global
> > > > > snapshot, and the alignment of snapshots of individual regions is
> not
> > > > > necessary. But rescaling this global snapshot can be potentially
> > > > complex. I
> > > > > think it's better to use the individual snapshots in a single run,
> and
> > > > take
> > > > > a global checkpoint/savepoint before restarting the job, rescaling
> it
> > > or
> > > > > not.
> > > > >
> > > > > A major issue of this plan is that it breaks the checkpoint
> mechanism
> > > of
> > > > > Flink. As far as I know, even in the approximate recovery, the
> snapshot
> > > > > used to recover a single task is still a part of a global
> snapshot. To
> > > > > implement the individual checkpointing of PipelineRegions, there
> may
> > > need
> > > > > to be a checkpoint coordinator for each PipelineRegion, and a new
> > > global
> > > > > checkpoint coordinator. When the scale goes up, there can be many
> > > > > individual regions, which can be a big burden to the job manager.
> The
> > > > > meaning of the checkpoint id will also be changed, which can affect
> > > many
> > > > > aspects. There can be lots of work and risks, and the risks still
> exist
> > > > if
> > > > > we only individually checkpoint separated job subgraphs, since the
> > > > > mechanism is still broken. If that is what you need, maybe
> separating
> > > > them
> > > > > into different jobs is an easier and better choice, as Caizhi and
> Yuan
> > > > > mentioned.
> > > > >
> > > > > On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <yu...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hey Gyula,
> > > > > >
> > > > > > That's a very interesting idea. The discussion about the
> `Individual`
> > > > vs
> > > > > > `Global` checkpoint was raised before, but the main concern was
> from
> > > > two
> > > > > > aspects:
> > > > > >
> > > > > > - Non-deterministic replaying may lead to an inconsistent view of
> > > > > > checkpoint
> > > > > > - It is not easy to form a clear cut of past and future and
> hence no
> > > > > clear
> > > > > > cut of where the start point of the source should begin to replay
> > > from.
> > > > > >
> > > > > > Starting from independent subgraphs as you proposed may be a good
> > > > > starting
> > > > > > point. However, when we talk about subgraph, do we mention it as
> a
> > > job
> > > > > > subgraph (each vertex is one or more operators) or execution
> subgraph
> > > > > (each
> > > > > > vertex is a task instance)?
> > > > > >
> > > > > > If it is a job subgraph, then indeed, why not separate it into
> > > multiple
> > > > > > jobs as Caizhi mentioned.
> > > > > > If it is an execution subgraph, then it is difficult to handle
> > > > rescaling
> > > > > > due to inconsistent views of checkpoints between tasks of the
> same
> > > > > > operator.
> > > > > >
> > > > > > `Individual/Subgraph Checkpointing` is definitely an interesting
> > > > > direction
> > > > > > to think of, and I'd love to hear more from you!
> > > > > >
> > > > > > Best,
> > > > > >
> > > > > > Yuan
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <
> tsreaper96@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Gyula!
> > > > > > >
> > > > > > > Thanks for raising this discussion. I agree that this will be
> an
> > > > > > > interesting feature but I actually have some doubts about the
> > > > > motivation
> > > > > > > and use case. If there are multiple individual subgraphs in the
> > > same
> > > > > job,
> > > > > > > why not just distribute them to multiple jobs so that each job
> > > > contains
> > > > > > > only one individual graph and can now fail without disturbing
> the
> > > > > others?
> > > > > > >
> > > > > > >
> > > > > > > Gyula Fóra <gy...@apache.org> 于2022年2月7日周一 05:22写道:
> > > > > > >
> > > > > > > > Hi all!
> > > > > > > >
> > > > > > > > At the moment checkpointing only works for healthy jobs with
> all
> > > > > > running
> > > > > > > > (or some finished) tasks. This sounds reasonable in most
> cases
> > > but
> > > > > > there
> > > > > > > > are a few applications where it would make sense to
> checkpoint
> > > > > failing
> > > > > > > jobs
> > > > > > > > as well.
> > > > > > > >
> > > > > > > > Due to how the checkpointing mechanism works, subgraphs that
> > > have a
> > > > > > > failing
> > > > > > > > task cannot be checkpointed without violating the
> exactly-once
> > > > > > semantics.
> > > > > > > > However if the job has multiple independent subgraphs (that
> are
> > > not
> > > > > > > > connected to each other), even if one subgraph is failing,
> the
> > > > other
> > > > > > > > completely running one could be checkpointed. In these cases
> the
> > > > > tasks
> > > > > > of
> > > > > > > > the failing subgraph could simply inherit the last successful
> > > > > > checkpoint
> > > > > > > > metadata (before they started failing). This logic would
> produce
> > > a
> > > > > > > > consistent checkpoint.
> > > > > > > >
> > > > > > > > The job as a whole could now make stateful progress even if
> some
> > > > > > > subgraphs
> > > > > > > > are constantly failing. This can be very valuable if for some
> > > > reason
> > > > > > the
> > > > > > > > job has a larger number of independent subgraphs that are
> > > expected
> > > > to
> > > > > > > fail
> > > > > > > > every once in a while, or if some subgraphs can have longer
> > > > downtimes
> > > > > > > that
> > > > > > > > would now cause the whole job to stall.
> > > > > > > >
> > > > > > > > What do you think?
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Gyula
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
>

Re: [DISCUSS] Checkpointing (partially) failing jobs

Posted by Thomas Weise <th...@apache.org>.
Hi,

Thanks for opening this discussion! The proposed enhancement would be
interesting for use cases in our infrastructure as well.

There are scenarios where it makes sense to have multiple disconnected
subgraphs in a single job because it can significantly reduce the
operational burden as well as the runtime overhead. Since we allow
subgraphs to recover independently, why not also allow them to make
progress independently? That would imply that checkpointing must
succeed for unaffected subgraphs, as certain behavior is tied to
checkpoint completion, such as Kafka offset commits and file output.

As for source offset redistribution, the offset/position needs to be tied
to splits, both in FLIP-27 sources and in legacy sources. (This applies to
the Kafka and Kinesis legacy sources and to the FLIP-27 Kafka source.) With
the new source framework, it would be hard to implement a source with correct
behavior that does not track the position along with the split.
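
To illustrate why tracking the position per split makes redistribution straightforward, here is a minimal sketch. It does not use any Flink API; `SplitState`, `redistribute` and the split names are hypothetical, and round-robin assignment is just one possible policy.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class SplitState:
    split_id: str  # for example a Kafka partition or a Kinesis shard
    position: int  # next offset to read from this split


def redistribute(splits: List[SplitState], parallelism: int) -> Dict[int, List[SplitState]]:
    """Assign splits round-robin to subtasks; every split keeps its own position."""
    assignment: Dict[int, List[SplitState]] = {i: [] for i in range(parallelism)}
    for index, split in enumerate(sorted(splits, key=lambda s: s.split_id)):
        assignment[index % parallelism].append(split)
    return assignment


# Assumed restored state: p0 and p1 come from CP10 of PR1, p2 and p3 from CP8 of PR2.
restored = [
    SplitState("p0", 1000),
    SplitState("p1", 1010),
    SplitState("p2", 800),
    SplitState("p3", 790),
]

rescaled = redistribute(restored, parallelism=3)
positions = {s.split_id: s.position for subtask in rescaled.values() for s in subtask}
print(positions)
```

Because each position travels with its split, it does not matter in this model which checkpoint a split's state was taken from, or which subtask ends up owning it.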

In Yuan's example, is there a reason why CP8 could not be promoted to
CP10 by the coordinator for PR2 once it receives the notification that
CP10 did not complete? That appears to be possible, since in effect it
is no different from PR2 having processed no data between CP8 and CP10.
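
A rough sketch of what such a promotion could look like, purely as an illustration: `RegionCheckpointLog` and `promote_latest` are hypothetical names, not existing coordinator code, and the sketch assumes that the region has not committed any output since the promoted checkpoint.

```python
from typing import Dict, Optional


class RegionCheckpointLog:
    """Hypothetical per-region bookkeeping of completed checkpoints."""

    def __init__(self) -> None:
        self.completed: Dict[int, Dict[str, int]] = {}

    def complete(self, checkpoint_id: int, state: Dict[str, int]) -> None:
        """Record a checkpoint that the region completed on its own."""
        self.completed[checkpoint_id] = state

    def promote_latest(self, failed_checkpoint_id: int) -> Optional[int]:
        """Re-register the newest older checkpoint under the failed id.

        Returns the id that was promoted, or None if there is nothing to promote.
        """
        older = [cp for cp in self.completed if cp < failed_checkpoint_id]
        if not older:
            return None
        source_id = max(older)
        self.completed[failed_checkpoint_id] = self.completed[source_id]
        return source_id


pr2 = RegionCheckpointLog()
pr2.complete(8, {"partition-1": 800})

# CP10 did not complete for PR2, so its CP8 state is reused under id 10.
promoted_from = pr2.promote_latest(10)
print(promoted_from, pr2.completed[10])
```

After the promotion both regions would have a completed checkpoint with id 10, so the ids line up again.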

Thanks,
Thomas

On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann <tr...@apache.org> wrote:
>
> Thanks for the clarification Yuan and Gen,
>
> I agree that the checkpointing of the sources needs to support the
> rescaling case, otherwise it does not work. Is there currently a source
> implementation where this wouldn't work? For Kafka it should work because
> we store the offset per assigned partition. For Kinesis it is probably the
> same. For the Filesource we store the set of unread input splits in the
> source coordinator and the state of the assigned splits in the sources.
> This should probably also work since new splits are only handed out to
> running tasks.
>
> Cheers,
> Till
>
> On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei <yu...@gmail.com> wrote:
>
> > Hey Till,
> >
> > > Why rescaling is a problem for pipelined regions/independent execution
> > subgraphs:
> >
> > Take a simplified example :
> > job graph : source  (2 instances) -> sink (2 instances)
> > execution graph:
> > source (1/2)  -> sink (1/2)   [pipelined region 1]
> > source (2/2)  -> sink (2/2)   [pipelined region 2]
> >
> > Let's assume checkpoints are still triggered globally, meaning different
> > pipelined regions share the global checkpoint id (PR1 CP1 matches with PR2
> > CP1).
> >
> > Now let's assume PR1 completes CP10 and PR2 completes CP8.
> >
> > Let's say we want to rescale to parallelism 3 due to increased input.
> >
> > - Notice that we can not simply rescale based on the latest completed
> > checkpoint (CP8), because PR1 has already had data (CP8 -> CP10) output
> > externally.
> > - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on how the
> > source's offset redistribution is implemented.
> >    The answer is yes if we treat each input partition as independent from
> > each other, *but I am not sure whether we can make that assumption*.
> >
> > If not, the rescaling cannot happen until PR1 and PR2 are aligned with CPs.
> >
> > Best
> > -Yuan
> >
> >
> >
> >
> >
> >
> >
> > On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann <tr...@apache.org> wrote:
> >
> > > Hi everyone,
> > >
> > > Yuan and Gen could you elaborate why rescaling is a problem if we say
> > that
> > > separate pipelined regions can take checkpoints independently?
> > > Conceptually, I somehow think that a pipelined region that is failed and
> > > cannot create a new checkpoint is more or less the same as a pipelined
> > > region that didn't get new input or a very very slow pipelined region
> > which
> > > couldn't read new records since the last checkpoint (assuming that the
> > > checkpoint coordinator can create a global checkpoint by combining
> > > individual checkpoints (e.g. taking the last completed checkpoint from
> > each
> > > pipelined region)). If this comparison is correct, then this would mean
> > > that we have rescaling problems under the latter two cases.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Mon, Feb 7, 2022 at 8:55 AM Gen Luo <lu...@gmail.com> wrote:
> > >
> > > > Hi Gyula,
> > > >
> > > > Thanks for sharing the idea. As Yuan mentioned, I think we can discuss
> > > this
> > > > within two scopes. One is the job subgraph, the other is the execution
> > > > subgraph, which I suppose is the same as PipelineRegion.
> > > >
> > > > An idea is to individually checkpoint the PipelineRegions, for the
> > > > recovering in a single run.
> > > >
> > > > Flink has now supported PipelineRegion based failover, with a subset
> > of a
> > > > global checkpoint snapshot. The checkpoint barriers are spread within a
> > > > PipelineRegion, so the checkpointing of individual PipelineRegions is
> > > > actually independent. Since in a single run of a job, the
> > PipelineRegions
> > > > are fixed, we can individually checkpoint separated PipelineRegions,
> > > > despite what status the other PipelineRegions are, and use a snapshot
> > of
> > > a
> > > > failing region to recover, instead of the subset of a global snapshot.
> > > This
> > > > can support separated job subgraphs as well, since they will also be
> > > > separated into different PipelineRegions. I think this can fulfill your
> > > > needs.
> > > >
> > > > In fact the individual snapshots of all PipelineRegions can form a
> > global
> > > > snapshot, and the alignment of snapshots of individual regions is not
> > > > necessary. But rescaling this global snapshot can be potentially
> > > complex. I
> > > > think it's better to use the individual snapshots in a single run, and
> > > take
> > > > a global checkpoint/savepoint before restarting the job, rescaling it
> > or
> > > > not.
> > > >
> > > > A major issue of this plan is that it breaks the checkpoint mechanism
> > of
> > > > Flink. As far as I know, even in the approximate recovery, the snapshot
> > > > used to recover a single task is still a part of a global snapshot. To
> > > > implement the individual checkpointing of PipelineRegions, there may
> > need
> > > > to be a checkpoint coordinator for each PipelineRegion, and a new
> > global
> > > > checkpoint coordinator. When the scale goes up, there can be many
> > > > individual regions, which can be a big burden to the job manager. The
> > > > meaning of the checkpoint id will also be changed, which can affect
> > many
> > > > aspects. There can be lots of work and risks, and the risks still exist
> > > if
> > > > we only individually checkpoint separated job subgraphs, since the
> > > > mechanism is still broken. If that is what you need, maybe separating
> > > them
> > > > into different jobs is an easier and better choice, as Caizhi and Yuan
> > > > mentioned.
> > > >
> > > > On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <yu...@gmail.com>
> > wrote:
> > > >
> > > > > Hey Gyula,
> > > > >
> > > > > That's a very interesting idea. The discussion about the `Individual`
> > > vs
> > > > > `Global` checkpoint was raised before, but the main concern was from
> > > two
> > > > > aspects:
> > > > >
> > > > > - Non-deterministic replaying may lead to an inconsistent view of
> > > > > checkpoint
> > > > > - It is not easy to form a clear cut of past and future and hence no
> > > > clear
> > > > > cut of where the start point of the source should begin to replay
> > from.
> > > > >
> > > > > Starting from independent subgraphs as you proposed may be a good
> > > > starting
> > > > > point. However, when we talk about subgraph, do we mention it as a
> > job
> > > > > subgraph (each vertex is one or more operators) or execution subgraph
> > > > (each
> > > > > vertex is a task instance)?
> > > > >
> > > > > If it is a job subgraph, then indeed, why not separate it into
> > multiple
> > > > > jobs as Caizhi mentioned.
> > > > > If it is an execution subgraph, then it is difficult to handle
> > > rescaling
> > > > > due to inconsistent views of checkpoints between tasks of the same
> > > > > operator.
> > > > >
> > > > > `Individual/Subgraph Checkpointing` is definitely an interesting
> > > > direction
> > > > > to think of, and I'd love to hear more from you!
> > > > >
> > > > > Best,
> > > > >
> > > > > Yuan
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <ts...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi Gyula!
> > > > > >
> > > > > > Thanks for raising this discussion. I agree that this will be an
> > > > > > interesting feature but I actually have some doubts about the
> > > > motivation
> > > > > > and use case. If there are multiple individual subgraphs in the
> > same
> > > > job,
> > > > > > why not just distribute them to multiple jobs so that each job
> > > contains
> > > > > > only one individual graph and can now fail without disturbing the
> > > > others?
> > > > > >
> > > > > >
> > > > > > > On Mon, Feb 7, 2022 at 05:22, Gyula Fóra <gy...@apache.org> wrote:
> > > > > >
> > > > > > > Hi all!
> > > > > > >
> > > > > > > At the moment checkpointing only works for healthy jobs with all
> > > > > running
> > > > > > > (or some finished) tasks. This sounds reasonable in most cases
> > but
> > > > > there
> > > > > > > are a few applications where it would make sense to checkpoint
> > > > failing
> > > > > > jobs
> > > > > > > as well.
> > > > > > >
> > > > > > > Due to how the checkpointing mechanism works, subgraphs that
> > have a
> > > > > > failing
> > > > > > > task cannot be checkpointed without violating the exactly-once
> > > > > semantics.
> > > > > > > However if the job has multiple independent subgraphs (that are
> > not
> > > > > > > connected to each other), even if one subgraph is failing, the
> > > other
> > > > > > > completely running one could be checkpointed. In these cases the
> > > > tasks
> > > > > of
> > > > > > > the failing subgraph could simply inherit the last successful
> > > > > checkpoint
> > > > > > > metadata (before they started failing). This logic would produce
> > a
> > > > > > > consistent checkpoint.
> > > > > > >
> > > > > > > The job as a whole could now make stateful progress even if some
> > > > > > subgraphs
> > > > > > > are constantly failing. This can be very valuable if for some
> > > reason
> > > > > the
> > > > > > > job has a larger number of independent subgraphs that are
> > expected
> > > to
> > > > > > fail
> > > > > > > every once in a while, or if some subgraphs can have longer
> > > downtimes
> > > > > > that
> > > > > > > would now cause the whole job to stall.
> > > > > > >
> > > > > > > What do you think?
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Gyula
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >

Re: [DISCUSS] Checkpointing (partially) failing jobs

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for the clarification Yuan and Gen,

I agree that the checkpointing of the sources needs to support the
rescaling case, otherwise it does not work. Is there currently a source
implementation where this wouldn't work? For Kafka it should work because
we store the offset per assigned partition. For Kinesis it is probably the
same. For the FileSource we store the set of unread input splits in the
source coordinator and the state of the assigned splits in the sources.
This should probably also work since new splits are only handed out to
running tasks.
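
To make the Kafka case concrete, here is a minimal sketch of why per-partition
offsets survive a rescale. It is only an illustration, not Flink's actual
assignment logic: the function name `redistribute_offsets`, the round-robin
assignment, and the offsets are all assumptions made up for this example.

```python
def redistribute_offsets(subtask_states, new_parallelism):
    """Merge per-subtask {partition: offset} maps and reassign them.

    Hypothetical helper: each partition carries its own offset, so it can be
    moved to any subtask without consulting the other partitions.
    """
    merged = {}
    for state in subtask_states:
        for partition, offset in state.items():
            if partition in merged:
                raise ValueError(f"partition {partition} assigned twice")
            merged[partition] = offset

    # Round-robin over sorted partition names (an assumption for the sketch).
    new_states = [dict() for _ in range(new_parallelism)]
    for index, partition in enumerate(sorted(merged)):
        new_states[index % new_parallelism][partition] = merged[partition]
    return new_states


# Two source subtasks before rescaling, three afterwards.
old_states = [{"p0": 120, "p1": 95}, {"p2": 80}]
new_states = redistribute_offsets(old_states, 3)
```

Since every offset travels with its partition, it does not matter which
checkpoint a given partition's offset came from, as long as the partitions
really are independent of each other.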

Cheers,
Till

On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei <yu...@gmail.com> wrote:

> Hey Till,
>
> > Why rescaling is a problem for pipelined regions/independent execution
> > subgraphs:
>
> Take a simplified example:
> job graph : source  (2 instances) -> sink (2 instances)
> execution graph:
> source (1/2)  -> sink (1/2)   [pipelined region 1]
> source (2/2)  -> sink (2/2)   [pipelined region 2]
>
> Let's assume checkpoints are still triggered globally, meaning different
> pipelined regions share the global checkpoint id (PR1 CP1 matches with PR2
> CP1).
>
> Now let's assume PR1 completes CP10 and PR2 completes CP8.
>
> Let's say we want to rescale to parallelism 3 due to increased input.
>
> - Notice that we can not simply rescale based on the latest completed
> checkpoint (CP8), because PR1 has already had data (CP8 -> CP10) output
> externally.
> - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on how the
> source's offset redistribution is implemented.
>    The answer is yes if we treat each input partition as independent from
> each other, *but I am not sure whether we can make that assumption*.
>
> If not, the rescaling cannot happen until PR1 and PR2 are aligned with CPs.
>
> Best
> -Yuan
>
>
>
>
>
>
>
> On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann <tr...@apache.org> wrote:
>
> > Hi everyone,
> >
> > Yuan and Gen could you elaborate why rescaling is a problem if we say
> that
> > separate pipelined regions can take checkpoints independently?
> > Conceptually, I somehow think that a pipelined region that is failed and
> > cannot create a new checkpoint is more or less the same as a pipelined
> > region that didn't get new input or a very very slow pipelined region
> which
> > couldn't read new records since the last checkpoint (assuming that the
> > checkpoint coordinator can create a global checkpoint by combining
> > individual checkpoints (e.g. taking the last completed checkpoint from
> each
> > pipelined region)). If this comparison is correct, then this would mean
> > that we have rescaling problems under the latter two cases.
> >
> > Cheers,
> > Till
> >
> > On Mon, Feb 7, 2022 at 8:55 AM Gen Luo <lu...@gmail.com> wrote:
> >
> > > Hi Gyula,
> > >
> > > Thanks for sharing the idea. As Yuan mentioned, I think we can discuss
> > this
> > > within two scopes. One is the job subgraph, the other is the execution
> > > subgraph, which I suppose is the same as PipelineRegion.
> > >
> > > An idea is to individually checkpoint the PipelineRegions, for the
> > > recovering in a single run.
> > >
> > > Flink has now supported PipelineRegion based failover, with a subset
> of a
> > > global checkpoint snapshot. The checkpoint barriers are spread within a
> > > PipelineRegion, so the checkpointing of individual PipelineRegions is
> > > actually independent. Since in a single run of a job, the
> PipelineRegions
> > > are fixed, we can individually checkpoint separated PipelineRegions,
> > > despite what status the other PipelineRegions are, and use a snapshot
> of
> > a
> > > failing region to recover, instead of the subset of a global snapshot.
> > This
> > > can support separated job subgraphs as well, since they will also be
> > > separated into different PipelineRegions. I think this can fulfill your
> > > needs.
> > >
> > > In fact the individual snapshots of all PipelineRegions can form a
> global
> > > snapshot, and the alignment of snapshots of individual regions is not
> > > necessary. But rescaling this global snapshot can be potentially
> > complex. I
> > > think it's better to use the individual snapshots in a single run, and
> > take
> > > a global checkpoint/savepoint before restarting the job, rescaling it
> or
> > > not.
> > >
> > > A major issue of this plan is that it breaks the checkpoint mechanism
> of
> > > Flink. As far as I know, even in the approximate recovery, the snapshot
> > > used to recover a single task is still a part of a global snapshot. To
> > > implement the individual checkpointing of PipelineRegions, there may
> need
> > > to be a checkpoint coordinator for each PipelineRegion, and a new
> global
> > > checkpoint coordinator. When the scale goes up, there can be many
> > > individual regions, which can be a big burden to the job manager. The
> > > meaning of the checkpoint id will also be changed, which can affect
> many
> > > aspects. There can be lots of work and risks, and the risks still exist
> > if
> > > we only individually checkpoint separated job subgraphs, since the
> > > mechanism is still broken. If that is what you need, maybe separating
> > them
> > > into different jobs is an easier and better choice, as Caizhi and Yuan
> > > mentioned.
> > >
> > > On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <yu...@gmail.com>
> wrote:
> > >
> > > > Hey Gyula,
> > > >
> > > > That's a very interesting idea. The discussion about the `Individual`
> > vs
> > > > `Global` checkpoint was raised before, but the main concern was from
> > two
> > > > aspects:
> > > >
> > > > - Non-deterministic replaying may lead to an inconsistent view of
> > > > checkpoint
> > > > - It is not easy to form a clear cut of past and future and hence no
> > > clear
> > > > cut of where the start point of the source should begin to replay
> from.
> > > >
> > > > Starting from independent subgraphs as you proposed may be a good
> > > starting
> > > > point. However, when we talk about subgraph, do we mention it as a
> job
> > > > subgraph (each vertex is one or more operators) or execution subgraph
> > > (each
> > > > vertex is a task instance)?
> > > >
> > > > If it is a job subgraph, then indeed, why not separate it into
> multiple
> > > > jobs as Caizhi mentioned.
> > > > If it is an execution subgraph, then it is difficult to handle
> > rescaling
> > > > due to inconsistent views of checkpoints between tasks of the same
> > > > operator.
> > > >
> > > > `Individual/Subgraph Checkpointing` is definitely an interesting
> > > direction
> > > > to think of, and I'd love to hear more from you!
> > > >
> > > > Best,
> > > >
> > > > Yuan
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <ts...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Gyula!
> > > > >
> > > > > Thanks for raising this discussion. I agree that this will be an
> > > > > interesting feature but I actually have some doubts about the
> > > motivation
> > > > > and use case. If there are multiple individual subgraphs in the
> same
> > > job,
> > > > > why not just distribute them to multiple jobs so that each job
> > contains
> > > > > only one individual graph and can now fail without disturbing the
> > > others?
> > > > >
> > > > >
> > > > > > On Mon, Feb 7, 2022 at 05:22, Gyula Fóra <gy...@apache.org> wrote:
> > > > >
> > > > > > Hi all!
> > > > > >
> > > > > > At the moment checkpointing only works for healthy jobs with all
> > > > running
> > > > > > (or some finished) tasks. This sounds reasonable in most cases
> but
> > > > there
> > > > > > are a few applications where it would make sense to checkpoint
> > > failing
> > > > > jobs
> > > > > > as well.
> > > > > >
> > > > > > Due to how the checkpointing mechanism works, subgraphs that
> have a
> > > > > failing
> > > > > > task cannot be checkpointed without violating the exactly-once
> > > > semantics.
> > > > > > However if the job has multiple independent subgraphs (that are
> not
> > > > > > connected to each other), even if one subgraph is failing, the
> > other
> > > > > > completely running one could be checkpointed. In these cases the
> > > tasks
> > > > of
> > > > > > the failing subgraph could simply inherit the last successful
> > > > checkpoint
> > > > > > metadata (before they started failing). This logic would produce
> a
> > > > > > consistent checkpoint.
> > > > > >
> > > > > > The job as a whole could now make stateful progress even if some
> > > > > subgraphs
> > > > > > are constantly failing. This can be very valuable if for some
> > reason
> > > > the
> > > > > > job has a larger number of independent subgraphs that are
> expected
> > to
> > > > > fail
> > > > > > every once in a while, or if some subgraphs can have longer
> > downtimes
> > > > > that
> > > > > > would now cause the whole job to stall.
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > Cheers,
> > > > > > Gyula
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Checkpointing (partially) failing jobs

Posted by Yuan Mei <yu...@gmail.com>.
Hey Till,

> Why rescaling is a problem for pipelined regions/independent execution
> subgraphs:

Take a simplified example:
job graph : source  (2 instances) -> sink (2 instances)
execution graph:
source (1/2)  -> sink (1/2)   [pipelined region 1]
source (2/2)  -> sink (2/2)   [pipelined region 2]

Let's assume checkpoints are still triggered globally, meaning different
pipelined regions share the global checkpoint id (PR1 CP1 matches with PR2
CP1).

Now let's assume PR1 completes CP10 and PR2 completes CP8.

Let's say we want to rescale to parallelism 3 due to increased input.

- Notice that we cannot simply rescale based on the latest checkpoint that
both regions have completed (CP8), because PR1 has already emitted data
(CP8 -> CP10) externally.
- Can we take CP10 from PR1 and CP8 from PR2? I think it depends on how the
source's offset redistribution is implemented.
   The answer is yes if we treat each input partition as independent of
the others, *but I am not sure whether we can make that assumption*.

If not, the rescaling cannot happen until PR1 and PR2 are aligned with CPs.
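
The example above can be sketched in a few lines. This is only an
illustration under stated assumptions: the offsets, the partition names and
the helper functions are hypothetical and not taken from Flink.

```python
# Hypothetical source offsets recorded at each completed checkpoint, per region.
completed = {
    "PR1": {8: {"partition-0": 800}, 9: {"partition-0": 900}, 10: {"partition-0": 1000}},
    "PR2": {8: {"partition-1": 750}},
}


def latest_common_checkpoint(completed):
    """Highest checkpoint id that every region has completed."""
    common = set.intersection(*(set(cps) for cps in completed.values()))
    return max(common)


def offsets_at(completed, checkpoint_id):
    """Source offsets if all regions restore from the same checkpoint id."""
    offsets = {}
    for cps in completed.values():
        offsets.update(cps[checkpoint_id])
    return offsets


def offsets_from_latest_per_region(completed):
    """Source offsets if each region restores from its own latest checkpoint."""
    offsets = {}
    for cps in completed.values():
        offsets.update(cps[max(cps)])
    return offsets


common_id = latest_common_checkpoint(completed)
aligned = offsets_at(completed, common_id)
mixed = offsets_from_latest_per_region(completed)

# Records that would be read again when restoring from the common checkpoint.
replayed = {p: mixed[p] - aligned[p] for p in mixed}
```

With these made-up numbers, restoring from the common checkpoint would replay
200 records of partition-0 that PR1 has already written out. The mixed restore
avoids that, but it is only safe if the partitions are independent.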

Best
-Yuan

On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi everyone,
>
> Yuan and Gen could you elaborate why rescaling is a problem if we say that
> separate pipelined regions can take checkpoints independently?
> Conceptually, I somehow think that a pipelined region that is failed and
> cannot create a new checkpoint is more or less the same as a pipelined
> region that didn't get new input or a very very slow pipelined region which
> couldn't read new records since the last checkpoint (assuming that the
> checkpoint coordinator can create a global checkpoint by combining
> individual checkpoints (e.g. taking the last completed checkpoint from each
> pipelined region)). If this comparison is correct, then this would mean
> that we have rescaling problems under the latter two cases.
>
> Cheers,
> Till
>
> On Mon, Feb 7, 2022 at 8:55 AM Gen Luo <lu...@gmail.com> wrote:
>
> > Hi Gyula,
> >
> > Thanks for sharing the idea. As Yuan mentioned, I think we can discuss
> this
> > within two scopes. One is the job subgraph, the other is the execution
> > subgraph, which I suppose is the same as PipelineRegion.
> >
> > An idea is to individually checkpoint the PipelineRegions, for the
> > recovering in a single run.
> >
> > Flink has now supported PipelineRegion based failover, with a subset of a
> > global checkpoint snapshot. The checkpoint barriers are spread within a
> > PipelineRegion, so the checkpointing of individual PipelineRegions is
> > actually independent. Since in a single run of a job, the PipelineRegions
> > are fixed, we can individually checkpoint separated PipelineRegions,
> > despite what status the other PipelineRegions are, and use a snapshot of
> a
> > failing region to recover, instead of the subset of a global snapshot.
> This
> > can support separated job subgraphs as well, since they will also be
> > separated into different PipelineRegions. I think this can fulfill your
> > needs.
> >
> > In fact the individual snapshots of all PipelineRegions can form a global
> > snapshot, and the alignment of snapshots of individual regions is not
> > necessary. But rescaling this global snapshot can be potentially
> complex. I
> > think it's better to use the individual snapshots in a single run, and
> take
> > a global checkpoint/savepoint before restarting the job, rescaling it or
> > not.
> >
> > A major issue of this plan is that it breaks the checkpoint mechanism of
> > Flink. As far as I know, even in the approximate recovery, the snapshot
> > used to recover a single task is still a part of a global snapshot. To
> > implement the individual checkpointing of PipelineRegions, there may need
> > to be a checkpoint coordinator for each PipelineRegion, and a new global
> > checkpoint coordinator. When the scale goes up, there can be many
> > individual regions, which can be a big burden to the job manager. The
> > meaning of the checkpoint id will also be changed, which can affect many
> > aspects. There can be lots of work and risks, and the risks still exist
> if
> > we only individually checkpoint separated job subgraphs, since the
> > mechanism is still broken. If that is what you need, maybe separating
> them
> > into different jobs is an easier and better choice, as Caizhi and Yuan
> > mentioned.
> >
> > On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <yu...@gmail.com> wrote:
> >
> > > Hey Gyula,
> > >
> > > That's a very interesting idea. The discussion about the `Individual`
> vs
> > > `Global` checkpoint was raised before, but the main concern was from
> two
> > > aspects:
> > >
> > > - Non-deterministic replaying may lead to an inconsistent view of
> > > checkpoint
> > > - It is not easy to form a clear cut of past and future and hence no
> > clear
> > > cut of where the start point of the source should begin to replay from.
> > >
> > > Starting from independent subgraphs as you proposed may be a good
> > starting
> > > point. However, when we talk about subgraph, do we mention it as a job
> > > subgraph (each vertex is one or more operators) or execution subgraph
> > (each
> > > vertex is a task instance)?
> > >
> > > If it is a job subgraph, then indeed, why not separate it into multiple
> > > jobs as Caizhi mentioned.
> > > If it is an execution subgraph, then it is difficult to handle
> rescaling
> > > due to inconsistent views of checkpoints between tasks of the same
> > > operator.
> > >
> > > `Individual/Subgraph Checkpointing` is definitely an interesting
> > direction
> > > to think of, and I'd love to hear more from you!
> > >
> > > Best,
> > >
> > > Yuan
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <ts...@gmail.com>
> > wrote:
> > >
> > > > Hi Gyula!
> > > >
> > > > Thanks for raising this discussion. I agree that this will be an
> > > > interesting feature but I actually have some doubts about the
> > motivation
> > > > and use case. If there are multiple individual subgraphs in the same
> > job,
> > > > why not just distribute them to multiple jobs so that each job
> contains
> > > > only one individual graph and can now fail without disturbing the
> > others?
> > > >
> > > >
> > > > > On Mon, Feb 7, 2022 at 05:22, Gyula Fóra <gy...@apache.org> wrote:
> > > >
> > > > > Hi all!
> > > > >
> > > > > At the moment checkpointing only works for healthy jobs with all
> > > running
> > > > > (or some finished) tasks. This sounds reasonable in most cases but
> > > there
> > > > > are a few applications where it would make sense to checkpoint
> > failing
> > > > jobs
> > > > > as well.
> > > > >
> > > > > Due to how the checkpointing mechanism works, subgraphs that have a
> > > > failing
> > > > > task cannot be checkpointed without violating the exactly-once
> > > semantics.
> > > > > However if the job has multiple independent subgraphs (that are not
> > > > > connected to each other), even if one subgraph is failing, the
> other
> > > > > completely running one could be checkpointed. In these cases the
> > tasks
> > > of
> > > > > the failing subgraph could simply inherit the last successful
> > > checkpoint
> > > > > metadata (before they started failing). This logic would produce a
> > > > > consistent checkpoint.
> > > > >
> > > > > The job as a whole could now make stateful progress even if some
> > > > subgraphs
> > > > > are constantly failing. This can be very valuable if for some
> reason
> > > the
> > > > > job has a larger number of independent subgraphs that are expected
> to
> > > > fail
> > > > > every once in a while, or if some subgraphs can have longer
> downtimes
> > > > that
> > > > > would now cause the whole job to stall.
> > > > >
> > > > > What do you think?
> > > > >
> > > > > Cheers,
> > > > > Gyula
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Checkpointing (partially) failing jobs

Posted by Gen Luo <lu...@gmail.com>.
Hi Till,

I agree that, from the checkpointing point of view, a failing task is much
like a very slow or deadlocked task. The main difference is whether a
checkpoint of the region the task is in can be triggered. Triggering a
checkpoint on a failing region makes no sense, since the checkpoint would be
discarded right away. But we can still compose a global checkpoint from the
new snapshots that the other regions take while this region is failing and
the last successful snapshot of the failing region. This global snapshot is
still valid and can be rescaled like a normal one, provided that normal ones
can be rescaled.

As far as I know, some snapshotting methods use or depend on ascending
checkpoint ids. Checkpointing regions individually probably means counting
checkpoint ids per region. Composing snapshots with different checkpoint
ids may cause errors.
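
As a rough sketch of the composition described above: the class
`RegionSnapshots` and the helper `distinct_ids` are hypothetical names for
illustration only, and nothing here reflects Flink's CheckpointCoordinator.

```python
class RegionSnapshots:
    """Keeps the last successful snapshot of each region (hypothetical)."""

    def __init__(self, regions):
        self._latest = {region: None for region in regions}

    def complete(self, region, checkpoint_id, snapshot):
        previous = self._latest[region]
        # Within one region the ids are still expected to ascend.
        if previous is not None and checkpoint_id <= previous[0]:
            raise ValueError("checkpoint ids must ascend within a region")
        self._latest[region] = (checkpoint_id, snapshot)

    def compose(self):
        """Global snapshot built from each region's last successful one."""
        if any(entry is None for entry in self._latest.values()):
            return None  # some region has never completed a checkpoint
        return dict(self._latest)


def distinct_ids(composed):
    """Checkpoint ids that end up mixed in one composed snapshot."""
    return sorted({checkpoint_id for checkpoint_id, _ in composed.values()})


store = RegionSnapshots(["A", "B"])
store.complete("A", 1, "a1")
store.complete("B", 1, "b1")
# Region B starts failing, so only region A completes checkpoints 2 and 3.
store.complete("A", 2, "a2")
store.complete("A", 3, "a3")

composed = store.compose()
mixed_ids = distinct_ids(composed)
```

The composed snapshot pairs A's checkpoint 3 with B's checkpoint 1, so any
component that assumes one ascending id per global checkpoint would see two
ids at once. That is the kind of case I am worried about.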

I am also afraid that there might be issues with the shared states, though
I can't figure out a case right now.

On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann <tr...@apache.org> wrote:

> Hi everyone,
>
> Yuan and Gen could you elaborate why rescaling is a problem if we say that
> separate pipelined regions can take checkpoints independently?
> Conceptually, I somehow think that a pipelined region that is failed and
> cannot create a new checkpoint is more or less the same as a pipelined
> region that didn't get new input or a very very slow pipelined region which
> couldn't read new records since the last checkpoint (assuming that the
> checkpoint coordinator can create a global checkpoint by combining
> individual checkpoints (e.g. taking the last completed checkpoint from each
> pipelined region)). If this comparison is correct, then this would mean
> that we have rescaling problems under the latter two cases.
>
> Cheers,
> Till
>
> On Mon, Feb 7, 2022 at 8:55 AM Gen Luo <lu...@gmail.com> wrote:
>
> > Hi Gyula,
> >
> > Thanks for sharing the idea. As Yuan mentioned, I think we can discuss
> this
> > within two scopes. One is the job subgraph, the other is the execution
> > subgraph, which I suppose is the same as PipelineRegion.
> >
> > An idea is to individually checkpoint the PipelineRegions, for the
> > recovering in a single run.
> >
> > Flink has now supported PipelineRegion based failover, with a subset of a
> > global checkpoint snapshot. The checkpoint barriers are spread within a
> > PipelineRegion, so the checkpointing of individual PipelineRegions is
> > actually independent. Since in a single run of a job, the PipelineRegions
> > are fixed, we can individually checkpoint separated PipelineRegions,
> > despite what status the other PipelineRegions are, and use a snapshot of
> a
> > failing region to recover, instead of the subset of a global snapshot.
> This
> > can support separated job subgraphs as well, since they will also be
> > separated into different PipelineRegions. I think this can fulfill your
> > needs.
> >
> > In fact the individual snapshots of all PipelineRegions can form a global
> > snapshot, and the alignment of snapshots of individual regions is not
> > necessary. But rescaling this global snapshot can be potentially
> complex. I
> > think it's better to use the individual snapshots in a single run, and
> take
> > a global checkpoint/savepoint before restarting the job, rescaling it or
> > not.
> >
> > A major issue of this plan is that it breaks the checkpoint mechanism of
> > Flink. As far as I know, even in the approximate recovery, the snapshot
> > used to recover a single task is still a part of a global snapshot. To
> > implement the individual checkpointing of PipelineRegions, there may need
> > to be a checkpoint coordinator for each PipelineRegion, and a new global
> > checkpoint coordinator. When the scale goes up, there can be many
> > individual regions, which can be a big burden to the job manager. The
> > meaning of the checkpoint id will also be changed, which can affect many
> > aspects. There can be lots of work and risks, and the risks still exist
> if
> > we only individually checkpoint separated job subgraphs, since the
> > mechanism is still broken. If that is what you need, maybe separating
> them
> > into different jobs is an easier and better choice, as Caizhi and Yuan
> > mentioned.
> >
> > On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <yu...@gmail.com> wrote:
> >
> > > Hey Gyula,
> > >
> > > That's a very interesting idea. The discussion about the `Individual`
> vs
> > > `Global` checkpoint was raised before, but the main concern was from
> two
> > > aspects:
> > >
> > > - Non-deterministic replaying may lead to an inconsistent view of
> > > checkpoint
> > > - It is not easy to form a clear cut of past and future and hence no
> > clear
> > > cut of where the start point of the source should begin to replay from.
> > >
> > > Starting from independent subgraphs as you proposed may be a good
> > starting
> > > point. However, when we talk about subgraph, do we mention it as a job
> > > subgraph (each vertex is one or more operators) or execution subgraph
> > (each
> > > vertex is a task instance)?
> > >
> > > If it is a job subgraph, then indeed, why not separate it into multiple
> > > jobs as Caizhi mentioned.
> > > If it is an execution subgraph, then it is difficult to handle
> rescaling
> > > due to inconsistent views of checkpoints between tasks of the same
> > > operator.
> > >
> > > `Individual/Subgraph Checkpointing` is definitely an interesting
> > direction
> > > to think of, and I'd love to hear more from you!
> > >
> > > Best,
> > >
> > > Yuan
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <ts...@gmail.com>
> > wrote:
> > >
> > > > Hi Gyula!
> > > >
> > > > Thanks for raising this discussion. I agree that this will be an
> > > > interesting feature but I actually have some doubts about the
> > motivation
> > > > and use case. If there are multiple individual subgraphs in the same
> > job,
> > > > why not just distribute them to multiple jobs so that each job
> contains
> > > > only one individual graph and can now fail without disturbing the
> > others?
> > > >
> > > >
> > > > > On Mon, Feb 7, 2022 at 05:22, Gyula Fóra <gy...@apache.org> wrote:
> > > >
> > > > > Hi all!
> > > > >
> > > > > At the moment checkpointing only works for healthy jobs with all
> > > running
> > > > > (or some finished) tasks. This sounds reasonable in most cases but
> > > there
> > > > > are a few applications where it would make sense to checkpoint
> > failing
> > > > jobs
> > > > > as well.
> > > > >
> > > > > Due to how the checkpointing mechanism works, subgraphs that have a
> > > > failing
> > > > > task cannot be checkpointed without violating the exactly-once
> > > semantics.
> > > > > However if the job has multiple independent subgraphs (that are not
> > > > > connected to each other), even if one subgraph is failing, the
> other
> > > > > completely running one could be checkpointed. In these cases the
> > tasks
> > > of
> > > > > the failing subgraph could simply inherit the last successful
> > > checkpoint
> > > > > metadata (before they started failing). This logic would produce a
> > > > > consistent checkpoint.
> > > > >
> > > > > The job as a whole could now make stateful progress even if some
> > > > subgraphs
> > > > > are constantly failing. This can be very valuable if for some
> reason
> > > the
> > > > > job has a larger number of independent subgraphs that are expected
> to
> > > > fail
> > > > > every once in a while, or if some subgraphs can have longer
> downtimes
> > > > that
> > > > > would now cause the whole job to stall.
> > > > >
> > > > > What do you think?
> > > > >
> > > > > Cheers,
> > > > > Gyula
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Checkpointing (partially) failing jobs

Posted by Till Rohrmann <tr...@apache.org>.
Hi everyone,

Yuan and Gen, could you elaborate on why rescaling is a problem if we say
that separate pipelined regions can take checkpoints independently?
Conceptually, I think that a pipelined region that has failed and cannot
create a new checkpoint is more or less the same as a pipelined region that
didn't get new input, or a very slow pipelined region that couldn't read new
records since the last checkpoint (assuming that the checkpoint coordinator
can create a global checkpoint by combining individual checkpoints, e.g. by
taking the last completed checkpoint from each pipelined region). If this
comparison is correct, then we would already have rescaling problems in the
latter two cases.

Cheers,
Till

On Mon, Feb 7, 2022 at 8:55 AM Gen Luo <lu...@gmail.com> wrote:

> Hi Gyula,
>
> Thanks for sharing the idea. As Yuan mentioned, I think we can discuss this
> within two scopes. One is the job subgraph, the other is the execution
> subgraph, which I suppose is the same as PipelineRegion.
>
> An idea is to individually checkpoint the PipelineRegions, for the
> recovering in a single run.
>
> Flink has now supported PipelineRegion based failover, with a subset of a
> global checkpoint snapshot. The checkpoint barriers are spread within a
> PipelineRegion, so the checkpointing of individual PipelineRegions is
> actually independent. Since in a single run of a job, the PipelineRegions
> are fixed, we can individually checkpoint separated PipelineRegions,
> despite what status the other PipelineRegions are, and use a snapshot of a
> failing region to recover, instead of the subset of a global snapshot. This
> can support separated job subgraphs as well, since they will also be
> separated into different PipelineRegions. I think this can fulfill your
> needs.
>
> In fact the individual snapshots of all PipelineRegions can form a global
> snapshot, and the alignment of snapshots of individual regions is not
> necessary. But rescaling this global snapshot can be potentially complex. I
> think it's better to use the individual snapshots in a single run, and take
> a global checkpoint/savepoint before restarting the job, rescaling it or
> not.
>
> A major issue of this plan is that it breaks the checkpoint mechanism of
> Flink. As far as I know, even in the approximate recovery, the snapshot
> used to recover a single task is still a part of a global snapshot. To
> implement the individual checkpointing of PipelineRegions, there may need
> to be a checkpoint coordinator for each PipelineRegion, and a new global
> checkpoint coordinator. When the scale goes up, there can be many
> individual regions, which can be a big burden to the job manager. The
> meaning of the checkpoint id will also be changed, which can affect many
> aspects. There can be lots of work and risks, and the risks still exist if
> we only individually checkpoint separated job subgraphs, since the
> mechanism is still broken. If that is what you need, maybe separating them
> into different jobs is an easier and better choice, as Caizhi and Yuan
> mentioned.
>
> On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <yu...@gmail.com> wrote:
>
> > Hey Gyula,
> >
> > That's a very interesting idea. The discussion about the `Individual` vs
> > `Global` checkpoint was raised before, but the main concern was from two
> > aspects:
> >
> > - Non-deterministic replaying may lead to an inconsistent view of
> > checkpoint
> > - It is not easy to form a clear cut of past and future and hence no
> clear
> > cut of where the start point of the source should begin to replay from.
> >
> > Starting from independent subgraphs as you proposed may be a good
> starting
> > point. However, when we talk about subgraph, do we mention it as a job
> > subgraph (each vertex is one or more operators) or execution subgraph
> (each
> > vertex is a task instance)?
> >
> > If it is a job subgraph, then indeed, why not separate it into multiple
> > jobs as Caizhi mentioned.
> > If it is an execution subgraph, then it is difficult to handle rescaling
> > due to inconsistent views of checkpoints between tasks of the same
> > operator.
> >
> > `Individual/Subgraph Checkpointing` is definitely an interesting
> direction
> > to think of, and I'd love to hear more from you!
> >
> > Best,
> >
> > Yuan
> >
> >
> >
> >
> >
> >
> >
> > On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <ts...@gmail.com>
> wrote:
> >
> > > Hi Gyula!
> > >
> > > Thanks for raising this discussion. I agree that this will be an
> > > interesting feature but I actually have some doubts about the
> motivation
> > > and use case. If there are multiple individual subgraphs in the same
> job,
> > > why not just distribute them to multiple jobs so that each job contains
> > > only one individual graph and can now fail without disturbing the
> others?
> > >
> > >
> > > > On Mon, Feb 7, 2022 at 05:22, Gyula Fóra <gy...@apache.org> wrote:
> > >
> > > > Hi all!
> > > >
> > > > At the moment checkpointing only works for healthy jobs with all
> > > > running (or some finished) tasks. This sounds reasonable in most
> > > > cases but there are a few applications where it would make sense to
> > > > checkpoint failing jobs as well.
> > > >
> > > > Due to how the checkpointing mechanism works, subgraphs that have a
> > > > failing task cannot be checkpointed without violating the
> > > > exactly-once semantics. However if the job has multiple independent
> > > > subgraphs (that are not connected to each other), even if one
> > > > subgraph is failing, the other completely running one could be
> > > > checkpointed. In these cases the tasks of the failing subgraph could
> > > > simply inherit the last successful checkpoint metadata (before they
> > > > started failing). This logic would produce a consistent checkpoint.
> > > >
> > > > The job as a whole could now make stateful progress even if some
> > > > subgraphs are constantly failing. This can be very valuable if for
> > > > some reason the job has a larger number of independent subgraphs that
> > > > are expected to fail every once in a while, or if some subgraphs can
> > > > have longer downtimes that would now cause the whole job to stall.
> > > >
> > > > What do you think?
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > >
> >
>

Re: [DISCUSS] Checkpointing (partially) failing jobs

Posted by Gen Luo <lu...@gmail.com>.
Hi Gyula,

Thanks for sharing the idea. As Yuan mentioned, I think we can discuss this
within two scopes. One is the job subgraph, the other is the execution
subgraph, which I suppose is the same as PipelineRegion.

One idea is to checkpoint the PipelineRegions individually, for recovery
within a single run of the job.

Flink already supports PipelineRegion-based failover, which restores a
failed region from a subset of a global checkpoint snapshot. Checkpoint
barriers only propagate within a PipelineRegion, so the checkpointing of
individual PipelineRegions is actually independent. Since the
PipelineRegions are fixed within a single run of a job, we can checkpoint
separate PipelineRegions individually, regardless of the status of the
other PipelineRegions, and use the failing region's own snapshot to
recover, instead of a subset of a global snapshot. This also covers
separate job subgraphs, since they will end up in different
PipelineRegions as well. I think this can fulfill your needs.
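
A minimal sketch of this idea, in Python rather than Flink's actual
classes. It treats a region as a connected component of the task graph
(a simplification of how PipelineRegions are really computed), and the
names `find_regions` and `checkpoint_regions` are hypothetical, chosen
only for illustration. Regions without a failing task take the new
checkpoint; a region with a failing task keeps its last successful
snapshot.

```python
from collections import defaultdict


def find_regions(tasks, edges):
    """Group tasks into regions: connected components over undirected edges."""
    adjacency = defaultdict(set)
    for a, b in edges:
        adjacency[a].add(b)
        adjacency[b].add(a)

    regions, seen = [], set()
    for task in tasks:
        if task in seen:
            continue
        # Depth-first walk collecting everything reachable from this task.
        stack, component = [task], set()
        while stack:
            current = stack.pop()
            if current in component:
                continue
            component.add(current)
            stack.extend(adjacency[current] - component)
        seen |= component
        regions.append(frozenset(component))
    return regions


def checkpoint_regions(regions, failing_tasks, last_snapshots, checkpoint_id):
    """Return the snapshot id each region would use after this checkpoint."""
    result = {}
    for region in regions:
        if region & failing_tasks:
            # Region contains a failing task: inherit its last snapshot.
            result[region] = last_snapshots.get(region)
        else:
            # Healthy region: it can complete the new checkpoint on its own.
            result[region] = checkpoint_id
    return result


tasks = ["src1", "map1", "sink1", "src2", "sink2"]
edges = [("src1", "map1"), ("map1", "sink1"), ("src2", "sink2")]
regions = find_regions(tasks, edges)
previous = {region: 4 for region in regions}
snapshots = checkpoint_regions(regions, {"sink2"}, previous, checkpoint_id=5)
```

With `sink2` failing, the first region advances to checkpoint 5 while
the second stays at checkpoint 4.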

In fact, the individual snapshots of all PipelineRegions together form a
global snapshot, and aligning the snapshots of individual regions is not
necessary. But rescaling such a global snapshot is potentially complex. I
think it's better to use the individual snapshots only within a single
run, and to take a global checkpoint/savepoint before restarting the job,
whether or not it is rescaled.

A major issue with this plan is that it breaks Flink's checkpoint
mechanism. As far as I know, even with approximate recovery, the snapshot
used to recover a single task is still part of a global snapshot.
Implementing individual checkpointing of PipelineRegions may require a
checkpoint coordinator for each PipelineRegion plus a new global
checkpoint coordinator. As the job scales up there can be many individual
regions, which can put a heavy burden on the JobManager. The meaning of
the checkpoint id would also change, which can affect many parts of the
system. That means a lot of work and risk, and the risk remains even if we
only checkpoint separate job subgraphs individually, since the mechanism
is still broken. If that is all you need, maybe splitting them into
different jobs is an easier and better choice, as Caizhi and Yuan
mentioned.
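
To illustrate why the meaning of the checkpoint id changes, here is a
rough sketch in Python, not Flink code. `RegionCoordinator` and
`GlobalCoordinator` are hypothetical names and do not correspond to
existing Flink classes. Each region advances its own counter, so a
"global" snapshot becomes a map of per-region ids instead of a single id.

```python
class RegionCoordinator:
    """Hypothetical per-region coordinator with its own checkpoint counter."""

    def __init__(self, region_name):
        self.region_name = region_name
        self.latest_completed = 0  # 0 means no checkpoint has completed yet

    def complete_checkpoint(self):
        # Each region numbers its checkpoints independently of the others.
        self.latest_completed += 1
        return self.latest_completed


class GlobalCoordinator:
    """Hypothetical global view assembled from the per-region coordinators."""

    def __init__(self, region_names):
        # One coordinator object per region: this is the JobManager-side cost.
        self.regions = {name: RegionCoordinator(name) for name in region_names}

    def global_snapshot(self):
        # There is no single checkpoint id any more, only one id per region.
        return {name: c.latest_completed for name, c in self.regions.items()}


coordinator = GlobalCoordinator(["region-a", "region-b"])
coordinator.regions["region-a"].complete_checkpoint()
coordinator.regions["region-a"].complete_checkpoint()
coordinator.regions["region-b"].complete_checkpoint()
snapshot = coordinator.global_snapshot()
```

Anything that currently keys on one job-wide checkpoint id would have to
deal with such a per-region map instead.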

On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <yu...@gmail.com> wrote:

> Hey Gyula,
>
> That's a very interesting idea. The discussion about the `Individual` vs
> `Global` checkpoint was raised before, but the main concern was from two
> aspects:
>
> - Non-deterministic replaying may lead to an inconsistent view of
> checkpoint
> - It is not easy to form a clear cut of past and future and hence no clear
> cut of where the start point of the source should begin to replay from.
>
> Starting from independent subgraphs as you proposed may be a good starting
> point. However, when we talk about subgraph, do we mention it as a job
> subgraph (each vertex is one or more operators) or execution subgraph (each
> vertex is a task instance)?
>
> If it is a job subgraph, then indeed, why not separate it into multiple
> jobs as Caizhi mentioned.
> If it is an execution subgraph, then it is difficult to handle rescaling
> due to inconsistent views of checkpoints between tasks of the same
> operator.
>
> `Individual/Subgraph Checkpointing` is definitely an interesting direction
> to think of, and I'd love to hear more from you!
>
> Best,
>
> Yuan
>
>
>
>
>
>
>
> On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <ts...@gmail.com> wrote:
>
> > Hi Gyula!
> >
> > Thanks for raising this discussion. I agree that this will be an
> > interesting feature but I actually have some doubts about the motivation
> > and use case. If there are multiple individual subgraphs in the same job,
> > why not just distribute them to multiple jobs so that each job contains
> > only one individual graph and can now fail without disturbing the others?
> >
> >
> > On Mon, Feb 7, 2022 at 5:22 AM Gyula Fóra <gy...@apache.org> wrote:
> >
> > > Hi all!
> > >
> > > At the moment checkpointing only works for healthy jobs with all
> > > running (or some finished) tasks. This sounds reasonable in most cases
> > > but there are a few applications where it would make sense to
> > > checkpoint failing jobs as well.
> > >
> > > Due to how the checkpointing mechanism works, subgraphs that have a
> > > failing task cannot be checkpointed without violating the exactly-once
> > > semantics. However if the job has multiple independent subgraphs (that
> > > are not connected to each other), even if one subgraph is failing, the
> > > other completely running one could be checkpointed. In these cases the
> > > tasks of the failing subgraph could simply inherit the last successful
> > > checkpoint metadata (before they started failing). This logic would
> > > produce a consistent checkpoint.
> > >
> > > The job as a whole could now make stateful progress even if some
> > > subgraphs are constantly failing. This can be very valuable if for some
> > > reason the job has a larger number of independent subgraphs that are
> > > expected to fail every once in a while, or if some subgraphs can have
> > > longer downtimes that would now cause the whole job to stall.
> > >
> > > What do you think?
> > >
> > > Cheers,
> > > Gyula
> > >
> >
>

Re: [DISCUSS] Checkpointing (partially) failing jobs

Posted by Yuan Mei <yu...@gmail.com>.
Hey Gyula,

That's a very interesting idea. `Individual` vs `Global` checkpoints have
been discussed before, and the main concerns were twofold:

- Non-deterministic replay may lead to an inconsistent view of the
checkpoint.
- It is not easy to draw a clear cut between past and future, and hence
there is no clear point from which the sources should start replaying.

Starting from independent subgraphs as you proposed may be a good starting
point. However, when we talk about a subgraph, do we mean a job subgraph
(each vertex is one or more operators) or an execution subgraph (each
vertex is a task instance)?

If it is a job subgraph, then indeed, why not separate it into multiple
jobs as Caizhi mentioned?
If it is an execution subgraph, then it is difficult to handle rescaling
due to inconsistent views of checkpoints between tasks of the same operator.
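
The rescaling concern can be sketched as follows, in Python and with
made-up data. The function name and the `(operator, subtask_index)` keys
are hypothetical. If subtasks of one operator sit in different execution
subgraphs, their latest snapshots can come from different checkpoints,
and such an operator has no single consistent state to redistribute.

```python
def operators_with_inconsistent_snapshots(subtask_snapshot_ids):
    """Return operators whose subtasks hold snapshots of different checkpoints.

    subtask_snapshot_ids maps (operator, subtask_index) to a checkpoint id.
    """
    ids_by_operator = {}
    for (operator, _subtask), checkpoint_id in subtask_snapshot_ids.items():
        ids_by_operator.setdefault(operator, set()).add(checkpoint_id)
    # More than one distinct id means the operator's state is not aligned.
    return sorted(op for op, ids in ids_by_operator.items() if len(ids) > 1)


latest = {
    ("map", 0): 5,   # subtask in a healthy execution subgraph
    ("map", 1): 4,   # subtask in a subgraph that was failing
    ("sink", 0): 5,
    ("sink", 1): 5,
}
inconsistent = operators_with_inconsistent_snapshots(latest)
```

Here `map` is reported because its two subtasks were last snapshotted at
checkpoints 5 and 4.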

`Individual/Subgraph Checkpointing` is definitely an interesting direction
to think about, and I'd love to hear more from you!

Best,

Yuan







On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <ts...@gmail.com> wrote:

> Hi Gyula!
>
> Thanks for raising this discussion. I agree that this will be an
> interesting feature but I actually have some doubts about the motivation
> and use case. If there are multiple individual subgraphs in the same job,
> why not just distribute them to multiple jobs so that each job contains
> only one individual graph and can now fail without disturbing the others?
>
>
> On Mon, Feb 7, 2022 at 5:22 AM Gyula Fóra <gy...@apache.org> wrote:
>
> > Hi all!
> >
> > At the moment checkpointing only works for healthy jobs with all running
> > (or some finished) tasks. This sounds reasonable in most cases but there
> > are a few applications where it would make sense to checkpoint failing
> > jobs as well.
> >
> > Due to how the checkpointing mechanism works, subgraphs that have a
> > failing task cannot be checkpointed without violating the exactly-once
> > semantics. However if the job has multiple independent subgraphs (that
> > are not connected to each other), even if one subgraph is failing, the
> > other completely running one could be checkpointed. In these cases the
> > tasks of the failing subgraph could simply inherit the last successful
> > checkpoint metadata (before they started failing). This logic would
> > produce a consistent checkpoint.
> >
> > The job as a whole could now make stateful progress even if some
> > subgraphs are constantly failing. This can be very valuable if for some
> > reason the job has a larger number of independent subgraphs that are
> > expected to fail every once in a while, or if some subgraphs can have
> > longer downtimes that would now cause the whole job to stall.
> >
> > What do you think?
> >
> > Cheers,
> > Gyula
> >
>

Re: [DISCUSS] Checkpointing (partially) failing jobs

Posted by Caizhi Weng <ts...@gmail.com>.
Hi Gyula!

Thanks for raising this discussion. I agree that this would be an
interesting feature, but I actually have some doubts about the motivation
and use case. If there are multiple independent subgraphs in the same job,
why not just split them into multiple jobs, so that each job contains only
one independent subgraph and can fail without disturbing the others?


On Mon, Feb 7, 2022 at 5:22 AM Gyula Fóra <gy...@apache.org> wrote:

> Hi all!
>
> At the moment checkpointing only works for healthy jobs with all running
> (or some finished) tasks. This sounds reasonable in most cases but there
> are a few applications where it would make sense to checkpoint failing jobs
> as well.
>
> Due to how the checkpointing mechanism works, subgraphs that have a failing
> task cannot be checkpointed without violating the exactly-once semantics.
> However if the job has multiple independent subgraphs (that are not
> connected to each other), even if one subgraph is failing, the other
> completely running one could be checkpointed. In these cases the tasks of
> the failing subgraph could simply inherit the last successful checkpoint
> metadata (before they started failing). This logic would produce a
> consistent checkpoint.
>
> The job as a whole could now make stateful progress even if some subgraphs
> are constantly failing. This can be very valuable if for some reason the
> job has a larger number of independent subgraphs that are expected to fail
> every once in a while, or if some subgraphs can have longer downtimes that
> would now cause the whole job to stall.
>
> What do you think?
>
> Cheers,
> Gyula
>