Posted to user@flink.apache.org by Robin Cassan via user <us...@flink.apache.org> on 2022/07/08 14:50:50 UTC

Ignoring state's offset when restoring checkpoints

Hello all!

For specific recovery cases, we need to manually reset our Flink Kafka
consumer's offsets to a given date while still having the Flink job restore
its state. As I understand it, restoring from a checkpoint always sets the
group's offsets to the ones stored in the checkpoint.
Is there a way to disable this behavior, so that Flink restores the state
from the checkpoint but uses the offsets that were manually set in Kafka?

Thanks!

Robin

Re: Ignoring state's offset when restoring checkpoints

Posted by Robin Cassan via user <us...@flink.apache.org>.
Thanks a lot Alexander and Tzu-Li for your answers, this helps a lot!!

Cheers,
Robin

Re: Ignoring state's offset when restoring checkpoints

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Robin,

Apart from what Alexander suggested, I think you could also try the
following first:
Let the job use a "new" Kafka source, which you can achieve by simply
assigning it a different operator ID than before. If you did not set an ID
previously, the default would have been a hash computed by Flink.
With a new operator ID, Flink sees this as a new source operator that has
no previous state (i.e. there are no partition offsets to restore from).
All other existing operators in the job will still restore their previous
state. With this "new" Kafka source, you can then set the initial offsets
to start consuming from, either as a startup date or as a specific map of
partition offsets.
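
A minimal, untested sketch of such a "new" source, assuming the job uses the
KafkaSource connector (Flink 1.14 or later) rather than the legacy
FlinkKafkaConsumer. The broker address, topic, group ID, uid string and start
date are placeholders and not values from this thread:

```java
import java.time.Instant;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NewKafkaSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder "startup date", converted to epoch milliseconds.
        long startMillis = Instant.parse("2022-07-01T00:00:00Z").toEpochMilli();

        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")   // placeholder
            .setTopics("events")                  // placeholder
            .setGroupId("my-group")               // placeholder
            // Start from the first record at or after the given timestamp.
            // This only applies because the source has no restored state.
            .setStartingOffsets(OffsetsInitializer.timestamp(startMillis))
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        DataStream<String> events = env
            .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
            // A uid that differs from the old source's uid, so the offsets
            // stored in the checkpoint are not matched to this operator.
            .uid("kafka-source-v2");

        events.print();
        env.execute("job-with-new-kafka-source");
    }
}
```

For a specific map of partition offsets, OffsetsInitializer.offsets(...) takes
a Map<TopicPartition, Long> instead. To pick up offsets that were reset
manually on the Kafka side, OffsetsInitializer.committedOffsets() may be the
closer fit.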

Also, in order for the job to successfully restore, I think you would need
to set the "--allowNonRestoredState" option when submitting the job.
This essentially tells Flink to ignore the fact that the "old" Kafka source
state is not being restored for the job (since there is no longer a
matching operator to restore those offsets to).
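
To make the reasoning concrete, here is a toy model of the restore step. It is
not Flink's implementation: the RestoreModel class, its restore method and the
operator IDs are all hypothetical, and a checkpoint is reduced to a map from
operator ID to state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class RestoreModel {

    /**
     * Hands each piece of checkpointed state to the job operator with the
     * same ID. State with no matching operator either fails the restore or
     * is dropped, depending on allowNonRestoredState.
     */
    public static Map<String, String> restore(Map<String, String> checkpoint,
                                              Set<String> jobOperatorIds,
                                              boolean allowNonRestoredState) {
        Map<String, String> restored = new HashMap<>();
        for (Map.Entry<String, String> entry : checkpoint.entrySet()) {
            if (jobOperatorIds.contains(entry.getKey())) {
                restored.put(entry.getKey(), entry.getValue());
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException(
                    "No operator in the job matches state of '" + entry.getKey() + "'");
            }
            // Otherwise the orphaned state is silently skipped.
        }
        return restored;
    }

    public static void main(String[] args) {
        // Checkpoint taken while the source still had its old ID.
        Map<String, String> checkpoint = new HashMap<>();
        checkpoint.put("kafka-source-v1", "partition offsets");
        checkpoint.put("window-agg", "window contents");

        // The new job renames only the source operator.
        Set<String> newJob = Set.of("kafka-source-v2", "window-agg");

        // With the flag, only the aggregation state is restored; the renamed
        // source starts without state and uses its configured initial offsets.
        System.out.println(restore(checkpoint, newJob, true));

        // Without the flag, the orphaned source state fails the restore.
        try {
            restore(checkpoint, newJob, false);
        } catch (IllegalStateException e) {
            System.out.println("Restore failed: " + e.getMessage());
        }
    }
}
```

On the real CLI, the flag is passed at submission time, along the lines of
"flink run -s <checkpoint-path> --allowNonRestoredState ...", where the path
is a placeholder for the retained checkpoint or savepoint to restore from.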

Cheers,
Gordon

Re: Ignoring state's offset when restoring checkpoints

Posted by Alexander Fedulov <al...@ververica.com>.
Hi Robin,

You should be able to use the State Processor API to modify the operator
state (sources) and override the offsets manually there. I have never tried
that, but I believe conceptually it should work.

Best,
Alexander Fedulov