You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Dong Lin <li...@gmail.com> on 2022/12/01 13:46:08 UTC

Re: [DISCUSS]Introduce a time-segment based restart strategy

Hi Gen,

Thanks for the suggestions!

Regarding how to implement the per-region RestartBackoffTimeStrategy as
proposed previously, I think your approach works well.

Here are more details:
- Keep the RestartBackoffTimeStrategy interface API unchanged and only
change its semantics, such that all strategies (e.g. failure rate, fixed
delay, exponential delay) are applied per region.
- Update ExecutionFailureHandler to create one
RestartBackoffTimeStrategy instance for each region.
ExecutionFailureHandler can get the region information from its
SchedulingTopology.
- ExecutionFailureHandler::getFailureHandlingResult() will use the strategy
instance for the given failedTask's region to make the failover decision.

Please see the other comment inline.

Regards,
Dong

On Fri, Nov 25, 2022 at 7:42 PM Gen Luo <lu...@gmail.com> wrote:

> Hi all,
>
> Sorry for the late jumping in.
>
> To meet Weihua's need, Dong's proposal seems pretty fine, but the
> modification it requires, I'm afraid, is not really easy.
> RestartBackoffTimeStrategy is quite a simple interface. The strategy even
> doesn't know which task is failing, not to mention the division of pipeline
> regions.
> To distinguish the failure count of each regions, it lacks too much
> information, which is not easy to acquire for the strategy.
> One approch I can figure out is to create different strategy instances to
> different regions. In this way we do not need to modify the strategy but do
> need to modify the schedulers or the ExecutionFailureHandler.
>
> On the other hand, I realize another case that the restart strategy may
> need to be aware of the types and occurrence rate of the exceptions. That
> is to avoid failing over but directly fail the job when some errors happen.
> I know that there's an annotation
> `@ThrowableAnnotation(ThrowableType.NonRecoverableError)` that can fail the
> job, but I'm afraid there can be some scenarios that can not annotate the
> exceptions, or catch and warp with an annotated exception.
>

While this is possible, do you have a concrete use-case that can not use
and catch the annotated exception? It is probably safer to only add a new
strategy (which is a public API) when we are sure we need it :)

In such cases, handling in the restart strategy can be a good choice.
> Such a strategy can even combines with other existing strategies which
> handle the failure rate rather than the cause type.
>
> Besides, given that new strategies may be necessary, and existing
> strategies may also need to enhance, maybe we should make the
> RestartBackoffTimeStrategy a plugin rather than the enumerations, or
> introduce a new custom type strategy which can load customized
> implementations.
> This can not solve the problem immediately, but makes the choice of restart
> strategy more flexiable.
> What do you think about this?
>
> Thanks.
>
> Paul Lam <pa...@gmail.com> 于 2022年11月21日周一 17:46写道:
>
> > Dong’s proposal LGTM.
> >
> > Best,
> > Paul Lam
> >
> > > 2022年11月19日 10:50,Dong Lin <li...@gmail.com> 写道:
> > >
> > > Hey Weihua,
> > >
> > > Thanks for proposing the new strategy!
> > >
> > > If I understand correctly, the main issue is that different failover
> > > regions can be restarted independently, but they share the same counter
> > > when counting the number of failures in an interval. So the number of
> > > failures for a given region is less than what users expect.
> > >
> > > Given that regions can be restarted independently, it might be more
> > usable
> > > and intuitive to count the number of failures for each region when
> > > executing the failover strategy. Thus, instead of adding a new failover
> > > strategy, how about we update the existing failure-rate strategy, and
> > > probably other existing strategies as well, to use the following
> > semantics:
> > >
> > > - For any given region in the job, its number of failures in
> > > failure-rate-interval should not exceed max-failures-per-interval.
> > > Otherwise, the job will fail without being restarted.
> > >
> > > By using this updated semantics, the keyby-connected job will have the
> > same
> > > behavior as the existing Flink when we use failure-rate strategy. For
> > > the rescale-connected
> > > job, in the case you described above, after the TM fails, each of the 3
> > > regions will increment its failure count from 0 to 1, which is still
> less
> > > than max-failures-per-interval. Thus the rescale-connected job can
> > continue
> > > to work.
> > >
> > > This alternative approach can solve the problem without increasing the
> > > complexity of the failover strategy choice. And this approach does not
> > > require us to check whether two exceptions belong to the same root
> cause.
> > > Do you think it can work?
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > > On Fri, Nov 4, 2022 at 4:46 PM Weihua Hu <hu...@gmail.com>
> wrote:
> > >
> > >> Hi, everyone
> > >>
> > >> I'd like to bring up a discussion about restart strategy. Flink
> > supports 3
> > >> kinds of restart strategy. These work very well for jobs with specific
> > >> configs, but for platform users who manage hundreds of jobs, there is
> no
> > >> common strategy to use.
> > >>
> > >> Let me explain the reason. We manage a lot of jobs, some are
> > >> keyby-connected with one region per job, some are rescale-connected
> with
> > >> many regions per job, and when using the failure rate restart
> strategy,
> > we
> > >> cannot achieve the same control with the same configuration.
> > >>
> > >> For example, if I want the job to fail when there are 3 exceptions
> > within 5
> > >> minutes, the config would look like this:
> > >>
> > >>> restart-strategy.failure-rate.max-failures-per-interval: 3
> > >>>
> > >>> restart-strategy.failure-rate.failure-rate-interval: 5 min
> > >>>
> > >> For the keyby-connected job, this config works well.
> > >>
> > >> However, for the rescale-connected job, we need to consider the number
> > of
> > >> regions and the number of slots per TaskManager. If each TM has 3
> slots,
> > >> and these 3 slots run the task of 3 regions, then when one TaskManager
> > >> crashes, it will trigger 3 regions to fail, and the job will fail
> > because
> > >> it exceeds the threshold of the restart strategy. To avoid the effect
> of
> > >> single TM crashes, I must increase the max-failures-per-interval to 9,
> > but
> > >> after the change, user task exceptions will be more tolerant than I
> > want.
> > >>
> > >>
> > >> Therefore, I want to introduce a new restart strategy based on time
> > >> periods. A continuous period of time (e.g., 5 minutes) is divided into
> > >> segments of a specific length (e.g., 1 minute). If an exception occurs
> > >> within a segment (no matter how many times), it is marked as a failed
> > >> segment. Similar to failure-rate restart strategy, the job will fail
> > when
> > >> there are 'm' failed segments in the interval of 'n' .
> > >>
> > >> In this mode, the keyby-connected and rescale-connected jobs can use
> > >> unified configurations.
> > >>
> > >> This is a user-relevant change, so if you think this is worth to do,
> > maybe
> > >> I can create a FLIP to describe it in detail.
> > >> Best,
> > >> Weihua
> > >>
> >
> >
>