Posted to dev@flink.apache.org by SHI Xiaogang <sh...@gmail.com> on 2019/06/11 08:33:28 UTC

[DISCUSS] Allow at-most-once delivery in case of failures

Flink offers a fault-tolerance mechanism to guarantee at-least-once and
exactly-once message delivery in case of failures. The mechanism works well
in practice and makes Flink stand out among stream processing systems.

But the guarantee of at-least-once and exactly-once delivery does not come
without a price. It typically requires restarting multiple tasks and falling
back to the point where the last checkpoint was taken. (Fine-grained
recovery can help reduce the cost, but recovering jobs still takes some
effort.)

In some scenarios, users prefer quick recovery and are willing to trade off
correctness. For example, in some online recommendation systems, timeliness
is far more important than consistency. In such cases, we can restart only
the failed tasks individually, without performing any rollback. Though
some messages delivered to failed tasks may be lost, the other tasks can
continue to provide service to users.
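
The trade-off described above can be illustrated with a small toy simulation.
This is only a sketch under simplifying assumptions, not Flink code: records
are routed round-robin, a single failure is injected, and the function name
`simulate` and all of its parameters are hypothetical.

```python
def simulate(strategy, num_records=20, parallelism=4, checkpoint_interval=8,
             fail_at=13, failed_task=1, downtime=8):
    """Return the list of record ids processed, in processing order.

    Records are routed round-robin: record i goes to task i % parallelism.
    One failure is injected just before record `fail_at` is processed.
    """
    processed = []
    failed = False
    skip_until = 0  # individual restart only: end of the failed task's downtime
    i = 0
    while i < num_records:
        if i == fail_at and not failed:
            failed = True
            if strategy == "rollback":
                # All tasks restart; the source rewinds to the last checkpoint,
                # so records since that checkpoint are processed a second time.
                i = (i // checkpoint_interval) * checkpoint_interval
                continue
            # strategy == "individual": only the failed task restarts.
            skip_until = i + downtime
        task = i % parallelism
        if strategy == "individual" and task == failed_task and i < skip_until:
            i += 1  # record sent to the restarting task is lost
            continue
        processed.append(i)
        i += 1
    return processed


rollback = simulate("rollback")
individual = simulate("individual")
print("rollback:   work =", len(rollback), "distinct =", len(set(rollback)))
print("individual: work =", len(individual), "distinct =", len(set(individual)))
```

With rollback, every record is eventually processed but some work is repeated;
with individual restart, no work is repeated but the records routed to the
failed task during its downtime are never processed.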

Many of our users are asking for at-most-once delivery in Flink. What do
you think of the proposal? Any feedback is appreciated.

Regards,
Xiaogang Shi

Re: [DISCUSS] Allow at-most-once delivery in case of failures

Posted by Biao Liu <mm...@gmail.com>.
Hi Xiaogang, it's an interesting discussion.

I have heard similar feature requests before. Some users need a
lighter failover strategy because, as you mentioned, correctness is not so
critical in their scenarios. Moreover, some jobs may not enable
checkpointing at all, so a global/region failover strategy doesn't really
make sense for them. The individual failover strategy doesn't work
well for these scenarios either, since it currently only supports topologies
without edges.
We have actually implemented a best-effort failover strategy in our private
branch. It differs slightly from your proposal in that it doesn't
provide an at-most-once guarantee. It has a weaker consistency model but
recovers faster. I think it would satisfy your scenario.



Re: [DISCUSS] Allow at-most-once delivery in case of failures

Posted by zhijiang <wa...@aliyun.com.INVALID>.
Thanks for launching this topic, Xiaogang!

I have also heard this requirement from users before, and I agree it could bring benefits in some scenarios.
As we know, fault tolerance is one of the biggest challenges in stream architecture, because it is difficult to change later if the initial system design did not fully consider it.

Flink already provides two basic failover strategies:
Restart-all for pipelined mode, which is considered lightweight if checkpoints can be taken quickly so that only a small amount of state has to be restored on restart.
Region-based for blocking mode, which only needs to restart the tasks within a region.
In the upcoming release 1.9, we put a lot of effort into FLIP-1 and partition management so that, in the ideal case, only the failed tasks are restarted if the partitions they consume are still available.
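
As a rough illustration of how these strategies differ in scope, the sketch
below computes which tasks would be restarted for a given failure. It is a
simplified model and not Flink's implementation: a region is taken to be a set
of tasks connected by pipelined edges, and the handling of unavailable
partitions and downstream regions is ignored. The function names and the
graph encoding are hypothetical.

```python
from collections import defaultdict


def pipelined_regions(tasks, edges):
    """Map each task to its region: tasks connected by pipelined edges."""
    neighbours = defaultdict(set)
    for src, dst, kind in edges:
        if kind == "pipelined":
            # Direction does not matter for grouping tasks into a region.
            neighbours[src].add(dst)
            neighbours[dst].add(src)

    region_of = {}
    for task in tasks:
        if task in region_of:
            continue
        # Depth-first search over pipelined edges starting from `task`.
        region, stack = set(), [task]
        while stack:
            current = stack.pop()
            if current in region:
                continue
            region.add(current)
            stack.extend(neighbours[current] - region)
        frozen = frozenset(region)
        for member in region:
            region_of[member] = frozen
    return region_of


def tasks_to_restart(strategy, failed, tasks, edges):
    """Return the set of tasks restarted when `failed` fails."""
    if strategy == "restart-all":
        return set(tasks)
    if strategy == "region":
        return set(pipelined_regions(tasks, edges)[failed])
    if strategy == "individual":
        return {failed}
    raise ValueError(f"unknown strategy: {strategy}")


tasks = ["source", "map", "agg", "sink"]
edges = [
    ("source", "map", "pipelined"),
    ("map", "agg", "blocking"),
    ("agg", "sink", "pipelined"),
]
for strategy in ("restart-all", "region", "individual"):
    print(strategy, sorted(tasks_to_restart(strategy, "map", tasks, edges)))
```

In a fully pipelined job the region strategy degenerates to restart-all, which
is why the thread looks at individual restarts for streaming jobs.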

If we want to provide further fault-tolerance options such as at-most-once, we need to weigh the effort against the benefits. So it might be better to give a detailed design and estimate how much effort it would take.
I have similar concerns to Piotr's. From my previous experience with failover improvements at Alibaba, it involves many big changes and touches many components. We once made a big effort to adjust the network behavior for this issue, and the result still does not seem very clean, because at the moment, if one task fails, the corresponding consumer/producer sides also fail via network communication and release the partition/gate completely.

Best,
Zhijiang


Re: [DISCUSS] Allow at-most-once delivery in case of failures

Posted by Zhu Zhu <re...@gmail.com>.
Thanks Xiaogang for initiating the discussion. I think it is a very good
proposal.
We have also received this requirement for Flink from Alibaba-internal and
external customers.
In these cases, users are less concerned about data consistency but have
higher demands for low latency.

Here are a couple of things to consider:
1. "at-most-once"? or no guarantee?
   "at-most-once" semantics do not seem to be necessary. Data loss and
duplication are acceptable as long as the inconsistency stays under a certain
threshold.
   Data duplication still happens when failed tasks are recovered
individually. Extra de-duplication effort is needed for "at-most-once".
2. Inconsistency measurement
   Although users are less concerned about data consistency, too much
inconsistency is not acceptable either.
   A measure of data inconsistency is needed for monitoring and
alerting.
3. Auto recovery
   An auto-recovery mechanism is needed to bring the job back to a normal
state if the inconsistency goes beyond acceptable values.
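
Points 2 and 3 could be combined roughly as in the sketch below: keep a
sliding window of per-record observations and request a full recovery once
the share of lost or duplicated records exceeds a threshold. This is only an
illustration. The class `InconsistencyMonitor` and its methods are
hypothetical, and how a job would actually detect a lost or duplicated record
(for example via sequence numbers) is left open here.

```python
from collections import deque


class InconsistencyMonitor:
    """Tracks the share of inconsistent records over a sliding window."""

    def __init__(self, threshold, window_size):
        self.threshold = threshold
        # Each entry is 1 for a lost/duplicated record and 0 otherwise;
        # the deque drops the oldest entry once window_size is reached.
        self.window = deque(maxlen=window_size)

    def observe(self, consistent):
        self.window.append(0 if consistent else 1)

    def ratio(self):
        if not self.window:
            return 0.0
        return sum(self.window) / len(self.window)

    def needs_full_recovery(self):
        # Strictly above the threshold triggers a fallback to a full restart.
        return self.ratio() > self.threshold


monitor = InconsistencyMonitor(threshold=0.1, window_size=10)
for _ in range(9):
    monitor.observe(consistent=True)
monitor.observe(consistent=False)
print(monitor.ratio(), monitor.needs_full_recovery())
```

The same ratio could be exported as a metric for the monitoring and alerting
mentioned in point 2.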


Overall I think this individual failover mechanism would be very helpful in
some cases.
In Alibaba Blink, a best-effort individual failover strategy has also been
added for this purpose to support customers.






On Tue, Jun 11, 2019 at 4:54 PM, Zili Chen <wa...@gmail.com> wrote:

> Hi Xiaogang,
>
> It is an interesting topic.
>
> Note that there is an ongoing effort to build a mature ML library for Flink
> these days, and some ML use cases could also trade off correctness for
> timeliness or throughput. Exactly-once delivery is exactly what makes Flink
> stand out, but an at-most-once option would adapt Flink to more scenarios.
>
> Best,
> tison.

Re: [DISCUSS] Allow at-most-once delivery in case of failures

Posted by vino yang <ya...@gmail.com>.
+1 from my side to support this feature in Flink.

Best,
Vino


Re: [DISCUSS] Allow at-most-once delivery in case of failures

Posted by Biao Liu <mm...@gmail.com>.
Hi Piotrek,
I agree with you that the community's resources are too strained to support
such a feature right now. I was planning to start a similar discussion after
1.9 is released. We don't have enough time to support this feature now, but
I think a discussion is fine.
Your question about checkpoint semantics is very interesting. I think it is
worth supporting, though it might not be a small modification.

There is also a big gap that needs to be discussed. Currently, network error
handling is tightly coupled with the task failover strategy. In a typical
scenario, if a TM crashes, all tasks on TMs connected to the
failed TM fail automatically. In our internal implementation, handling this
was the biggest part of supporting the best-effort failover strategy.


On Tue, Jun 11, 2019 at 5:31 PM, Piotr Nowojski <pi...@ververica.com> wrote:

> Hi Xiaogang,
>
> It sounds interesting and definitely like a useful feature. However, the
> questions for me would be: how useful, how much effort would it require, and
> is it worth it? We simply cannot do everything at once, and currently
> the people who could review/drive/mentor this effort are pretty much strained
> :( For me, one would have to investigate the answers to those questions and
> prioritise it against other ongoing efforts before I could vote +1 for
> this.
>
> A couple of things to consider:
> - would it be only a job manager/failure region recovery feature?
> - would it require changes in the CheckpointBarrierHandler and
> CheckpointCoordinator classes?
> - with `at-most-once` semantics, theoretically speaking, we could just drop
> the current `CheckpointBarrier` handling/injecting code and avoid all of
> the checkpoint alignment issues - we could just checkpoint all of the tasks
> independently of one another. However, maybe that could be a follow-up
> optimisation step?
>
> Piotrek

Re: [DISCUSS] Allow at-most-once delivery in case of failures

Posted by Zhu Zhu <re...@gmail.com>.
Hi All,

I think using plugins, as Stephan suggested, would be the best way to serve
different requirements for different scenarios, even after they are merged
into Flink core.

As far as I know, the pluggable shuffle service is ready for use.
The failover strategy does not support plugins yet, but it's in good shape and
would not need much effort to support them.
In our experience implementing "best-effort" recovery, these two
plugins should be enough:
1. a shuffle service based on the current basic implementation that supports
reconnectable input/output connections and discards overloaded records so as
not to cause back pressure
2. an individual failover strategy that only restarts the failed task
Besides, "at-least-once" individual failover can also be supported in this
way, with a pluggable shuffle service that supports caching results [1]
and the individual failover strategy. It can be helpful for scenarios
with higher data consistency demands.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures

Thanks,
Zhu Zhu



> > > On Tue, Jun 11, 2019 at 2:10 PM SHI Xiaogang <sh...@gmail.com>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > It definitely requires a massive effort to allow at-most-once delivery
> > > > in Flink. But as the feature is urgently demanded by many Flink users, I
> > > > think every effort we make is worthwhile. In fact, the inability to
> > > > support at-most-once delivery has become a major obstacle for Storm users
> > > > who want to move to Flink. It's undesirable for us to run different
> > > > stream processing systems for different scenarios.
> > > >
> > > > I agree with Zhu Zhu that the guarantee we provide is the very first
> > > > thing to be discussed. Recovering from checkpoints will lead to
> > > > duplicated records and thus break the guarantee of at-most-once delivery.
> > > >
> > > > One way to achieve an at-most-once guarantee is to completely disable
> > > > checkpointing and let sources read only those records posted after they
> > > > start. This requires sources to be configurable to read only the latest
> > > > records, which luckily is supported by many message queues, including
> > > > Kafka. As Flink relies on sources' ability to roll back to achieve
> > > > exactly-once and at-least-once delivery, I think it's acceptable for
> > > > Flink to rely on sources' ability to read the latest records to achieve
> > > > at-most-once delivery. This method does not require any modification to
> > > > the existing checkpointing mechanism. Besides, as there is no need to
> > > > restore from checkpoints, failed tasks can recover at the fastest
> > > > possible speed.
> > > >
> > > > Concerning the implementation effort, I think we can benefit from some
> > > > ongoing work, including shuffle services and fine-grained recovery. For
> > > > example, exceptions in network connections currently lead to failures of
> > > > downstream and upstream tasks. To achieve at-most-once delivery, we
> > > > should decouple intermediate results from tasks, reporting the exceptions
> > > > of intermediate results to the job master and letting the failover
> > > > strategy determine the actions to take. Some work has already been done
> > > > in the effort to achieve fine-grained recovery, which can be extended to
> > > > allow at-most-once delivery in Flink.
> > > >
> > > > But before starting the discussion on implementation details, as said
> > > > earlier, we need to determine the guarantee we provide in scenarios
> > > > where timely recovery is needed.
> > > > * What do you think of the at-most-once guarantee achieved by the
> > > > proposed method?
> > > > * Do we need checkpointing to reduce the amount of lost data?
> > > > * Do we need deduplication to guarantee at-most-once delivery, or do we
> > > > just provide best-effort delivery?
> > > >
> > > > Regards,
> > > > Xiaogang Shi

Re: [DISCUSS] Allow at-most-once delivery in case of failures

Posted by Biao Liu <mm...@gmail.com>.
Hi Stephan & Xiaogang,

It's great to see this discussion active again!

It makes sense to me to do some private optimizations and trials through
plugins. I understand that the community cannot satisfy everyone and
every requirement due to limited resources. The pluggable strategy is a
good compromise. It might also help improve the pluggable strategy itself,
since the plugins might raise some reasonable requirements.

Regarding the "at-most-once" or "best-effort" semantics, I think it is
worth going further, since we have heard these requirements several times.
However, I think we need to investigate further how to implement them on top
of the pluggable shuffle service and scheduler (or some more components?).
There might be a public discussion when we are ready. I hope it happens soon.


On Wed, Jul 24, 2019 at 9:43 AM SHI Xiaogang <sh...@gmail.com> wrote:

> Hi Stephan,
>
> I agree with you that the implementation of "at-most-once" or
> "best-effort" recovery will benefit from the pluggable shuffle service and
> pluggable scheduler. We actually made some attempts in our private
> repository, and it turns out that it requires quite a lot of work to
> implement this with the existing network stack. We can start working on this
> when the pluggable shuffle service and pluggable scheduler are ready.
>
> The suggestion of an external implementation is a very good idea. That way,
> we can implement both "at-most-once" and "best-effort" guarantees as
> different checkpoint/failover strategies. If so, I think we should focus on
> the components that change between strategies. These components may
> include a pluggable checkpoint barrier handler and a pluggable failover
> strategy. We can list these components and discuss implementation details
> then.
>
> What do you think, Biao Liu and Zhu Zhu?
>
> Regards,
> Xiaogang
>
>
> On Wed, Jul 24, 2019 at 1:31 AM, Stephan Ewen <se...@apache.org> wrote:
>
> > Hi all!
> >
> > This is an interesting discussion for sure.
> >
> > Concerning user requests for mode changes, I also hear the following
> > quite often:
> >   - reduce the cost of checkpoint alignment (unaligned
> > checkpoints) to make checkpoints fast/stable under high backpressure
> >   - more fine-grained failover while maintaining exactly-once (even if
> > costly)
> >
> > Adding "at most once" to the mix makes for quite a long list of big
> > changes to the system.
> >
> > My feeling is that on such a core system, the community cannot push all
> > these efforts at the same time, especially because they touch overlapping
> > areas of the system and need the same committers involved.
> >
> > On the other hand, the pluggable shuffle service and pluggable scheduler
> > could make it possible to have an external implementation of that:
> >   - a network stack that supports "reconnects" of failed tasks with
> > continuing tasks
> >   - a scheduling strategy that restarts tasks individually even in
> > pipelined regions
> >
> > I think contributors/committers could implement this separately from the
> > Flink core. The feature could be trial-run through the community
> > packages. If it gains a lot of traction, the community could decide to
> > put in the effort to merge this into the core.
> >
> > Best,
> > Stephan
> >
> >
> > On Tue, Jun 11, 2019 at 2:10 PM SHI Xiaogang <sh...@gmail.com>
> > wrote:
> >
> > > Hi All,
> > >
> > > It definitely requires a massive effort to allow at-most-once delivery
> in
> > > Flink. But as the feature is urgently demanded by many Flink users, i
> > think
> > > every effort we made is worthy. Actually, the inability to support
> > > at-most-once delivery has become a major obstacle for Storm users to
> turn
> > > to Flink. It's undesirable for us to run different stream processing
> > > systems for different scenarios.
> > >
> > > I agree with Zhu Zhu that the guarantee we provide is the very first
> > thing
> > > to be discussed. Recovering with checkpoints will lead to duplicated
> > > records, thus break the guarantee on at-most-once delivery.
> > >
> > > A method to achieve at-most-once guarantee is to completely disable
> > > checkpointing and let sources only read those records posted after they
> > > start. The method requires sources to allow the configuration to read
> > > latest records, which luckily is supported by many message queues
> > including
> > > Kafka. As Flink relies sources' ability to rollback to achieve
> exact-only
> > > and at-least-once delivery, i think it's acceptable for Flink to rely
> > > sources' ability to read latest records to achieve at-most once
> delivery.
> > > This method does not require any modification to existing checkpointing
> > > mechanism. Besides, as there is no need to restoring from checkpoints,
> > > failed tasks can recover themselves at the fastest speed.
> > >
> > > Concerning the implementation efforts, i think we can benefit from some
> > > ongoing work including shuffle services and fine-grained recovery. For
> > > example, currently the exceptions in network connections will lead to
> > > failures of downstream and upstream tasks. To achieve at-most-once
> > > delivery, we should decouple intermediate results from tasks, reporting
> > the
> > > exceptions of intermediate results to job master and letting the
> failover
> > > strategy to determine the actions taken. Some work is already done in
> the
> > > efforts to achieve fine-grained recovery, which can be extended to
> allow
> > > at-most-once delivery in Flink.
> > >
> > > But before starting the discussion on implementation details, as said
> at
> > > prior, we need to determine the guarantee we provide in the scenarios
> > where
> > > timely recovery is needed.
> > > * What do you think of the at-most-once guarantee achieved by the
> > proposed
> > > method?
> > > * Do we need checkpointing to reduce the amount of lost data?
> > > * Do we need deduplication to guarantee at-most-once delivery or just
> > > provide best-effort delivery?
> > >
> > > Regards,
> > > Xiaogang Shi
> > >
> > >
> > > Piotr Nowojski <pi...@ververica.com> 于2019年6月11日周二 下午5:31写道:
> > >
> > > > Hi Xiaogang,
> > > >
> > > > It sounds interesting and definitely a useful feature, however the
> > > > questions for me would be how useful, how much effort would it
> require
> > > and
> > > > is it worth it? We simply can not do all things at once, and
> currently
> > > > people that could review/drive/mentor this effort are pretty much
> > > strained
> > > > :( For me one would have to investigate answers to those questions
> and
> > > > prioritise it compared to other ongoing efforts, before I could vote
> +1
> > > for
> > > > this.
> > > >
> > > > Couple of things to consider:
> > > > - would it be only a job manager/failure region recovery feature?
> > > > - would it require changes in CheckpointBarrierHandler,
> > > > CheckpointCoordinator classes?
> > > > - with `at-most-once` semantic theoretically speaking we could just
> > drop
> > > > the current `CheckpointBarrier` handling/injecting code and avoid all
> > of
> > > > the checkpoint alignment issues - we could just checkpoint all of the
> > > tasks
> > > > independently of one another. However maybe that could be a follow up
> > > > optimisation step?
> > > >
> > > > Piotrek
> > > >
> > > > > On 11 Jun 2019, at 10:53, Zili Chen <wa...@gmail.com> wrote:
> > > > >
> > > > > Hi Xiaogang,
> > > > >
> > > > > It is an interesting topic.
> > > > >
> > > > > Notice that there is some effort to build a mature mllib of flink
> > these
> > > > > days, it could be also possible for some ml cases trade off
> > correctness
> > > > for
> > > > > timeliness or throughput. Excatly-once delivery excatly makes flink
> > > stand
> > > > > out but an at-most-once option would adapt flink to more scenarios.
> > > > >
> > > > > Best,
> > > > > tison.
> > > > >
> > > > >
> > > > > SHI Xiaogang <sh...@gmail.com> 于2019年6月11日周二 下午4:33写道:
> > > > >
> > > > >> Flink offers a fault-tolerance mechanism to guarantee
> at-least-once
> > > and
> > > > >> exactly-once message delivery in case of failures. The mechanism
> > works
> > > > well
> > > > >> in practice and makes Flink stand out among stream processing
> > systems.
> > > > >>
> > > > >> But the guarantee on at-least-once and exactly-once delivery does
> > not
> > > > come
> > > > >> without price. It typically requires to restart multiple tasks and
> > > fall
> > > > >> back to the place where the last checkpoint is taken.
> (Fined-grained
> > > > >> recovery can help alleviate the cost, but it still needs certain
> > > > efforts to
> > > > >> recover jobs.)
> > > > >>
> > > > >> In some senarios, users perfer quick recovery and will trade
> > > correctness
> > > > >> off. For example, in some online recommendation systems,
> timeliness
> > is
> > > > far
> > > > >> more important than consistency. In such cases, we can restart
> only
> > > > those
> > > > >> failed tasks individually, and do not need to perform any
> rollback.
> > > > Though
> > > > >> some messages delivered to failed tasks may be lost, other tasks
> can
> > > > >> continuously provide service to users.
> > > > >>
> > > > >> Many of our users are demanding for at-most-once delivery in
> Flink.
> > > > What do
> > > > >> you think of the proposal? Any feedback is appreciated.
> > > > >>
> > > > >> Regards,
> > > > >> Xiaogang Shi
> > > > >>
> > > >
> > > >
> > >
> >
>

Re: [DISCUSS] Allow at-most-once delivery in case of failures

Posted by SHI Xiaogang <sh...@gmail.com>.
Hi Stephan,

I agree with you that the implementation of "at-most-once" or
"best-effort" recovery will benefit from the pluggable shuffle service and
pluggable scheduler. Actually, we made some attempts in our private
repository, and it turned out that implementing this with the existing
network stack requires quite a lot of work. We can start working on this
once the pluggable shuffle service and pluggable scheduler are ready.

The suggestion of an external implementation is a very good idea. That way,
we can implement both "at-most-once" and "best-effort" guarantees as
different checkpoint/failover strategies. If so, I think we should focus on
the components that differ between strategies. These components may
include a pluggable checkpoint barrier handler and a pluggable failover
strategy. We can list these components and then discuss implementation
details.

What do you think, Biao Liu and Zhu Zhu?

Regards,
Xiaogang



Re: [DISCUSS] Allow at-most-once delivery in case of failures

Posted by Stephan Ewen <se...@apache.org>.
Hi all!

This is an interesting discussion for sure.

Concerning user requests for mode changes, I also hear the following quite
often:
  - reduce the cost of checkpoint alignment (unaligned checkpoints) to make
checkpoints fast/stable under high backpressure
  - more fine-grained failover while maintaining exactly-once (even if
costly)

Adding "at most once" to the mix makes for quite a long list of big changes
to the system.

My feeling is that on such a core system, the community cannot push all
these efforts at the same time, especially because they touch overlapping
areas of the system and need the same committers involved.

On the other hand, the pluggable shuffle service and pluggable scheduler
could make it possible to have an external implementation of that:
  - a network stack that supports "reconnects" of failed tasks with
continuing tasks
  - a scheduling strategy that restarts tasks individually even in
pipelined regions
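
To make the second point concrete, here is a minimal sketch in Python of how
the restart scope differs between a region-based strategy and an individual
one. It is a toy model, not Flink code: the task names, the edge list, and
the functions `pipelined_region` and `tasks_to_restart` are all hypothetical
and chosen only for illustration.

```python
from collections import defaultdict


def pipelined_region(task, edges):
    """Return every task connected to `task` through pipelined edges.

    `edges` is a list of (upstream, downstream) pairs. Illustrative model
    only; the names here do not correspond to any Flink class.
    """
    neighbours = defaultdict(set)
    for upstream, downstream in edges:
        neighbours[upstream].add(downstream)
        neighbours[downstream].add(upstream)

    region, stack = {task}, [task]
    while stack:
        for other in neighbours[stack.pop()]:
            if other not in region:
                region.add(other)
                stack.append(other)
    return region


def tasks_to_restart(failed_task, edges, strategy):
    """Pick the restart scope for a failed task (hypothetical strategy names)."""
    if strategy == "region":
        # Restart everything that exchanges pipelined data with the failed task.
        return pipelined_region(failed_task, edges)
    if strategy == "individual":
        # Restart only the failed task; its neighbours keep running.
        return {failed_task}
    raise ValueError(f"unknown strategy: {strategy}")


# A small all-pipelined topology: two sources, two maps, one sink.
EDGES = [
    ("source-1", "map-1"),
    ("source-2", "map-2"),
    ("map-1", "sink-1"),
    ("map-2", "sink-1"),
]
```

In this topology a failure of `map-1` restarts all five tasks under the
"region" strategy but only `map-1` under the "individual" one. The sketch
leaves out the hard part named in the first point, namely reconnecting the
restarted task to the tasks that kept running.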

I think contributors/committers could implement this separately from the
Flink core. The feature could be trial-run through the community
packages. If it gains a lot of traction, the community could decide to put
in the effort to merge it into the core.

Best,
Stephan



Re: [DISCUSS] Allow at-most-once delivery in case of failures

Posted by SHI Xiaogang <sh...@gmail.com>.
Hi All,

Allowing at-most-once delivery in Flink definitely requires a massive
effort. But as the feature is urgently demanded by many Flink users, I
think every effort we make is worthwhile. Actually, the inability to
support at-most-once delivery has become a major obstacle for Storm users
who want to move to Flink. It's undesirable for us to run different stream
processing systems for different scenarios.

I agree with Zhu Zhu that the guarantee we provide is the very first thing
to be discussed. Recovering from checkpoints will lead to duplicated
records and thus break the at-most-once guarantee.

One way to achieve an at-most-once guarantee is to completely disable
checkpointing and let sources read only those records posted after they
start. This requires sources to support a configuration for reading only
the latest records, which luckily many message queues, including Kafka, do.
As Flink relies on the sources' ability to roll back to achieve
exactly-once and at-least-once delivery, I think it's acceptable for Flink
to rely on the sources' ability to read the latest records to achieve
at-most-once delivery. This method does not require any modification to the
existing checkpointing mechanism. Besides, as there is no need to restore
from checkpoints, failed tasks can recover as quickly as possible.
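
The difference between the two recovery modes can be illustrated with a
small Python simulation. This is only a sketch under simplifying assumptions:
a single source reading an offset-addressed log, one failure, and a sink that
is not transactional. The function `run` and its parameters are hypothetical
and are not part of Flink or Kafka.

```python
from collections import Counter


def run(records, fail_at, checkpoint_every, arrived_during_outage, recovery):
    """Deliver `records` to a sink, with one failure just before `fail_at`.

    recovery == "checkpoint": rewind to the last checkpointed offset,
                              so some records are delivered twice.
    recovery == "latest":     skip to the newest offset in the log,
                              so some records are never delivered.
    """
    delivered = []
    last_checkpoint = 0
    offset = 0
    failed = False
    while offset < len(records):
        if offset == fail_at and not failed:
            failed = True
            if recovery == "checkpoint":
                offset = last_checkpoint
            else:
                # Records that arrived while the task was down are skipped.
                offset = min(fail_at + arrived_during_outage, len(records))
            continue
        delivered.append(records[offset])
        offset += 1
        if offset % checkpoint_every == 0:
            last_checkpoint = offset
    return delivered


with_rollback = run(list(range(10)), 7, 5, 2, "checkpoint")
from_latest = run(list(range(10)), 7, 5, 2, "latest")
duplicates = [r for r, n in Counter(with_rollback).items() if n > 1]
lost = sorted(set(range(10)) - set(from_latest))
```

With these inputs, rolling back to the checkpoint at offset 5 delivers
records 5 and 6 twice, while resuming from the latest offset delivers no
record twice but loses records 7 and 8.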

Concerning the implementation effort, I think we can benefit from some
ongoing work, including shuffle services and fine-grained recovery. For
example, exceptions in network connections currently lead to failures of
both downstream and upstream tasks. To achieve at-most-once delivery, we
should decouple intermediate results from tasks, report exceptions of
intermediate results to the job master, and let the failover strategy
determine the actions to take. Some of this work has already been done as
part of the fine-grained recovery effort, and it can be extended to allow
at-most-once delivery in Flink.

But before starting the discussion on implementation details, as said
before, we need to determine the guarantee we provide in scenarios where
timely recovery is needed.
* What do you think of the at-most-once guarantee achieved by the proposed
method?
* Do we need checkpointing to reduce the amount of lost data?
* Do we need deduplication to guarantee at-most-once delivery, or should we
just provide best-effort delivery?

Regards,
Xiaogang Shi



Re: [DISCUSS] Allow at-most-once delivery in case of failures

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi Xiaogang,

It sounds interesting and definitely like a useful feature. However, the questions for me would be: how useful is it, how much effort would it require, and is it worth it? We simply cannot do everything at once, and currently the people who could review/drive/mentor this effort are pretty much strained :( For me, one would have to investigate the answers to those questions and prioritise it against other ongoing efforts before I could vote +1 for this.

A couple of things to consider:
- would it be only a job manager/failure region recovery feature?
- would it require changes in the CheckpointBarrierHandler and CheckpointCoordinator classes?
- with `at-most-once` semantics, theoretically speaking, we could just drop the current `CheckpointBarrier` handling/injecting code and avoid all of the checkpoint alignment issues: we could just checkpoint all of the tasks independently of one another. However, maybe that could be a follow-up optimisation step?

Piotrek



Re: [DISCUSS] Allow at-most-once delivery in case of failures

Posted by Zili Chen <wa...@gmail.com>.
Hi Xiaogang,

It is an interesting topic.

Note that there is some effort these days to build a mature mllib for
Flink, and some ML use cases could also trade off correctness for
timeliness or throughput. Exactly-once delivery is exactly what makes Flink
stand out, but an at-most-once option would adapt Flink to more scenarios.

Best,
tison.

