Posted to dev@flink.apache.org by Yu Li <ca...@gmail.com> on 2020/02/26 04:59:30 UTC

Re: [DISCUSS] FLIP-76: Unaligned checkpoints

Hi All,

Sorry for being late to the discussion. I've gone through the latest FLIP
document and have the following questions/suggestions:

1. Do we support asynchronous checkpointing of the in-flight data?
    * From the doc the answer seems to be yes (state-based storage for the
first version). If so, network buffers would consume additional memory
during a checkpoint, and we should take this into account, especially in
container environments.

2. I suggest we also take local recovery into consideration during
implementation, which could speed up recovery, especially when the
amount of in-flight data is huge.

3. About the checkpointing policy, is my understanding below correct? Maybe
it would help to map the policies more explicitly in the FLIP doc, IMHO:
    * For a single input channel, there's no difference between
UNALIGNED_WITH_MAX_INFLIGHT_DATA
and UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA; both mean we start the
checkpoint once we observe the barrier in the input channel.
    * For multiple input channels, UNALIGNED_WITH_MAX_INFLIGHT_DATA means
starting the checkpoint only when the barrier has appeared in all input
channels, while UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA means starting the
checkpoint when the barrier appears in any one of the input channels.

4. It seems we currently only support pre-defined options, but is it possible
to switch between them dynamically? For example, if we predefine the policy as
ALIGNED, could we supply a command to switch
to UNALIGNED_WITH_MAX_INFLIGHT_DATA when severe back pressure is observed? Or
switch to ALIGNED if we see too much data persisted for
UNALIGNED_WITH_MAX_INFLIGHT_DATA? Maybe I'm neglecting something, but
what's preventing us from being more adaptive?
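
To make the reading in question 3 concrete, here is a minimal Python sketch.
The helper name should_start_checkpoint and the list-of-booleans channel
representation are hypothetical and not taken from the FLIP; the sketch only
encodes the interpretation stated in question 3, which may not match the
intended semantics.

```python
from enum import Enum, auto
from typing import Sequence


class Policy(Enum):
    # Policy names as they appear in the FLIP discussion.
    UNALIGNED_WITH_MAX_INFLIGHT_DATA = auto()
    UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA = auto()


def should_start_checkpoint(policy: Policy, barrier_seen: Sequence[bool]) -> bool:
    """Hypothetical helper encoding the reading in question 3.

    barrier_seen[i] is True once the barrier was observed on input channel i.
    """
    if not barrier_seen:
        # No input channels: nothing to decide on in this sketch.
        return False
    if policy is Policy.UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA:
        # Start as soon as any one channel has delivered the barrier.
        return any(barrier_seen)
    # Under this reading, MAX_INFLIGHT_DATA waits for the barrier on all channels.
    return all(barrier_seen)
```

With a single channel both policies give the same answer; with two channels
and only one barrier observed, only the UNLIMITED policy would start.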

Thanks!

Best Regards,
Yu


On Tue, 22 Oct 2019 at 15:44, Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> I would like to propose a modification to this FLIP.
>
> Based on the feedback that we were receiving after publishing this
> document and during Flink Forward, I was growing more and more anxious
> about one issue here: having to persist all buffered in-flight data at
> once. As the volume of this data might be large (GBs per TaskManager even
> with small clusters and relatively simple jobs), the time to persist all of
> this data at once might be quite substantial.
>
>
>
> To address this issue, I would like to propose that at first we implement
> a variant of unaligned checkpoints, just as written down in FLIP-76, but
> with continuous spilling - all data will be persisted/spilled continuously,
> as it arrives - not all at once when the checkpoint starts. Think
> of this proposal as an incremental way of persisting the data.
>
> Pros of continuous spilling:
> + faster checkpointing, as there will be no need to store GBs of data,
> just flush/close.
> + more predictable behaviour: steady record throughput and spilling
> instead of jerky/varying/spiky IO/CPU loads.
>
> Cons of continuous spilling:
> - need to persist all of the network traffic instead of persisting just
> the in-flight data
>
> The larger volume of persisted data doesn’t matter that much from the
> perspective of throughput: if you are unable to spill the data
> faster than you can process it, unaligned checkpoints are a worse option
> than aligned checkpoints [1]. If checkpoints are frequent, it
> also doesn’t matter [2]. The true downside is when checkpoints are infrequent
> and you have to, for example, pay $ for the extra storage or extra network
> traffic to the storage.
>
> On the other hand, continuous spilling (persistent communication
> channels?) might have the added benefit of enabling localised failures -
> the failure of one node would not necessarily bring down the whole cluster.
>
>
>
> As I mentioned, I’m proposing to just start with the continuous spilling.
> It might be more costly in some scenarios, but it will offer the most
> stable and predictable performance with the lowest checkpoint latency. It’s
> not perfect, it won’t solve all of the use cases, but frankly all of the
> other options have their own blind spots, and continuous spilling should at
> least fully solve relatively low throughput use cases. We can later build
> on top of that solution, expanding it with the following features:
>
> 1. Do not spill continuously if there is no backpressure. For example
> provide a timeout: start spilling pre-emptively/continuously if some buffer
> was not processed within X seconds.
> 2. Start spilling only once the checkpoint starts (this is the exact
> proposal from the current FLIP-76).
> 3. Initially we want to spill to a Flink FileSystem (for example S3),
> but in the future we are considering other options, for example Apache
> BookKeeper.
>
> What do you think?
>
> Piotrek
>
>
>
> [1] I’m assuming that the spilling throughput per node can go up to
> ~30MB/s. If your Flink job's data processing rate is 100MB/s, spilling
> in-flight data will take 3.3 times longer than waiting for the alignment.
> On the other hand, if the data processing rate is 10MB/s, the overhead of
> continuous spilling is relatively low.
> [2] With checkpoints every minute and a data processing throughput of
> 30MB/s per node, we would have to persist 1.8GB of data per node between
> checkpoints, which is of a similar order of magnitude to the buffered
> in-flight data under back-pressure. With higher throughput, unaligned
> checkpoints are not helping ([1]). With lower throughput, both the original
> proposal and continuous spilling would effectively have to persist all of
> the data anyway.
>
> > On 10 Oct 2019, at 19:51, Yun Tang <my...@live.com> wrote:
> >
> > Hi Arvid
> >
> > +1 for this feature, which has been hoped for for a long time. End-to-end
> > exactly-once jobs could benefit from quicker checkpoint completion.
> >
> >
> > Best
> > Yun Tang
> > ________________________________
> > From: Yun Gao <yu...@aliyun.com.INVALID>
> > Sent: Thursday, October 10, 2019 18:39
> > To: dev <de...@flink.apache.org>
> > Subject: Re: [DISCUSS] FLIP-76: Unaligned checkpoints
> >
> >    Hi Arvid,
> >
> >    Many thanks for bringing up the discussion! On our side, being unable
> >    to finish a checkpoint is common for online jobs, therefore +1 from
> >    my side to implement this.
> >    A tiny issue with the FLIP is that the attached Discussion Thread URL
> >    seems to be incorrect.
> >
> >
> >     Best,
> >     Yun
> >
> >
> > ------------------------------------------------------------------
> > From:Arvid Heise <ar...@ververica.com>
> > Send Time:2019 Sep. 30 (Mon.) 20:31
> > To:dev <de...@flink.apache.org>
> > Subject:[DISCUSS] FLIP-76: Unaligned checkpoints
> >
> > Hi Devs,
> >
> > I would like to start the formal discussion about FLIP-76 [1], which
> > improves the checkpoint latency in systems under backpressure, where a
> > checkpoint can take hours to complete in the worst case. I recommend the
> > thread "checkpointing under backpressure" [2] to get a good idea why users
> > are not satisfied with the current behavior. The key points:
> >
> >   - Since the checkpoint barrier flows much slower through the
> >   back-pressured channels, the other channels and their upstream operators
> >   are effectively blocked during checkpointing.
> >   - The checkpoint barrier takes a long time to reach the sinks, causing
> >   long checkpointing times. A longer checkpointing time in turn means that
> >   the checkpoint will be fairly outdated once done. Since a heavily
> >   utilized pipeline is inherently more fragile, we may run into a vicious
> >   cycle of late checkpoints, crash, recovery to a rather outdated
> >   checkpoint, more back pressure, and even later checkpoints, which would
> >   result in little to no progress in the application.
> >
> > The FLIP proposes "unaligned checkpoints", which improve the current state
> > such that
> >
> >   - Upstream processes can continue to produce data, even if some operator
> >   still waits on a checkpoint barrier on a specific input channel.
> >   - Checkpointing times are heavily reduced across the execution graph,
> >   even for operators with a single input channel.
> >   - End-users will see more progress even in unstable environments, as more
> >   up-to-date checkpoints will avoid too many recomputations.
> >   - Faster rescaling is facilitated.
> >
> > The key idea is to allow checkpoint barriers to be forwarded to downstream
> > tasks before the synchronous part of the checkpointing has been conducted
> > (see Fig. 1). To that end, we need to store in-flight data as part of the
> > checkpoint, as described in greater detail in this FLIP.
> >
> > Although the basic idea was already sketched in [2], we would like to get
> > broader feedback in this dedicated mail thread.
> >
> > Best,
> >
> > Arvid
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
> > [2]
> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Checkpointing-under-backpressure-td31616.html
> >
>
>
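
The back-of-the-envelope figures in footnotes [1] and [2] of the quoted
message from Piotr can be re-derived with a few lines of Python. This is only
a sanity check of the arithmetic: the variable names are illustrative, and the
rates (30MB/s, 100MB/s) and the one-minute interval are the ones assumed in
the footnotes, not measurements.

```python
# Values assumed in footnotes [1] and [2] of the quoted message.
SPILL_RATE_MB_S = 30.0         # per-node spilling throughput, footnote [1]
PROCESSING_RATE_MB_S = 100.0   # example data processing rate, footnote [1]
CHECKPOINT_INTERVAL_S = 60     # one checkpoint per minute, footnote [2]

# [1]: data produced at 100MB/s but spilled at 30MB/s takes
# 100 / 30 times longer to persist than to process.
slowdown = PROCESSING_RATE_MB_S / SPILL_RATE_MB_S

# [2]: at 30MB/s per node, one minute of traffic between checkpoints.
persisted_gb = SPILL_RATE_MB_S * CHECKPOINT_INTERVAL_S / 1000.0

print(f"slowdown factor: {slowdown:.1f}")                 # slowdown factor: 3.3
print(f"persisted per interval: {persisted_gb:.1f} GB")   # persisted per interval: 1.8 GB
```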

Re: [DISCUSS] FLIP-76: Unaligned checkpoints

Posted by Arvid Heise <ar...@ververica.com>.
Dear devs,

we conducted some POCs and updated the FLIP accordingly [1].

Key changes:
- The POC showed that it is viable to spill only on checkpoint (rather than
spilling continuously), which avoids overloading external systems
- Greatly revised/refined recovery and rescaling
- Sketched the required components for persisting/recovery
- Refined migration plan

Since this is the second iteration with no big changes and promising POCs,
we would like to move to voting rather quickly unless we receive concerns
by tomorrow.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints

On Fri, Feb 28, 2020 at 3:30 PM Yu Li <ca...@gmail.com> wrote:

> Thanks for the further feedback, Zhijiang and Piotr! I think this is a great
> feature and will watch the progress. Please also feel free to involve me in
> discussions/reviews on the state-related parts. Thanks.
>
> Best Regards,
> Yu
>
>
> On Thu, 27 Feb 2020 at 23:24, Piotr Nowojski <pi...@ververica.com> wrote:
>
> > Hi Yu,
> >
> > Re 4.
> >
> > Dynamic switching between unaligned and aligned checkpoints based on some
> > kind of threshold (timeout or checkpoint size) is definitely one of the
> > very first improvements that we want to tackle after implementing the MVP.
> > Depending on the time constraints, dynamic switching may or may not make it
> > into 1.11. It’s hard for me to tell at this point in time.
> >
> > Piotrek
> >
> > > On 26 Feb 2020, at 15:59, Zhijiang <wangzhijiang999@aliyun.com.INVALID>
> > > wrote:
> > >
> > > Thanks for the further explanations, Yu!
> > >
> > > 1. The in-flight buffer spilling process is indeed handled
> > > asynchronously. Until a buffer has finished spilling, it is not
> > > recycled for reuse. Your understanding is right. I guess I misunderstood
> > > your previous concern about additional memory consumption from the
> > > perspective of buffer usage. My point about no additional memory
> > > consumption was from the perspective of the total network memory size,
> > > which would not increase as a result.
> > >
> > > 2. We treat the in-flight buffers as input and output states, which are
> > > equivalent to existing operator states, and try to make use of all the
> > > existing mechanisms for state handles and assignment during recovery. So
> > > I guess local recovery should be a similar case. I will think through
> > > whether any special work is needed for local recovery, and then clarify
> > > it in the FLIP after we reach an agreement internally. BTW, this FLIP
> > > has not been finalized yet.
> > >
> > > 3. Yes, the previous proposal is about measuring how many in-flight
> > > buffers are to be spilled, which refers to the data size if we really go
> > > this way. I think the options proposed in the FLIP are initial thoughts
> > > on the various possibilities. Which way we take for the first version
> > > still needs to be finalized before voting, I guess.
> > >
> > > 4. I think the requirements or scenarios from users that you mentioned
> > > probably exist. Actually, we have not yet finalized the way of switching
> > > to unaligned checkpoints. Anyway, we could provide an option for users to
> > > try out this feature at the beginning, although it might not be the ideal
> > > one. Another input is that, although the motivation for unaligned
> > > checkpoints comes from backpressure scenarios, they might also perform
> > > well without backpressure, even shortening the checkpoint duration
> > > without obvious performance regression in our previous POC testing. So
> > > backpressure might not be the only factor for switching to the unaligned
> > > way in practice, I guess. Anyway, your inputs are valuable for us in
> > > making the final decision.
> > >
> > > Best,
> > > Zhijiang
> > >
> > >
> > >
> > >
> > > ------------------------------------------------------------------
> > > From:Yu Li <ca...@gmail.com>
> > > Send Time:2020 Feb. 26 (Wed.) 15:59
> > > To:dev <de...@flink.apache.org>; Zhijiang <wa...@aliyun.com>
> > > Subject:Re: [DISCUSS] FLIP-76: Unaligned checkpoints
> > >
> > > Hi Zhijiang,
> > >
> > > Thanks for the quick reply!
> > >
> > > For the 1st question, please allow me to confirm: when doing
> > > asynchronous checkpointing, disk spilling should happen in the
> > > background, in parallel with receiving/sending new data, or else it would
> > > become synchronous, right? Based on that assumption, some
> > > copy-on-write-like mechanism would be necessary to make sure new updates
> > > won't modify the to-be-checkpointed data, and this is where the
> > > additional memory consumption comes from.
> > >
> > > About point #2, I suggest we write down local recovery support in the
> > > FLIP document (if we reach a consensus here), to make sure it won't be
> > > neglected in the later implementation (I believe there's still some work
> > > to do on top of the existing local recovery mechanism). What do you
> > > think?
> > >
> > > For the 3rd topic, do you mean UNALIGNED_WITH_MAX_INFLIGHT_DATA would
> > > set some kind of threshold on "how much in-flight data to checkpoint"?
> > > If so, could you further clarify the measurement (data size? record
> > > number? others?), since there seems to be no description in the current
> > > FLIP doc? This is somewhat different from my understanding after reading
> > > the FLIP...
> > >
> > > Regarding question #4, I have no doubt that the new unaligned checkpoint
> > > mechanism could make fast checkpoints possible, at the cost of more
> > > memory, network bandwidth and disk space consumption. However, (correct
> > > me if I'm wrong) for users who are satisfied with the existing aligned
> > > checkpoint interval, paying that constant cost to prevent delayed
> > > checkpoints during back pressure - a relatively low-frequency event - may
> > > not be that pragmatic.
> > >
> > > Best Regards,
> > > Yu
> > >
> > > On Wed, 26 Feb 2020 at 15:07, Zhijiang <wangzhijiang999@aliyun.com.invalid>
> > > wrote:
> > > Hi Yu,
> > >
> > > Thanks for your interest in this FLIP and for sharing your thoughts! Let
> > > me try to answer some of the questions below.
> > >
> > > 1. Yes, asynchronous checkpointing should be part of the whole process
> > > and be supported naturally. As for the network memory concern, the
> > > in-flight buffers would be spilled to persistent storage when a
> > > checkpoint is triggered, and are recycled to receive/send data after
> > > they finish spilling. We still reuse the current network memory setting,
> > > so the maximum number of in-flight buffers would not exceed that amount,
> > > and there would be no additional memory consumption.
> > >
> > > 2. Yes, we would try to reuse the existing checkpoint recovery mechanism
> > > to keep the implementation simple.
> > >
> > > 3. UNALIGNED_WITH_MAX_INFLIGHT_DATA and
> > > UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA are about triggering the
> > > checkpoint at the proper time, i.e. the tradeoff between checkpoint
> > > duration and the amount of in-flight data to spill, etc. I guess the
> > > distinction still makes sense for a single input channel. Assume 100
> > > unconsumed buffers have already accumulated in one remote input channel
> > > when the barrier arrives from the network. Then we can decide whether to
> > > trigger the checkpoint immediately, either based on
> > > UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA, or based on
> > > UNALIGNED_WITH_MAX_INFLIGHT_DATA if 100 does not reach its max threshold.
> > >
> > > 4. I remember that we discussed these options internally before. I agree
> > > that the adaptive way might seem more flexible, but it would also be
> > > more complicated in design and implementation. As a first step for
> > > unaligned checkpoints, it seems to make more sense to take an easy way
> > > and concentrate only on the functionality and practical effect. After
> > > getting some convincing feedback, I guess the adaptive way might be an
> > > option to consider in the future, if really necessary.
> > >
> > > Best,
> > > Zhijiang
> > >

Re: [DISCUSS] FLIP-76: Unaligned checkpoints

Posted by Yu Li <ca...@gmail.com>.
Thanks for the further feedback, Zhijiang and Piotr! I think this is a great
feature and will watch the progress. Please also feel free to involve me in
discussions/reviews on the state-related parts. Thanks.

Best Regards,
Yu


On Thu, 27 Feb 2020 at 23:24, Piotr Nowojski <pi...@ververica.com> wrote:

> Hi Yu,
>
> Re 4.
>
> Dynamic switching between unaligned and aligned checkpoints based on some
> kind of threshold (timeout or checkpoint size) is definitely one of the
> very first improvements that we want to tackle after implementing the MVP.
> Depending on the time constraints, dynamic switching may or may not make it
> into 1.11. It’s hard for me to tell at this point in time.
>
> Piotrek
>
> > On 26 Feb 2020, at 15:59, Zhijiang <wa...@aliyun.com.INVALID>
> > wrote:
> >
> > Thanks for the further explanations, Yu!
> >
> > 1. The in-flight buffer spilling process is indeed handled
> > asynchronously. Until a buffer has finished spilling, it is not
> > recycled for reuse. Your understanding is right. I guess I misunderstood
> > your previous concern about additional memory consumption from the
> > perspective of buffer usage. My point about no additional memory
> > consumption was from the perspective of the total network memory size,
> > which would not increase as a result.
> >
> > 2. We treat the in-flight buffers as input and output states, which are
> > equivalent to existing operator states, and try to make use of all the
> > existing mechanisms for state handles and assignment during recovery. So
> > I guess local recovery should be a similar case. I will think through
> > whether any special work is needed for local recovery, and then clarify
> > it in the FLIP after we reach an agreement internally. BTW, this FLIP
> > has not been finalized yet.
> >
> > 3. Yes, the previous proposal is about measuring how many in-flight
> > buffers are to be spilled, which refers to the data size if we really go
> > this way. I think the options proposed in the FLIP are initial thoughts
> > on the various possibilities. Which way we take for the first version
> > still needs to be finalized before voting, I guess.
> >
> > 4. I think the requirements or scenarios from users that you mentioned
> > probably exist. Actually, we have not yet finalized the way of switching
> > to unaligned checkpoints. Anyway, we could provide an option for users to
> > try out this feature at the beginning, although it might not be the ideal
> > one. Another input is that, although the motivation for unaligned
> > checkpoints comes from backpressure scenarios, they might also perform
> > well without backpressure, even shortening the checkpoint duration
> > without obvious performance regression in our previous POC testing. So
> > backpressure might not be the only factor for switching to the unaligned
> > way in practice, I guess. Anyway, your inputs are valuable for us in
> > making the final decision.
> >
> > Best,
> > Zhijiang
> >
> >
> >
> >
> > ------------------------------------------------------------------
> > From:Yu Li <ca...@gmail.com>
> > Send Time:2020 Feb. 26 (Wed.) 15:59
> > To:dev <de...@flink.apache.org>; Zhijiang <wa...@aliyun.com>
> > Subject:Re: [DISCUSS] FLIP-76: Unaligned checkpoints
> >
> > Hi Zhijiang,
> >
> > Thanks for the quick reply!
> >
> > For the 1st question, please allow me to confirm: when doing
> > asynchronous checkpointing, disk spilling should happen in the
> > background, in parallel with receiving/sending new data, or else it would
> > become synchronous, right? Based on that assumption, some
> > copy-on-write-like mechanism would be necessary to make sure new updates
> > won't modify the to-be-checkpointed data, and this is where the
> > additional memory consumption comes from.
> >
> > About point #2, I suggest we write down local recovery support in the
> > FLIP document (if we reach a consensus here), to make sure it won't be
> > neglected in the later implementation (I believe there's still some work
> > to do on top of the existing local recovery mechanism). What do you
> > think?
> >
> > For the 3rd topic, do you mean UNALIGNED_WITH_MAX_INFLIGHT_DATA would
> > set some kind of threshold on "how much in-flight data to checkpoint"?
> > If so, could you further clarify the measurement (data size? record
> > number? others?), since there seems to be no description in the current
> > FLIP doc? This is somewhat different from my understanding after reading
> > the FLIP...
> >
> > Regarding question #4, I have no doubt that the new unaligned checkpoint
> > mechanism could make fast checkpoints possible, at the cost of more
> > memory, network bandwidth and disk space consumption. However, (correct
> > me if I'm wrong) for users who are satisfied with the existing aligned
> > checkpoint interval, paying that constant cost to prevent delayed
> > checkpoints during back pressure - a relatively low-frequency event - may
> > not be that pragmatic.
> >
> > Best Regards,
> > Yu
> >
> > On Wed, 26 Feb 2020 at 15:07, Zhijiang <wa...@aliyun.com.invalid>
> wrote:
> > Hi Yu,
> >
> > Thanks for concerning of this FLIP and sharing your thoughts! Let me try
> to answer some below questions.
> >
> > 1. Yes, the asynchronous checkpointing should be part of whole process
> and be supported naturally. As for the network memory concern,
> > the inflight-buffers would be spilled into persistent storage while
> triggering checkpoint, and are recycled to receive/send data after finish
> spilling.
> > We still reuse the current network memory setting, so the maximum
> inflight-buffers would not exceed that amount, and there would not have
> >  additional memory consumption.
> >
> > 2. Yes, we would try to reuse the existing checkpoint recovery mechanism
> for simple implementation.
> >
> > 3. UNALIGNED_WITH_MAX_INFLIGHT_DATA and
> UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA are for the consideration of
> triggering checkpoint
> > at proper time, the tradeoff between checkpoint duration and spilling
> inflight data, etc. I guess it still makes sense for the single input
> channel.
> >  Assuming there were already accumulated 100 unconsumed buffers in one
> remote input channel when the barrier arrives from the network, then we can
> > decide whether to trigger checkpoint immediately based on
> UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA or based on
> UNALIGNED_WITH_MAX_INFLIGHT_DATA
> > if 100 is not reaching its max threshold.
> >
> > 4. I remembered that we ever discussed the options internally before. I
> agree with that the adaptive way might seem more flexible, but also mean
> more complicated
> >  in design and implementation. As the first step of unaligned
> checkpoint, it seems more make sense to take an easy way for only
> concentrating on the function and
> >  practical effect. After getting some feedbacks to convince us, I guess
> the adaptive way might be probably an option to consider if really
> necessary in future.
> >
> > Best,
> > Zhijiang
> >
> >
> > ------------------------------------------------------------------
> > From:Yu Li <ca...@gmail.com>
> > Send Time:2020 Feb. 26 (Wed.) 12:59
> > To:dev <de...@flink.apache.org>
> > Subject:Re: [DISCUSS] FLIP-76: Unaligned checkpoints
> >
> > Hi All,
> >
> > Sorry for being late to the discussion. I've gone through the latest FLIP
> > document and have below questions/suggestions:
> >
> > 1. Do we support asynchronous checkpointing on the in-flight data?
> >     * From the doc the answer seems to be yes (state-based storage for
> the
> > first version), and if so, there would be additional memory consumption
> on
> > network buffer during checkpoint and we should take this into account,
> > especially in container environment.
> >
> > 2. I suggest we also take local recovery into consideration during
> > implementation, which could speed up the recovery speed especially when
> the
> > amount of in-flight data is huge.
> >
> > 3. About checkpointing policy, are the below understanding correct? Maybe
> > it helps if we map them more explicitly in FLIP doc, IMHO:
> >     * For single input channel, there's no difference between
> > UNALIGNED_WITH_MAX_INFLIGHT_DATA
> > and UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA, both means we start the
> > checkpoint once observe the barrier in the input channel.
> >     * For multiple input channels, UNALIGNED_WITH_MAX_INFLIGHT_DATA means
> > starting checkpoint only when barrier appears in all input channels,
> > while UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA means starting checkpoint
> when
> > barrier appears in any one of the input channels.
> >
> > 4. It seems now we only support pre-defined options, but is it possible
> to
> > switch in between dynamically? For example, if we predefine the policy to
> > ALIGNED, could we supply a command to switch
> > to UNALIGNED_WITH_MAX_INFLIGHT_DATA when severe back pressure observed?
> Or
> > switch to ALIGNED if we see too much data persisted for
> > UNALIGNED_WITH_MAX_INFLIGHT_DATA? Maybe I'm neglecting something, but
> > what's preventing us from being more adaptive?
> >
> > Thanks!
> >
> > Best Regards,
> > Yu
> >
> >
> > On Tue, 22 Oct 2019 at 15:44, Piotr Nowojski <pi...@ververica.com>
> wrote:
> >
> >> Hi,
> >>
> >> I would like to propose a modification to this FLIP.
> >>
> >> Based on the feedback that we were receiving after publishing this
> >> document and during Flink Forward, I was growing more and more anxious
> >> about one issue here: having to persist all buffered in-flight data at
> >> once. As the volume of this data might be large (GBs per TaskManager
> even
> >> with small clusters and relatively simple jobs), the time to persist
> all of
> >> this data at once might be quite substantial.
> >>
> >>
> >>
> >> To address this issue, I would like to propose that at first we
> implement
> >> a variant of unaligned checkpoints, just as written down in FLIP-76, but
> >> with continuous spilling - all data will be persisted/spilled
> continuously,
> >> all the time as they come - not at once when the checkpoint starts.
> Think
> >> about this proposal as incremental way of persisting the data.
> >>
> >> Pros of continuous spilling:
> >> + faster checkpointing, as there will be no need to store GBs of data,
> >> just flush/close.
> >> + more predictable behaviour. Instead of jerky/varying/spike IO/CPU
> loads,
> >> steady records throughput and spilling.
> >>
> >> Cons of continuous spilling:
> >> - need to persist all of the network traffic instead of persisting just
> >> the in-flight data
> >>
> >> Larger volume of persisted data doesn’t matter that much from the
> >> perspective of the throughput, as if you are unable to spill the data
> >> faster than to process them, unaligned checkpoints are worse option
> >> compared to the aligned checkpoints [1]. If checkpoints are frequent it
> >> also doesn’t matter [2]. The true downside is if checkpoints are
> infrequent
> >> and you have to for example pay $ for the extra storage or extra network
> >> traffic to the storage.
> >>
> >> On the other hand, continuous spilling (persistent communication
> >> channels?) might have an added benefit of enabling us localised
> failures -
> >> failure of one node will not necessarily bring down the whole cluster.
> >>
> >>
> >>
> >> As I mentioned, I’m proposing to just start with the continuous
> spilling.
> >> It might be more costly in some scenarios, but it will offer the most
> >> stable and predictable performance with the lowest checkpoint latency.
> It’s
> >> not perfect, it won’t solve all of the use cases, but frankly all of the
> >> other options have their own blind spots, and continuous spilling
> should at
> >> least fully solve relatively low throughput use cases. We can later
> build
> >> on top of that solution, expanding it with the following features:
> >>
> >> 1. Do not spill continuously if there is no backpressure. For example
> >> provide a timeout: start spilling pre-emptively/continuously if some
> buffer
> >> was not processed within X seconds.
> >> 2. Start spilling only once the checkpoint starts (this is the exact
> >> proposal from the current FLIP-76).
> >> 3. Initially we want to spill to a Flink’s FileSystem (for example S3),
> >> but in the future we are considering other options, for example Apache
> >> Bookeeper.
> >>
> >> What do you think?
> >>
> >> Piotrek
> >>
> >>
> >>
> >> [1] I’m assuming that the spilling throughput per node can go up to
> >> ~30MB/s. If your Flink's job data processing rate is 100MB/s, spilling
> >> in-flight data will take 3.3 times longer than waiting for the
> alignment.
> >> On the other hand if data processing rate is 10MB/s, overhead of
> continuous
> >> spilling is relatively low.
> >> [2] With checkpoints every one minute, with data processing throughput
> >> 30MB/s per node, we would have to persist 1.8GB of data per node between
> >> the checkpoints, which is similar order of magnitude as buffered
> in-flight
> >> data under the back-pressure. With higher throughput, unaligned
> checkpoints
> >> are not helping ([1]). With lower throughput, both the original proposal
> >> and continuous spilling would have to effectively persist all of the
> data
> >> anyway.
> >>
> >>> On 10 Oct 2019, at 19:51, Yun Tang <my...@live.com> wrote:
> >>>
> >>> Hi Arvid
> >>>
> >>> +1 for this future which has been hoped for a long time. End-to-end
> >> exactly once job could benefit from quicker checkpoint completion.
> >>>
> >>>
> >>> Best
> >>> Yun Tang
> >>> ________________________________
> >>> From: Yun Gao <yu...@aliyun.com.INVALID>
> >>> Sent: Thursday, October 10, 2019 18:39
> >>> To: dev <de...@flink.apache.org>
> >>> Subject: Re: [DISCUSS] FLIP-76: Unaligned checkpoints
> >>>
> >>>   Hi Arvid,
> >>>
> >>>           Very thanks for bring up the discussion! From our side unable
> >> to finish the checkpoint is commonly met for online jobs, therefore +1
> from
> >> my side to implement this.
> >>>          A tiny issue of the FLIP is that the Discussion Thread URL
> >> attached seems to be not right.
> >>>
> >>>
> >>>    Best,
> >>>    Yun
> >>>
> >>>
> >>> ------------------------------------------------------------------
> >>> From:Arvid Heise <ar...@ververica.com>
> >>> Send Time:2019 Sep. 30 (Mon.) 20:31
> >>> To:dev <de...@flink.apache.org>
> >>> Subject:[DISCUSS] FLIP-76: Unaligned checkpoints
> >>>
> >>> Hi Devs,
> >>>
> >>> I would like to start the formal discussion about FLIP-76 [1], which
> >>> improves the checkpoint latency in systems under backpressure, where a
> >>> checkpoint can take hours to complete in the worst case. I recommend
> the
> >>> thread "checkpointing under backpressure" [2] to get a good idea why
> >> users
> >>> are not satisfied with the current behavior. The key points:
> >>>
> >>>  - Since the checkpoint barrier flows much slower through the
> >>>  back-pressured channels, the other channels and their upstream
> >> operators
> >>>  are effectively blocked during checkpointing.
> >>>  - The checkpoint barrier takes a long time to reach the sinks causing
> >>>  long checkpointing times. A longer checkpointing time in turn means
> >> that
> >>>  the checkpoint will be fairly outdated once done. Since a heavily
> >> utilized
> >>>  pipeline is inherently more fragile, we may run into a vicious cycle
> of
> >>>  late checkpoints, crash, recovery to a rather outdated checkpoint,
> more
> >>>  back pressure, and even later checkpoints, which would result in
> >> little to
> >>>  no progress in the application.
> >>>
> >>> The FLIP proposes "unaligned checkpoints" which improves the current
> >> state,
> >>> such that
> >>>
> >>>  - Upstream processes can continue to produce data, even if some
> >> operator
> >>>  still waits on a checkpoint barrier on a specific input channel.
> >>>  - Checkpointing times are heavily reduced across the execution graph,
> >>>  even for operators with a single input channel.
> >>>  - End-users will see more progress even in unstable environments as
> >> more
> >>>  up-to-date checkpoints will avoid too many recomputations.
> >>>  - Facilitate faster rescaling.
> >>>
> >>> The key idea is to allow checkpoint barriers to be forwarded to
> >> downstream
> >>> tasks before the synchronous part of the checkpointing has been
> conducted
> >>> (see Fig. 1). To that end, we need to store in-flight data as part of
> the
> >>> checkpoint as described in greater details in this FLIP.
> >>>
> >>> Although the basic idea was already sketched in [2], we would like get
> >>> broader feedback in this dedicated mail thread.
> >>>
> >>> Best,
> >>>
> >>> Arvid
> >>>
> >>> [1]
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
> >>> [2]
> >>>
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Checkpointing-under-backpressure-td31616.html
> >>>
> >>
> >>
> >
> >
>
>

Re: [DISCUSS] FLIP-76: Unaligned checkpoints

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi Yu,

Re 4.

Dynamic switching between unaligned and aligned checkpoints based on some kind of threshold (a timeout, or the checkpoint size) is definitely one of the very first improvements that we want to tackle after implementing the MVP. Depending on time constraints, dynamic switching may or may not make it into 1.11. It’s hard for me to tell at this point in time.
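
To illustrate the kind of threshold-based switching meant here, a minimal sketch in Python. This is not Flink code and nothing like it is decided in the FLIP: the function `choose_mode`, the parameters `alignment_timeout_s` and `max_checkpoint_bytes`, and the rule itself are assumptions made up for illustration.

```python
from enum import Enum


class Mode(Enum):
    ALIGNED = "aligned"
    UNALIGNED = "unaligned"


def choose_mode(alignment_elapsed_s: float,
                inflight_bytes: int,
                alignment_timeout_s: float,
                max_checkpoint_bytes: int) -> Mode:
    """Hypothetical rule: start aligned and fall back to unaligned on timeout,
    unless persisting the in-flight data would exceed the size threshold."""
    if inflight_bytes > max_checkpoint_bytes:
        # Too much data would have to be persisted: keep waiting for alignment.
        return Mode.ALIGNED
    if alignment_elapsed_s >= alignment_timeout_s:
        # Alignment is taking too long (for example under back pressure).
        return Mode.UNALIGNED
    return Mode.ALIGNED
```

With a 5 second timeout and a 100 byte limit, `choose_mode(6.0, 10, 5.0, 100)` picks `Mode.UNALIGNED`, while `choose_mode(6.0, 1000, 5.0, 100)` stays at `Mode.ALIGNED` because the in-flight data is over the size limit.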

Piotrek

> On 26 Feb 2020, at 15:59, Zhijiang <wa...@aliyun.com.INVALID> wrote:
> 
> Thanks for the further explanations, Yu!
> 
> 1. The inflight buffer spilling process is indeed handled asynchronously. While the buffer is not finished spilling, it would not be recycled to reuse again.
> Your understanding is right. I guess I misunderstood your previous concern of additional memory consumption from the perspective of buffer usage.
> My point of no additional memory consumption is from the perspective of total network memory size which would not be increased as a result.
> 
> 2. We treat the inflight buffers as input&output states which are equivalent with existing operator states, and try to make use of all the existing mechanisms for
> state handle and assignment during recovery. So i guess for the local recovery it should be the similar case. I would think through whether it has some special
> work to do around with local recovery, and then clarify it in FLIP after we reach an agreement internally. BTW, this FLIP has not finalized yet.
> 
> 3. Yes, the previous proposal is for measuring how many inflight buffers to be spilled which refers to the data size if really taking this way. I think the proposed option
> in FLIP are the initial thoughts for various of possibilities. Which way we decide to take for the first version, I guess we need to further finalize before voting.
> 
> 4. I think there probably exists the requirements or scenarios from users as you mentioned. Actually we have not finalized the way of switching to unaligned checkpoint yet.
> Anyway we could provide an option for users to try out this feature at the beginning, although it might be not the most ideal one. Another input is that we know the motivation
> of unaligned checkpoint is from the scenarios of backpressure, but it might also performs well in the case of non backpressure, even shorten the checkpoint duration without
> obvious performance regression in our previous POC testing. So the backpressure might not be the only factor to switch to the unaligned way in practice I guess. Anyway your
> inputs are valuable for us to make the final decision.
> 
> Best,
> Zhijiang
> 
> 
> 
> 


Re: [DISCUSS] FLIP-76: Unaligned checkpoints

Posted by Zhijiang <wa...@aliyun.com.INVALID>.
Thanks for the further explanations, Yu!

1. The in-flight buffer spilling is indeed handled asynchronously. A buffer is not recycled for reuse until it has finished spilling.
Your understanding is right. I guess I misunderstood your previous concern about additional memory consumption, which was from the perspective of buffer usage.
My point about no additional memory consumption was from the perspective of the total network memory size, which would not increase as a result.
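
To make the two perspectives concrete, here is a toy model in Python. It is only a sketch under assumptions: `ToyBufferPool` and its methods are invented for illustration and are not Flink classes. The pool size is fixed, so total network memory never grows, but buffers that are still being spilled cannot serve new data until they are recycled.

```python
class ToyBufferPool:
    """Fixed-size pool: buffers are in use, being spilled, or available."""

    def __init__(self, total: int) -> None:
        self.total = total
        self.in_use = 0     # buffers holding in-flight data
        self.spilling = 0   # buffers being persisted asynchronously

    @property
    def available(self) -> int:
        return self.total - self.in_use - self.spilling

    def request(self) -> bool:
        """Take one buffer for new data; fails when none is available."""
        if self.available == 0:
            return False
        self.in_use += 1
        return True

    def start_spill(self, n: int) -> None:
        """Checkpoint triggered: up to n in-use buffers start spilling."""
        n = min(n, self.in_use)
        self.in_use -= n
        self.spilling += n

    def finish_spill(self, n: int) -> None:
        """Spilling finished: up to n buffers are recycled for reuse."""
        n = min(n, self.spilling)
        self.spilling -= n
```

In this model, with a pool of 4 buffers that are all in use, `start_spill(3)` leaves `available` at 0 until `finish_spill` is called, while `total` stays at 4 throughout.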

2. We treat the in-flight buffers as input and output state, equivalent to the existing operator state, and try to make use of all the existing mechanisms for
state handles and state assignment during recovery. So I guess local recovery should be a similar case. I will think through whether local recovery needs any special
work, and then clarify it in the FLIP after we reach an agreement internally. BTW, this FLIP has not been finalized yet.

3. Yes, the previous proposal is about measuring how many in-flight buffers are to be spilled, which corresponds to the data size if we really go this way. I think the options proposed
in the FLIP are initial thoughts covering various possibilities. We still need to finalize which one to take for the first version before voting.
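
One possible reading of the two options, taking the 100-buffer example from my earlier reply, is sketched below in Python. This is an assumption for illustration only, not the FLIP's definition: the function `trigger_immediately` and the parameter `max_inflight_buffers` are hypothetical, and the buffer count stands in for the data size.

```python
def trigger_immediately(policy: str,
                        queued_buffers: int,
                        max_inflight_buffers: int) -> bool:
    """Decide whether to start the checkpoint as soon as the barrier arrives.

    queued_buffers: unconsumed buffers already in the input channel.
    max_inflight_buffers: hypothetical threshold, only used by the MAX policy.
    """
    if policy == "UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA":
        # No limit: always trigger right away and spill whatever is queued.
        return True
    if policy == "UNALIGNED_WITH_MAX_INFLIGHT_DATA":
        # Trigger right away only while the queued data is below the threshold.
        return queued_buffers < max_inflight_buffers
    raise ValueError(f"unknown policy: {policy}")
```

Under this reading, 100 queued buffers with a threshold of 200 trigger the checkpoint immediately for both options, while a threshold of 50 delays it only for UNALIGNED_WITH_MAX_INFLIGHT_DATA.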

4. I think the requirements or scenarios from users that you mentioned probably do exist. Actually, we have not finalized the way of switching to unaligned checkpoints yet.
Anyway, we could provide an option for users to try out this feature at the beginning, although it might not be the ideal one. Another input is that, although the motivation
for unaligned checkpoints comes from backpressure scenarios, they might also perform well without backpressure: in our previous POC testing they even shortened the checkpoint duration without
an obvious performance regression. So backpressure might not be the only factor for switching to the unaligned way in practice, I guess. Anyway, your
inputs are valuable for us in making the final decision.

Best,
Zhijiang




------------------------------------------------------------------
From:Yu Li <ca...@gmail.com>
Send Time:2020 Feb. 26 (Wed.) 15:59
To:dev <de...@flink.apache.org>; Zhijiang <wa...@aliyun.com>
Subject:Re: [DISCUSS] FLIP-76: Unaligned checkpoints

Hi Zhijiang,

Thanks for the quick reply!

For the 1st question, please allow me to confirm, that when doing asynchronous checkpointing, disk spilling should happen in background in parallel with receiving/sending new data, or else it would become synchronous, right? Based on such assumption, some copy-on-write like mechanism would be necessary to make sure the new updates won't modify the to-be-checkpointed data, and this is where the additional memory consumption comes from.

About point #2, I suggest we write it down in the FLIP document about local recovery support (if reach a consensus here), to make sure it won't be neglected in later implementation (I believe there're still some work to do following existing local recovery mechanism). What do you think?

For the 3rd topic, do you mean UNALIGNED_WITH_MAX_INFLIGHT_DATA would set some kind of threshold about "how much in-flight data to checkpoint"? If so, could you further clarify the measurement (data size? record number? others?) since there seems to be no description in the current FLIP doc? This is somewhat different from my understanding after reading the FLIP...

Regarding question #4, I have no doubt that the new unaligned checkpoint mechanism could make fast checkpoint possible, at the cost of more memory, network bandwidth and disk space consumption. However, (correct me if I'm wrong) for users who are satisfied with the existing aligned checkpoint interval, taking the constant cost to prevent delayed checkpoint during back pressure - a relatively low frequency event - may not be that pragmatic.

Best Regards,
Yu

On Wed, 26 Feb 2020 at 15:07, Zhijiang <wa...@aliyun.com.invalid> wrote:
Hi Yu,

 Thanks for concerning of this FLIP and sharing your thoughts! Let me try to answer some below questions.

 1. Yes, the asynchronous checkpointing should be part of whole process and be supported naturally. As for the network memory concern, 
 the inflight-buffers would be spilled into persistent storage while triggering checkpoint, and are recycled to receive/send data after finish spilling.
 We still reuse the current network memory setting, so the maximum inflight-buffers would not exceed that amount, and there would not have
  additional memory consumption.

 2. Yes, we would try to reuse the existing checkpoint recovery mechanism for simple implementation.

 3. UNALIGNED_WITH_MAX_INFLIGHT_DATA and UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA are for the consideration of triggering checkpoint
 at proper time, the tradeoff between checkpoint duration and spilling inflight data, etc. I guess it still makes sense for the single input channel.
  Assuming there were already accumulated 100 unconsumed buffers in one remote input channel when the barrier arrives from the network, then we can
 decide whether to trigger checkpoint immediately based on UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA or based on UNALIGNED_WITH_MAX_INFLIGHT_DATA
 if 100 is not reaching its max threshold.

 4. I remembered that we ever discussed the options internally before. I agree with that the adaptive way might seem more flexible, but also mean more complicated
  in design and implementation. As the first step of unaligned checkpoint, it seems more make sense to take an easy way for only concentrating on the function and
  practical effect. After getting some feedbacks to convince us, I guess the adaptive way might be probably an option to consider if really necessary in future.

 Best,
 Zhijiang


 ------------------------------------------------------------------
 From:Yu Li <ca...@gmail.com>
 Send Time:2020 Feb. 26 (Wed.) 12:59
 To:dev <de...@flink.apache.org>
 Subject:Re: [DISCUSS] FLIP-76: Unaligned checkpoints

 Hi All,

 Sorry for being late to the discussion. I've gone through the latest FLIP
 document and have below questions/suggestions:

 1. Do we support asynchronous checkpointing on the in-flight data?
     * From the doc the answer seems to be yes (state-based storage for the
 first version), and if so, there would be additional memory consumption on
 network buffer during checkpoint and we should take this into account,
 especially in container environment.

 2. I suggest we also take local recovery into consideration during
 implementation, which could speed up the recovery speed especially when the
 amount of in-flight data is huge.

 3. About checkpointing policy, are the below understanding correct? Maybe
 it helps if we map them more explicitly in FLIP doc, IMHO:
     * For single input channel, there's no difference between
 UNALIGNED_WITH_MAX_INFLIGHT_DATA
 and UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA, both means we start the
 checkpoint once observe the barrier in the input channel.
     * For multiple input channels, UNALIGNED_WITH_MAX_INFLIGHT_DATA means
 starting checkpoint only when barrier appears in all input channels,
 while UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA means starting checkpoint when
 barrier appears in any one of the input channels.

 4. It seems now we only support pre-defined options, but is it possible to
 switch in between dynamically? For example, if we predefine the policy to
 ALIGNED, could we supply a command to switch
 to UNALIGNED_WITH_MAX_INFLIGHT_DATA when severe back pressure observed? Or
 switch to ALIGNED if we see too much data persisted for
 UNALIGNED_WITH_MAX_INFLIGHT_DATA? Maybe I'm neglecting something, but
 what's preventing us from being more adaptive?

 Thanks!

 Best Regards,
 Yu


 On Tue, 22 Oct 2019 at 15:44, Piotr Nowojski <pi...@ververica.com> wrote:

 > Hi,
 >
 > I would like to propose a modification to this FLIP.
 >
 > Based on the feedback that we were receiving after publishing this
 > document and during Flink Forward, I was growing more and more anxious
 > about one issue here: having to persist all buffered in-flight data at
 > once. As the volume of this data might be large (GBs per TaskManager even
 > with small clusters and relatively simple jobs), the time to persist all of
 > this data at once might be quite substantial.
 >
 >
 >
 > To address this issue, I would like to propose that at first we implement
 > a variant of unaligned checkpoints, just as written down in FLIP-76, but
 > with continuous spilling - all data will be persisted/spilled continuously,
 > all the time as they come - not at once when the checkpoint starts. Think
 > about this proposal as incremental way of persisting the data.
 >
 > Pros of continuous spilling:
 > + faster checkpointing, as there will be no need to store GBs of data,
 > just flush/close.
 > + more predictable behaviour. Instead of jerky/varying/spike IO/CPU loads,
 > steady records throughput and spilling.
 >
 > Cons of continuous spilling:
 > - need to persist all of the network traffic instead of persisting just
 > the in-flight data
 >
 > Larger volume of persisted data doesn’t matter that much from the
 > perspective of the throughput, as if you are unable to spill the data
 > faster than to process them, unaligned checkpoints are worse option
 > compared to the aligned checkpoints [1]. If checkpoints are frequent it
 > also doesn’t matter [2]. The true downside is if checkpoints are infrequent
 > and you have to for example pay $ for the extra storage or extra network
 > traffic to the storage.
 >
 > On the other hand, continuous spilling (persistent communication
 > channels?) might have an added benefit of enabling us localised failures -
 > failure of one node will not necessarily bring down the whole cluster.
 >
 >
 >
 > As I mentioned, I’m proposing to just start with the continuous spilling.
 > It might be more costly in some scenarios, but it will offer the most
 > stable and predictable performance with the lowest checkpoint latency. It’s
 > not perfect, it won’t solve all of the use cases, but frankly all of the
 > other options have their own blind spots, and continuous spilling should at
 > least fully solve relatively low throughput use cases. We can later build
 > on top of that solution, expanding it with the following features:
 >
 > 1. Do not spill continuously if there is no backpressure. For example
 > provide a timeout: start spilling pre-emptively/continuously if some buffer
 > was not processed within X seconds.
 > 2. Start spilling only once the checkpoint starts (this is the exact
 > proposal from the current FLIP-76).
 > 3. Initially we want to spill to a Flink’s FileSystem (for example S3),
 > but in the future we are considering other options, for example Apache
 > Bookeeper.
 >
 > What do you think?
 >
 > Piotrek
 >
 >
 >
 > [1] I’m assuming that the spilling throughput per node can go up to
 > ~30MB/s. If your Flink's job data processing rate is 100MB/s, spilling
 > in-flight data will take 3.3 times longer than waiting for the alignment.
 > On the other hand if data processing rate is 10MB/s, overhead of continuous
 > spilling is relatively low.
 > [2] With checkpoints every one minute, with data processing throughput
 > 30MB/s per node, we would have to persist 1.8GB of data per node between
 > the checkpoints, which is similar order of magnitude as buffered in-flight
 > data under the back-pressure. With higher throughput, unaligned checkpoints
 > are not helping ([1]). With lower throughput, both the original proposal
 > and continuous spilling would have to effectively persist all of the data
 > anyway.
 >
 > > On 10 Oct 2019, at 19:51, Yun Tang <my...@live.com> wrote:
 > >
 > > Hi Arvid
 > >
 > > +1 for this future which has been hoped for a long time. End-to-end
 > exactly once job could benefit from quicker checkpoint completion.
 > >
 > >
 > > Best
 > > Yun Tang
 > > ________________________________
 > > From: Yun Gao <yu...@aliyun.com.INVALID>
 > > Sent: Thursday, October 10, 2019 18:39
 > > To: dev <de...@flink.apache.org>
 > > Subject: Re: [DISCUSS] FLIP-76: Unaligned checkpoints
 > >
 > >    Hi Arvid,
 > >
 > >            Very thanks for bring up the discussion! From our side unable
 > to finish the checkpoint is commonly met for online jobs, therefore +1 from
 > my side to implement this.
 > >           A tiny issue of the FLIP is that the Discussion Thread URL
 > attached seems to be not right.
 > >
 > >
 > >     Best,
 > >     Yun
 > >
 > >
 > > ------------------------------------------------------------------
 > > From:Arvid Heise <ar...@ververica.com>
 > > Send Time:2019 Sep. 30 (Mon.) 20:31
 > > To:dev <de...@flink.apache.org>
 > > Subject:[DISCUSS] FLIP-76: Unaligned checkpoints
 > >
 > > Hi Devs,
 > >
 > > I would like to start the formal discussion about FLIP-76 [1], which
 > > improves the checkpoint latency in systems under backpressure, where a
 > > checkpoint can take hours to complete in the worst case. I recommend the
 > > thread "checkpointing under backpressure" [2] to get a good idea why
 > users
 > > are not satisfied with the current behavior. The key points:
 > >
 > >   - Since the checkpoint barrier flows much slower through the
 > >   back-pressured channels, the other channels and their upstream
 > operators
 > >   are effectively blocked during checkpointing.
 > >   - The checkpoint barrier takes a long time to reach the sinks causing
 > >   long checkpointing times. A longer checkpointing time in turn means
 > that
 > >   the checkpoint will be fairly outdated once done. Since a heavily
 > utilized
 > >   pipeline is inherently more fragile, we may run into a vicious cycle of
 > >   late checkpoints, crash, recovery to a rather outdated checkpoint, more
 > >   back pressure, and even later checkpoints, which would result in
 > little to
 > >   no progress in the application.
 > >
 > > The FLIP proposes "unaligned checkpoints" which improves the current
 > state,
 > > such that
 > >
 > >   - Upstream processes can continue to produce data, even if some
 > operator
 > >   still waits on a checkpoint barrier on a specific input channel.
 > >   - Checkpointing times are heavily reduced across the execution graph,
 > >   even for operators with a single input channel.
 > >   - End-users will see more progress even in unstable environments as
 > more
 > >   up-to-date checkpoints will avoid too many recomputations.
 > >   - Facilitate faster rescaling.
 > >
 > > The key idea is to allow checkpoint barriers to be forwarded to
 > downstream
 > > tasks before the synchronous part of the checkpointing has been conducted
 > > (see Fig. 1). To that end, we need to store in-flight data as part of the
 > > checkpoint as described in greater details in this FLIP.
 > >
 > > Although the basic idea was already sketched in [2], we would like get
 > > broader feedback in this dedicated mail thread.
 > >
 > > Best,
 > >
 > > Arvid
 > >
 > > [1]
 > >
 > https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
 > > [2]
 > >
 > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Checkpointing-under-backpressure-td31616.html
 > >
 >
 >



Re: [DISCUSS] FLIP-76: Unaligned checkpoints

Posted by Yu Li <ca...@gmail.com>.
Hi Zhijiang,

Thanks for the quick reply!

For the 1st question, please allow me to confirm: when doing
asynchronous checkpointing, disk spilling should happen in the background, in
parallel with receiving/sending new data, or else it would become
synchronous, right? Based on this assumption, some copy-on-write-like
mechanism would be necessary to make sure new updates won't modify the
to-be-checkpointed data, and this is where the additional memory
consumption comes from.
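
To make the memory concern concrete, here is a minimal single-threaded
sketch of the copy-on-write idea. All names (CopyOnWriteSpillSketch,
Buffer, pinForSpill, ...) are hypothetical and are not Flink classes or
anything defined in the FLIP; it only illustrates that a write arriving
while a buffer is still being spilled forces a second copy of the data.

```java
import java.util.Arrays;

/** Hypothetical sketch; these names are not Flink classes or FLIP-76 APIs. */
public class CopyOnWriteSpillSketch {

    /** An in-flight buffer that an asynchronous spill may still be reading. */
    public static final class Buffer {
        private byte[] data;
        private boolean pinnedForSpill;
        private long copiedBytes;

        public Buffer(byte[] data) {
            this.data = data;
        }

        /** Checkpoint start: hand the current bytes to the background spiller. */
        public byte[] pinForSpill() {
            pinnedForSpill = true;
            return data;
        }

        /** Spiller callback once the pinned bytes have been persisted. */
        public void unpin() {
            pinnedForSpill = false;
        }

        /** A write copies the array first if the spiller may still be reading it. */
        public void write(int index, byte value) {
            if (pinnedForSpill) {
                data = Arrays.copyOf(data, data.length);
                copiedBytes += data.length; // extra memory held until the spill ends
                pinnedForSpill = false;     // the fresh copy is not shared
            }
            data[index] = value;
        }

        public byte read(int index) {
            return data[index];
        }

        public long copiedBytes() {
            return copiedBytes;
        }
    }

    public static void main(String[] args) {
        Buffer buffer = new Buffer(new byte[] {1, 2, 3});
        byte[] snapshot = buffer.pinForSpill();
        buffer.write(0, (byte) 9);
        System.out.println("snapshot[0]=" + snapshot[0]
                + ", live[0]=" + buffer.read(0)
                + ", copied bytes=" + buffer.copiedBytes());
    }
}
```

The sketch ignores thread safety and buffer pooling. If buffers are instead
only recycled after spilling finishes, no copy is needed, but new data then
has to wait for a free buffer.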

About point #2, I suggest we write down local recovery support in the FLIP
document (if we reach a consensus here), to make sure it won't be
neglected in later implementation (I believe there is still some work to do
on top of the existing local recovery mechanism). What do you think?

For the 3rd topic, do you mean UNALIGNED_WITH_MAX_INFLIGHT_DATA would set
some kind of threshold on "how much in-flight data to checkpoint"? If
so, could you further clarify the unit of measurement (data size? record
count? something else?), since there seems to be no description in the
current FLIP doc? This is somewhat different from my understanding after
reading the FLIP...

Regarding question #4, I have no doubt that the new unaligned checkpoint
mechanism could make fast checkpoints possible, at the cost of more memory,
network bandwidth and disk space consumption. However (correct me if I'm
wrong), for users who are satisfied with the existing aligned checkpoint
interval, paying that constant cost to prevent delayed checkpoints during
back pressure - a relatively low-frequency event - may not be that
pragmatic.

Best Regards,
Yu


On Wed, 26 Feb 2020 at 15:07, Zhijiang <wa...@aliyun.com.invalid>
wrote:

> Hi Yu,
>
> Thanks for concerning of this FLIP and sharing your thoughts! Let me try
> to answer some below questions.
>
> 1. Yes, the asynchronous checkpointing should be part of whole process and
> be supported naturally. As for the network memory concern,
> the inflight-buffers would be spilled into persistent storage while
> triggering checkpoint, and are recycled to receive/send data after finish
> spilling.
> We still reuse the current network memory setting, so the maximum
> inflight-buffers would not exceed that amount, and there would not have
>  additional memory consumption.
>
> 2. Yes, we would try to reuse the existing checkpoint recovery mechanism
> for simple implementation.
>
> 3. UNALIGNED_WITH_MAX_INFLIGHT_DATA and
> UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA are for the consideration of
> triggering checkpoint
> at proper time, the tradeoff between checkpoint duration and spilling
> inflight data, etc. I guess it still makes sense for the single input
> channel.
>  Assuming there were already accumulated 100 unconsumed buffers in one
> remote input channel when the barrier arrives from the network, then we can
> decide whether to trigger checkpoint immediately based on
> UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA or based on
> UNALIGNED_WITH_MAX_INFLIGHT_DATA
> if 100 is not reaching its max threshold.
>
> 4. I remembered that we ever discussed the options internally before. I
> agree with that the adaptive way might seem more flexible, but also mean
> more complicated
>  in design and implementation. As the first step of unaligned checkpoint,
> it seems more make sense to take an easy way for only concentrating on the
> function and
>  practical effect. After getting some feedbacks to convince us, I guess
> the adaptive way might be probably an option to consider if really
> necessary in future.
>
> Best,
> Zhijiang
>
>
> ------------------------------------------------------------------
> From:Yu Li <ca...@gmail.com>
> Send Time:2020 Feb. 26 (Wed.) 12:59
> To:dev <de...@flink.apache.org>
> Subject:Re: [DISCUSS] FLIP-76: Unaligned checkpoints
>
> Hi All,
>
> Sorry for being late to the discussion. I've gone through the latest FLIP
> document and have below questions/suggestions:
>
> 1. Do we support asynchronous checkpointing on the in-flight data?
>     * From the doc the answer seems to be yes (state-based storage for the
> first version), and if so, there would be additional memory consumption on
> network buffer during checkpoint and we should take this into account,
> especially in container environment.
>
> 2. I suggest we also take local recovery into consideration during
> implementation, which could speed up the recovery speed especially when the
> amount of in-flight data is huge.
>
> 3. About checkpointing policy, are the below understanding correct? Maybe
> it helps if we map them more explicitly in FLIP doc, IMHO:
>     * For single input channel, there's no difference between
> UNALIGNED_WITH_MAX_INFLIGHT_DATA
> and UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA, both means we start the
> checkpoint once observe the barrier in the input channel.
>     * For multiple input channels, UNALIGNED_WITH_MAX_INFLIGHT_DATA means
> starting checkpoint only when barrier appears in all input channels,
> while UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA means starting checkpoint when
> barrier appears in any one of the input channels.
>
> 4. It seems now we only support pre-defined options, but is it possible to
> switch in between dynamically? For example, if we predefine the policy to
> ALIGNED, could we supply a command to switch
> to UNALIGNED_WITH_MAX_INFLIGHT_DATA when severe back pressure observed? Or
> switch to ALIGNED if we see too much data persisted for
> UNALIGNED_WITH_MAX_INFLIGHT_DATA? Maybe I'm neglecting something, but
> what's preventing us from being more adaptive?
>
> Thanks!
>
> Best Regards,
> Yu
>
>
> On Tue, 22 Oct 2019 at 15:44, Piotr Nowojski <pi...@ververica.com> wrote:
>
> > Hi,
> >
> > I would like to propose a modification to this FLIP.
> >
> > Based on the feedback that we were receiving after publishing this
> > document and during Flink Forward, I was growing more and more anxious
> > about one issue here: having to persist all buffered in-flight data at
> > once. As the volume of this data might be large (GBs per TaskManager even
> > with small clusters and relatively simple jobs), the time to persist all
> of
> > this data at once might be quite substantial.
> >
> >
> >
> > To address this issue, I would like to propose that at first we implement
> > a variant of unaligned checkpoints, just as written down in FLIP-76, but
> > with continuous spilling - all data will be persisted/spilled
> continuously,
> > all the time as they come - not at once when the checkpoint starts. Think
> > about this proposal as incremental way of persisting the data.
> >
> > Pros of continuous spilling:
> > + faster checkpointing, as there will be no need to store GBs of data,
> > just flush/close.
> > + more predictable behaviour. Instead of jerky/varying/spike IO/CPU
> loads,
> > steady records throughput and spilling.
> >
> > Cons of continuous spilling:
> > - need to persist all of the network traffic instead of persisting just
> > the in-flight data
> >
> > Larger volume of persisted data doesn’t matter that much from the
> > perspective of the throughput, as if you are unable to spill the data
> > faster than to process them, unaligned checkpoints are worse option
> > compared to the aligned checkpoints [1]. If checkpoints are frequent it
> > also doesn’t matter [2]. The true downside is if checkpoints are
> infrequent
> > and you have to for example pay $ for the extra storage or extra network
> > traffic to the storage.
> >
> > On the other hand, continuous spilling (persistent communication
> > channels?) might have an added benefit of enabling us localised failures
> -
> > failure of one node will not necessarily bring down the whole cluster.
> >
> >
> >
> > As I mentioned, I’m proposing to just start with the continuous spilling.
> > It might be more costly in some scenarios, but it will offer the most
> > stable and predictable performance with the lowest checkpoint latency.
> It’s
> > not perfect, it won’t solve all of the use cases, but frankly all of the
> > other options have their own blind spots, and continuous spilling should
> at
> > least fully solve relatively low throughput use cases. We can later build
> > on top of that solution, expanding it with the following features:
> >
> > 1. Do not spill continuously if there is no backpressure. For example
> > provide a timeout: start spilling pre-emptively/continuously if some
> buffer
> > was not processed within X seconds.
> > 2. Start spilling only once the checkpoint starts (this is the exact
> > proposal from the current FLIP-76).
> > 3. Initially we want to spill to a Flink’s FileSystem (for example S3),
> > but in the future we are considering other options, for example Apache
> > Bookeeper.
> >
> > What do you think?
> >
> > Piotrek
> >
> >
> >
> > [1] I’m assuming that the spilling throughput per node can go up to
> > ~30MB/s. If your Flink's job data processing rate is 100MB/s, spilling
> > in-flight data will take 3.3 times longer than waiting for the alignment.
> > On the other hand if data processing rate is 10MB/s, overhead of
> continuous
> > spilling is relatively low.
> > [2] With checkpoints every one minute, with data processing throughput
> > 30MB/s per node, we would have to persist 1.8GB of data per node between
> > the checkpoints, which is similar order of magnitude as buffered
> in-flight
> > data under the back-pressure. With higher throughput, unaligned
> checkpoints
> > are not helping ([1]). With lower throughput, both the original proposal
> > and continuous spilling would have to effectively persist all of the data
> > anyway.
> >
> > > On 10 Oct 2019, at 19:51, Yun Tang <my...@live.com> wrote:
> > >
> > > Hi Arvid
> > >
> > > +1 for this future which has been hoped for a long time. End-to-end
> > exactly once job could benefit from quicker checkpoint completion.
> > >
> > >
> > > Best
> > > Yun Tang
> > > ________________________________
> > > From: Yun Gao <yu...@aliyun.com.INVALID>
> > > Sent: Thursday, October 10, 2019 18:39
> > > To: dev <de...@flink.apache.org>
> > > Subject: Re: [DISCUSS] FLIP-76: Unaligned checkpoints
> > >
> > >    Hi Arvid,
> > >
> > >            Very thanks for bring up the discussion! From our side
> unable
> > to finish the checkpoint is commonly met for online jobs, therefore +1
> from
> > my side to implement this.
> > >           A tiny issue of the FLIP is that the Discussion Thread URL
> > attached seems to be not right.
> > >
> > >
> > >     Best,
> > >     Yun
> > >
> > >
> > > ------------------------------------------------------------------
> > > From:Arvid Heise <ar...@ververica.com>
> > > Send Time:2019 Sep. 30 (Mon.) 20:31
> > > To:dev <de...@flink.apache.org>
> > > Subject:[DISCUSS] FLIP-76: Unaligned checkpoints
> > >
> > > Hi Devs,
> > >
> > > I would like to start the formal discussion about FLIP-76 [1], which
> > > improves the checkpoint latency in systems under backpressure, where a
> > > checkpoint can take hours to complete in the worst case. I recommend
> the
> > > thread "checkpointing under backpressure" [2] to get a good idea why
> > users
> > > are not satisfied with the current behavior. The key points:
> > >
> > >   - Since the checkpoint barrier flows much slower through the
> > >   back-pressured channels, the other channels and their upstream
> > operators
> > >   are effectively blocked during checkpointing.
> > >   - The checkpoint barrier takes a long time to reach the sinks causing
> > >   long checkpointing times. A longer checkpointing time in turn means
> > that
> > >   the checkpoint will be fairly outdated once done. Since a heavily
> > utilized
> > >   pipeline is inherently more fragile, we may run into a vicious cycle
> of
> > >   late checkpoints, crash, recovery to a rather outdated checkpoint,
> more
> > >   back pressure, and even later checkpoints, which would result in
> > little to
> > >   no progress in the application.
> > >
> > > The FLIP proposes "unaligned checkpoints" which improves the current
> > state,
> > > such that
> > >
> > >   - Upstream processes can continue to produce data, even if some
> > operator
> > >   still waits on a checkpoint barrier on a specific input channel.
> > >   - Checkpointing times are heavily reduced across the execution graph,
> > >   even for operators with a single input channel.
> > >   - End-users will see more progress even in unstable environments as
> > more
> > >   up-to-date checkpoints will avoid too many recomputations.
> > >   - Facilitate faster rescaling.
> > >
> > > The key idea is to allow checkpoint barriers to be forwarded to
> > downstream
> > > tasks before the synchronous part of the checkpointing has been
> conducted
> > > (see Fig. 1). To that end, we need to store in-flight data as part of
> the
> > > checkpoint as described in greater details in this FLIP.
> > >
> > > Although the basic idea was already sketched in [2], we would like get
> > > broader feedback in this dedicated mail thread.
> > >
> > > Best,
> > >
> > > Arvid
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
> > > [2]
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Checkpointing-under-backpressure-td31616.html
> > >
> >
> >
>
>

Re: [DISCUSS] FLIP-76: Unaligned checkpoints

Posted by Zhijiang <wa...@aliyun.com.INVALID>.
Hi Yu,

Thanks for your interest in this FLIP and for sharing your thoughts! Let me try to answer some of the questions below.

1. Yes, asynchronous checkpointing should be part of the whole process and be supported naturally. As for the network memory concern,
the in-flight buffers would be spilled to persistent storage when the checkpoint is triggered, and recycled to receive/send data once spilling finishes.
We still reuse the current network memory setting, so the number of in-flight buffers would not exceed that amount, and there would be no
additional memory consumption.

2. Yes, we would try to reuse the existing checkpoint recovery mechanism to keep the implementation simple.

3. UNALIGNED_WITH_MAX_INFLIGHT_DATA and UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA are about triggering the checkpoint
at the proper time, i.e. the tradeoff between checkpoint duration and the amount of in-flight data to spill. I guess the distinction still makes sense for a single input channel.
Assume 100 unconsumed buffers have already accumulated in one remote input channel when the barrier arrives from the network. Then we can
trigger the checkpoint immediately under UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA, or under UNALIGNED_WITH_MAX_INFLIGHT_DATA
only if 100 does not reach its max threshold.

4. I remember that we discussed these options internally before. I agree that the adaptive way might be more flexible, but it would also be more complicated
in design and implementation. As the first step for unaligned checkpoints, it seems to make more sense to take the easy way and concentrate only on the functionality and
practical effect. After getting some convincing feedback, I guess the adaptive way might be an option to consider in the future if really necessary.
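
As a rough illustration of point 3, the decision taken when a barrier arrives behind already-queued buffers could be sketched as below. This is only a sketch under assumptions: the class and method names are hypothetical, the threshold is counted in buffers (the unit is not fixed in the FLIP yet), "not reaching" is read as strictly below the threshold, and the case where the threshold is reached is assumed to simply wait for the barrier to be consumed in order.

```java
/** Hypothetical sketch of the policy decision; not code from Flink or the FLIP. */
public class UnalignedPolicySketch {

    /** Policy names as used in this thread; the enum itself is illustrative. */
    public enum CheckpointPolicy {
        ALIGNED,
        UNALIGNED_WITH_MAX_INFLIGHT_DATA,
        UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA
    }

    /**
     * Decides whether to trigger the checkpoint as soon as the barrier arrives
     * from the network, i.e. before the buffers queued in front of it are consumed.
     *
     * @param queuedBuffers      unconsumed buffers ahead of the barrier
     * @param maxInflightBuffers assumed threshold, measured in buffers
     */
    public static boolean triggerImmediately(
            CheckpointPolicy policy, int queuedBuffers, int maxInflightBuffers) {
        switch (policy) {
            case UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA:
                // Always overtake the queued data, however much there is.
                return true;
            case UNALIGNED_WITH_MAX_INFLIGHT_DATA:
                // Overtake only while the data to spill stays below the threshold.
                return queuedBuffers < maxInflightBuffers;
            default:
                // ALIGNED: the barrier is processed in order, after the queued buffers.
                return false;
        }
    }

    public static void main(String[] args) {
        int queued = 100; // the example from point 3
        for (CheckpointPolicy policy : CheckpointPolicy.values()) {
            System.out.println(policy + " with threshold 200: "
                    + triggerImmediately(policy, queued, 200));
        }
    }
}
```

With 100 queued buffers and an assumed threshold of 200, both unaligned policies trigger immediately; with a threshold of 50, only the unlimited policy does.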

Best,
Zhijiang


------------------------------------------------------------------
From:Yu Li <ca...@gmail.com>
Send Time:2020 Feb. 26 (Wed.) 12:59
To:dev <de...@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-76: Unaligned checkpoints

Hi All,

Sorry for being late to the discussion. I've gone through the latest FLIP
document and have below questions/suggestions:

1. Do we support asynchronous checkpointing on the in-flight data?
    * From the doc the answer seems to be yes (state-based storage for the
first version), and if so, there would be additional memory consumption on
network buffer during checkpoint and we should take this into account,
especially in container environment.

2. I suggest we also take local recovery into consideration during
implementation, which could speed up the recovery speed especially when the
amount of in-flight data is huge.

3. About checkpointing policy, are the below understanding correct? Maybe
it helps if we map them more explicitly in FLIP doc, IMHO:
    * For single input channel, there's no difference between
UNALIGNED_WITH_MAX_INFLIGHT_DATA
and UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA, both means we start the
checkpoint once observe the barrier in the input channel.
    * For multiple input channels, UNALIGNED_WITH_MAX_INFLIGHT_DATA means
starting checkpoint only when barrier appears in all input channels,
while UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA means starting checkpoint when
barrier appears in any one of the input channels.

4. It seems now we only support pre-defined options, but is it possible to
switch in between dynamically? For example, if we predefine the policy to
ALIGNED, could we supply a command to switch
to UNALIGNED_WITH_MAX_INFLIGHT_DATA when severe back pressure observed? Or
switch to ALIGNED if we see too much data persisted for
UNALIGNED_WITH_MAX_INFLIGHT_DATA? Maybe I'm neglecting something, but
what's preventing us from being more adaptive?

Thanks!

Best Regards,
Yu


On Tue, 22 Oct 2019 at 15:44, Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> I would like to propose a modification to this FLIP.
>
> Based on the feedback that we were receiving after publishing this
> document and during Flink Forward, I was growing more and more anxious
> about one issue here: having to persist all buffered in-flight data at
> once. As the volume of this data might be large (GBs per TaskManager even
> with small clusters and relatively simple jobs), the time to persist all of
> this data at once might be quite substantial.
>
>
>
> To address this issue, I would like to propose that at first we implement
> a variant of unaligned checkpoints, just as written down in FLIP-76, but
> with continuous spilling - all data will be persisted/spilled continuously,
> all the time as they come - not at once when the checkpoint starts. Think
> about this proposal as incremental way of persisting the data.
>
> Pros of continuous spilling:
> + faster checkpointing, as there will be no need to store GBs of data,
> just flush/close.
> + more predictable behaviour. Instead of jerky/varying/spike IO/CPU loads,
> steady records throughput and spilling.
>
> Cons of continuous spilling:
> - need to persist all of the network traffic instead of persisting just
> the in-flight data
>
> Larger volume of persisted data doesn’t matter that much from the
> perspective of the throughput, as if you are unable to spill the data
> faster than to process them, unaligned checkpoints are worse option
> compared to the aligned checkpoints [1]. If checkpoints are frequent it
> also doesn’t matter [2]. The true downside is if checkpoints are infrequent
> and you have to for example pay $ for the extra storage or extra network
> traffic to the storage.
>
> On the other hand, continuous spilling (persistent communication
> channels?) might have an added benefit of enabling us localised failures -
> failure of one node will not necessarily bring down the whole cluster.
>
>
>
> As I mentioned, I’m proposing to just start with the continuous spilling.
> It might be more costly in some scenarios, but it will offer the most
> stable and predictable performance with the lowest checkpoint latency. It’s
> not perfect, it won’t solve all of the use cases, but frankly all of the
> other options have their own blind spots, and continuous spilling should at
> least fully solve relatively low throughput use cases. We can later build
> on top of that solution, expanding it with the following features:
>
> 1. Do not spill continuously if there is no backpressure. For example
> provide a timeout: start spilling pre-emptively/continuously if some buffer
> was not processed within X seconds.
> 2. Start spilling only once the checkpoint starts (this is the exact
> proposal from the current FLIP-76).
> 3. Initially we want to spill to a Flink’s FileSystem (for example S3),
> but in the future we are considering other options, for example Apache
> Bookeeper.
>
> What do you think?
>
> Piotrek
>
>
>
> [1] I’m assuming that the spilling throughput per node can go up to
> ~30MB/s. If your Flink job's data processing rate is 100MB/s, spilling
> in-flight data will take about 3.3 times longer than waiting for the
> alignment. On the other hand, if the data processing rate is 10MB/s, the
> overhead of continuous spilling is relatively low.
> [2] With checkpoints every minute and a data processing throughput of
> 30MB/s per node, we would have to persist 1.8GB of data per node between
> checkpoints, which is of a similar order of magnitude as the in-flight
> data buffered under back-pressure. With higher throughput, unaligned
> checkpoints are not helping ([1]). With lower throughput, both the
> original proposal and continuous spilling would effectively have to
> persist all of the data anyway.
>
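The arithmetic behind footnotes [1] and [2] can be checked with a short sketch. The class and method names are illustrative only; the 30MB/s spilling rate is the assumption stated in [1], and 1GB is taken as 1000MB.

```java
/** Illustrative back-of-the-envelope numbers for footnotes [1] and [2]. */
public class SpillingEstimate {

    /** How many times longer spilling takes than processing the same data. */
    static double spillSlowdown(double processingMBps, double spillMBps) {
        return processingMBps / spillMBps;
    }

    /** Data persisted per node between two checkpoints, in GB (1GB = 1000MB). */
    static double persistedGB(double throughputMBps, long intervalSeconds) {
        return throughputMBps * intervalSeconds / 1000.0;
    }

    public static void main(String[] args) {
        // [1]: 100MB/s processing rate against a 30MB/s spilling rate.
        System.out.printf("slowdown: %.1fx%n", spillSlowdown(100, 30));
        // [2]: 30MB/s per node with one checkpoint per minute.
        System.out.printf("persisted: %.1fGB%n", persistedGB(30, 60));
    }
}
```

A slowdown below 1 (for example at a 10MB/s processing rate) means spilling keeps up with processing, which is the low-overhead case described in [1].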
> > On 10 Oct 2019, at 19:51, Yun Tang <my...@live.com> wrote:
> >
> > Hi Arvid
> >
> > +1 for this feature, which has been hoped for for a long time.
> > End-to-end exactly-once jobs could benefit from quicker checkpoint
> > completion.
> >
> >
> > Best
> > Yun Tang
> > ________________________________
> > From: Yun Gao <yu...@aliyun.com.INVALID>
> > Sent: Thursday, October 10, 2019 18:39
> > To: dev <de...@flink.apache.org>
> > Subject: Re: [DISCUSS] FLIP-76: Unaligned checkpoints
> >
> >    Hi Arvid,
> >
> >            Thanks a lot for bringing up the discussion! On our side,
> > being unable to finish checkpoints is common for online jobs, therefore
> > +1 from my side to implement this.
> >           A tiny issue with the FLIP is that the attached Discussion
> > Thread URL seems to be wrong.
> >
> >
> >     Best,
> >     Yun
> >
> >
> > ------------------------------------------------------------------
> > From:Arvid Heise <ar...@ververica.com>
> > Send Time:2019 Sep. 30 (Mon.) 20:31
> > To:dev <de...@flink.apache.org>
> > Subject:[DISCUSS] FLIP-76: Unaligned checkpoints
> >
> > Hi Devs,
> >
> > I would like to start the formal discussion about FLIP-76 [1], which
> > improves the checkpoint latency in systems under backpressure, where a
> > checkpoint can take hours to complete in the worst case. I recommend the
> > thread "checkpointing under backpressure" [2] to get a good idea of why
> > users are not satisfied with the current behavior. The key points:
> >
> >   - Since the checkpoint barrier flows much slower through the
> >   back-pressured channels, the other channels and their upstream
> >   operators are effectively blocked during checkpointing.
> >   - The checkpoint barrier takes a long time to reach the sinks, causing
> >   long checkpointing times. A longer checkpointing time in turn means
> >   that the checkpoint will be fairly outdated once done. Since a heavily
> >   utilized pipeline is inherently more fragile, we may run into a
> >   vicious cycle of late checkpoints, crash, recovery to a rather
> >   outdated checkpoint, more back pressure, and even later checkpoints,
> >   which would result in little to no progress in the application.
> >
> > The FLIP proposes "unaligned checkpoints", which improve the current
> > state such that
> >
> >   - Upstream processes can continue to produce data, even if some
> >   operator still waits on a checkpoint barrier on a specific input
> >   channel.
> >   - Checkpointing times are heavily reduced across the execution graph,
> >   even for operators with a single input channel.
> >   - End-users will see more progress even in unstable environments, as
> >   more up-to-date checkpoints will avoid too many recomputations.
> >   - Rescaling becomes faster.
> >
> > The key idea is to allow checkpoint barriers to be forwarded to
> > downstream tasks before the synchronous part of the checkpointing has
> > been conducted (see Fig. 1). To that end, we need to store in-flight data
> > as part of the checkpoint, as described in greater detail in this FLIP.
> >
> > Although the basic idea was already sketched in [2], we would like to get
> > broader feedback in this dedicated mail thread.
> >
> > Best,
> >
> > Arvid
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
> > [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Checkpointing-under-backpressure-td31616.html
> >
>
>