Posted to user@flink.apache.org by Yordan Pavlov <y....@gmail.com> on 2022/11/15 14:36:06 UTC

Kafka transactions drastically limit usability of Flink savepoints

Hi,
we are using Flink savepoints as a recovery tool and want to keep
several of them covering the past months. However, since we use Kafka
transactions in our KafkaSink, this puts an expiration time on our
savepoints: we can only restore from a savepoint that is no older than
our Kafka transaction timeout. The problem is explained in this issue:
https://issues.apache.org/jira/browse/FLINK-16419
the relevant comment being this one:
"FlinkKafkaProducer or KafkaSink do not know during recovery if they
have to recover and commit or if it has already happened. Due to that,
they are always attempting to recover and commit transactions during
startup."
I'm surprised that more people are not hitting this problem, as it
makes savepoints pretty much unusable as a recovery mechanism.
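The limitation boils down to comparing a savepoint's age with the transaction timeout. A minimal Java sketch of that check, assuming the broker aborts any transaction left open longer than the timeout; `SavepointAgeCheck`, `isRestorable` and `maxIntTimeoutDays` are hypothetical names, not Flink or Kafka APIs. The 15-minute value mirrors the Kafka broker's default `transaction.max.timeout.ms`. The sketch also assumes the timeout is configured as an int number of milliseconds (as Kafka's timeout settings are, to our understanding), which would cap it at just under 25 days, so no setting could cover a one-month-old savepoint.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical helper (not a Flink or Kafka API). Assumes the broker aborts any
 * transaction older than the transaction timeout, so a savepoint holding
 * pre-committed transactions is only usable while it is younger than that timeout.
 */
final class SavepointAgeCheck {

    /** True if the savepoint is not older than the transaction timeout at time {@code now}. */
    static boolean isRestorable(Instant savepointTakenAt, Duration transactionTimeout, Instant now) {
        Duration age = Duration.between(savepointTakenAt, now);
        return age.compareTo(transactionTimeout) <= 0;
    }

    /** Largest timeout expressible as an int number of milliseconds, in whole days (assumption: int config). */
    static long maxIntTimeoutDays() {
        return Duration.ofMillis(Integer.MAX_VALUE).toDays();
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2022-11-15T14:36:00Z");
        Instant oneMonthAgo = now.minus(Duration.ofDays(30));
        Duration fifteenMinutes = Duration.ofMinutes(15);

        System.out.println(isRestorable(oneMonthAgo, fifteenMinutes, now));                       // false
        System.out.println(isRestorable(now.minus(Duration.ofMinutes(5)), fifteenMinutes, now)); // true
        System.out.println(maxIntTimeoutDays());                                                 // 24
        // Even the largest int-millisecond timeout is shorter than 30 days.
        System.out.println(isRestorable(oneMonthAgo, Duration.ofMillis(Integer.MAX_VALUE), now)); // false
    }
}
```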

Re: Kafka transactions drastically limit usability of Flink savepoints

Posted by Piotr Nowojski <pn...@apache.org>.
Hi Yordan,

Indeed, it looks like a missing feature. Probably whoever implemented the
new KafkaSink didn't realize how important this is. I've created a ticket
for this issue [1], but I don't know when it will be fixed or by whom.

I think a workaround might be to create a new `KafkaSink` instance with a
new, different operator uid, and simply drop the old instance's state when
restoring (by using the `allowNonRestoredState` option [2]).
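Why the workaround helps can be shown with a deliberately simplified model: savepoint state is matched to operators by uid, so state whose uid no longer exists in the job is either rejected or, with `allowNonRestoredState`, skipped. `UidRestoreModel`, `restore` and the uids `kafka-sink`/`kafka-sink-v2` are hypothetical and this is not Flink's code; in a real job the uid would be set with `.uid(...)` on the `DataStreamSink` returned by `sinkTo(...)`, and the flag passed on restore, e.g. `flink run -s <savepointPath> --allowNonRestoredState ...`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of uid-based state matching on restore (not Flink code). */
final class UidRestoreModel {

    /**
     * Returns the state each operator of the new job receives. Throws if the savepoint
     * holds state for a uid missing from the job and allowNonRestoredState is false.
     */
    static Map<String, String> restore(Map<String, String> savepointStateByUid,
                                       Set<String> jobOperatorUids,
                                       boolean allowNonRestoredState) {
        Map<String, String> restored = new HashMap<>();
        for (Map.Entry<String, String> entry : savepointStateByUid.entrySet()) {
            if (jobOperatorUids.contains(entry.getKey())) {
                restored.put(entry.getKey(), entry.getValue());
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException(
                        "Savepoint state for uid '" + entry.getKey() + "' cannot be mapped to the job");
            }
            // Otherwise the orphaned state (e.g. the old sink's pending transactions) is dropped.
        }
        // Operators with a new uid (e.g. "kafka-sink-v2") simply start with empty state.
        return restored;
    }

    public static void main(String[] args) {
        Map<String, String> savepoint = Map.of(
                "source", "offsets",
                "kafka-sink", "pending transactions");
        // The sink was re-created under a new uid, so its old state has no match.
        Set<String> newJob = Set.of("source", "kafka-sink-v2");

        System.out.println(restore(savepoint, newJob, true)); // {source=offsets}
        try {
            restore(savepoint, newJob, false);
        } catch (IllegalStateException e) {
            System.out.println("restore failed: " + e.getMessage());
        }
    }
}
```

The trade-off is that whatever the old sink still had pending is abandoned rather than committed.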

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-30068
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#allowing-non-restored-state


On Wed, 16 Nov 2022 at 11:36, Yordan Pavlov <y....@gmail.com> wrote:

> Hi Piotr,
>
> the option you mention is applicable only to the deprecated
> FlinkKafkaProducer; is there an equivalent for the modern KafkaSink? I
> found this article comparing the behavior of the two:
>
> https://ververica.zendesk.com/hc/en-us/articles/360013269680-Best-Practices-for-Using-Kafka-Sources-Sinks-in-Flink-Jobs
>
> It suggests that the default behavior of KafkaSink would be: "The
> recovery continues with an ERROR message like the following is
> logged:". However, this is not what I observe; instead, the job fails.
> I am attaching the relevant part of the log. This error happens when
> trying to recover from a one-month-old savepoint.
>
> Regards,
> Yordan
>

Re: Kafka transactions drastically limit usability of Flink savepoints

Posted by Yordan Pavlov <y....@gmail.com>.
Hi Piotr,

the option you mention is applicable only to the deprecated
FlinkKafkaProducer; is there an equivalent for the modern KafkaSink? I
found this article comparing the behavior of the two:
https://ververica.zendesk.com/hc/en-us/articles/360013269680-Best-Practices-for-Using-Kafka-Sources-Sinks-in-Flink-Jobs

It suggests that the default behavior of KafkaSink would be: "The
recovery continues with an ERROR message like the following is
logged:". However, this is not what I observe; instead, the job fails.
I am attaching the relevant part of the log. This error happens when
trying to recover from a one-month-old savepoint.

Regards,
Yordan

On Tue, 15 Nov 2022 at 18:53, Piotr Nowojski <pn...@apache.org> wrote:
>
> Hi Yordan,
>
> I don't understand where the problem is. Why do you think savepoints are unusable? If you recover with `ignoreFailuresAfterTransactionTimeout` enabled, the current Flink behaviour shouldn't cause any problems (except perhaps for some logged errors).
>
> Best,
> Piotrek
>

Re: Kafka transactions drastically limit usability of Flink savepoints

Posted by Piotr Nowojski <pn...@apache.org>.
Hi Yordan,

I don't understand where the problem is. Why do you think savepoints are
unusable? If you recover with `ignoreFailuresAfterTransactionTimeout`
enabled, the current Flink behaviour shouldn't cause any problems (except
perhaps for some logged errors).
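The effect of that switch on the recovery path described in FLINK-16419 can be sketched as a small model: every transaction recorded in the savepoint is committed again on startup, and a commit of an expired transaction either fails the job or, with the flag, is only logged. `RecoveryCommitModel`, `PendingTransaction`, `commit` and `recover` are hypothetical names, not Flink code; the only real API referenced is the deprecated `FlinkKafkaProducer#ignoreFailuresAfterTransactionTimeout()`, and per the replies in this thread `KafkaSink` appears to have no equivalent switch, which the `false` case stands in for.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.logging.Logger;

/** Hypothetical model (not Flink code) of re-committing savepoint transactions on startup. */
final class RecoveryCommitModel {
    private static final Logger LOG = Logger.getLogger(RecoveryCommitModel.class.getName());

    /** A pre-committed transaction as recorded in the savepoint. */
    record PendingTransaction(String transactionalId, Instant startedAt) {}

    /** Stand-in for the broker: a transaction older than the timeout has already been aborted. */
    private static void commit(PendingTransaction tx, Duration timeout, Instant now) {
        if (Duration.between(tx.startedAt(), now).compareTo(timeout) > 0) {
            throw new IllegalStateException(
                    "Transaction " + tx.transactionalId() + " was aborted after the timeout");
        }
    }

    /** Re-commits every recorded transaction (the sink cannot tell which were already committed). */
    static int recover(List<PendingTransaction> fromSavepoint, Duration timeout, Instant now,
                       boolean ignoreFailuresAfterTransactionTimeout) {
        int committed = 0;
        for (PendingTransaction tx : fromSavepoint) {
            try {
                commit(tx, timeout, now);
                committed++;
            } catch (IllegalStateException e) {
                if (!ignoreFailuresAfterTransactionTimeout) {
                    throw e; // the job fails, as reported above for KafkaSink
                }
                // Recovery continues, but the data in this transaction is lost.
                LOG.severe(e.getMessage() + "; ignoring, data may have been lost");
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2022-11-16T10:00:00Z");
        Duration timeout = Duration.ofMinutes(15);
        List<PendingTransaction> txs = List.of(
                new PendingTransaction("sink-0-1", now.minus(Duration.ofMinutes(5))),
                new PendingTransaction("sink-0-2", now.minus(Duration.ofDays(30))));

        System.out.println(recover(txs, timeout, now, true)); // 1 (the expired one is only logged)
        try {
            recover(txs, timeout, now, false);
        } catch (IllegalStateException e) {
            System.out.println("job fails: " + e.getMessage());
        }
    }
}
```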

Best,
Piotrek
