Posted to user@flink.apache.org by Great Info <gu...@gmail.com> on 2022/10/05 08:24:15 UTC

Flink FaultTolerant at operator level

I have a Flink job whose current flow looks like this:

Source_Kafka -> *operator-1* (key by partition) -> *operator-2* (enrich the
record) -> *Sink1-Operator* & *Sink2-Operator*

The problem with this flow is at operator-2. Its core logic looks up
reference status data in a Redis cache and uses it to enrich the stream.
This works fine while the job runs normally, but I recently saw that if the
job fails at this operator or at one of the sink operators, the entire job
restarts and the stream is reprocessed from the source. If the reference
status in the cache changes during the restart, the records are enriched
with a different reference status. The business requirement is to enrich
each record with the reference status as it was when my job first received
the record.

1. Is there any way to reprocess only the Sink1 and Sink2 operators?
2. How can I resume only Sink2 in cases where Sink1 succeeded but Sink2
failed?

Re: Flink FaultTolerant at operator level

Posted by Krzysztof Chmielewski <kr...@gmail.com>.
I had a similar use case.

What we did was decide that the enrichment data must be versioned. For
example, our enrichment data was "refreshed" once a day, and we kept the old
data.
During enrichment we look up the data for a given version, chosen from the
record's metadata.
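
The versioned lookup described above might be sketched as follows. This is
only an illustration, not the code from the setup described here: the
in-memory VersionedReferenceStore stands in for the Redis cache, and the
names Record, event_date, version_for and enrich are hypothetical. The
sketch assumes one version of the reference data per day and that each
record carries the date that selects its version.

```python
from dataclasses import dataclass
from datetime import date


class VersionedReferenceStore:
    """Hypothetical in-memory stand-in for the Redis cache.

    Every entry is stored under (version, key), and old versions are kept,
    so a lookup for an old version returns the same value after a restart.
    """

    def __init__(self):
        self._data = {}  # (version, key) -> reference status

    def put(self, version, key, status):
        self._data[(version, key)] = status

    def get(self, version, key):
        return self._data.get((version, key))


@dataclass(frozen=True)
class Record:
    device_id: str
    event_date: date  # assumed metadata carried by the record itself


def version_for(record):
    # Assumption: one reference-data version per day, named by ISO date.
    return record.event_date.isoformat()


def enrich(record, store):
    # The version comes from the record, not from "now", so reprocessing
    # the same record after a restart reads the same reference data.
    version = version_for(record)
    return {
        "device_id": record.device_id,
        "version": version,
        "status": store.get(version, record.device_id),
    }
```

With this layout, writing a new day's status into the store does not change
the result of enriching a record that carries an earlier date, so replaying
the stream from the source gives the same enrichment as the first pass.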

Regards.
Krzysztof Chmielewski
