Posted to user@flink.apache.org by Shahid Chohan <ch...@stripe.com> on 2021/09/01 03:04:48 UTC

Unrecoverable apps due to timeouts on transaction state initialization

Today I started seeing the following exception across all of the exactly-once Kafka sink apps I have deployed:

org.apache.kafka.common.errors.TimeoutException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.

The apps are all on Flink v1.10.2.

I tried the following workarounds sequentially for a single app, but I continued to get the same exception:
- changing the sink uid and restoring while allowing non-restored state
- changing the Kafka producer id and restoring while allowing non-restored state
- changing the output Kafka topic to a new one and restoring while allowing non-restored state
- deploying from scratch (no previous checkpoint/savepoint)
- doubling the timeout for state initialization from 60s to 120s
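For context on that last workaround: the 60000 ms in the exception most likely corresponds to the Kafka producer's max.block.ms (default 60000 ms), which bounds how long initTransactions() waits on the transaction coordinator. A minimal sketch of the producer settings involved, assuming that mapping is correct; the bootstrap address and transactional.id are placeholders, not values from the apps above:

```java
import java.util.Properties;

public class ProducerTimeoutSketch {
    public static Properties transactionalProducerConfig() {
        Properties props = new Properties();
        // Placeholder broker address for illustration only.
        props.put("bootstrap.servers", "localhost:9092");
        // initTransactions() blocks up to max.block.ms (default 60000 ms),
        // which matches the 60000 ms in the reported TimeoutException.
        // Doubling the "state initialization" timeout amounts to raising this.
        props.put("max.block.ms", "120000");
        // Hypothetical transactional.id; each producer instance needs a unique
        // one, and Flink derives it from the operator, which is why changing
        // the sink uid was expected to disassociate old transactions.
        props.put("transactional.id", "my-app-sink-0");
        return props;
    }

    public static void main(String[] args) {
        Properties p = transactionalProducerConfig();
        System.out.println("max.block.ms=" + p.getProperty("max.block.ms"));
    }
}
```

Note that raising max.block.ms only buys more waiting time; if the coordinator never answers, the call still fails at the new bound.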

My mental model is that we have completely disassociated the Flink app from any pending transactions on the Kafka side (by changing the uid, producer id, and output topic), so it should be able to recover from scratch. The Kafka clusters are otherwise healthy and are accepting writes from non-exactly-once Flink apps and all other Kafka producers.

On the Kafka side, we have the following configs set:

transaction.max.timeout.ms=3600000
transaction.remove.expired.transaction.cleanup.interval.ms=86400000
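These two interact with the producer side: the broker rejects a producer whose requested transaction.timeout.ms exceeds the broker's transaction.max.timeout.ms (with an InvalidTransactionTimeoutException), and Flink's exactly-once Kafka sink defaults its transaction timeout to one hour, which is presumably why the broker cap above was raised to 3600000 ms. A small sketch of that admission check, with the one-hour producer value being an assumption about these apps' configuration:

```java
public class TransactionTimeoutCheck {
    // Broker-side cap taken from the post: transaction.max.timeout.ms
    static final long BROKER_MAX_TIMEOUT_MS = 3_600_000L; // 1 hour
    // Producer-side request; assumed to be Flink's exactly-once default of 1 hour.
    static final long PRODUCER_TIMEOUT_MS = 3_600_000L;

    // Models the broker's rule: an InitProducerId request is only accepted
    // if the requested transaction timeout does not exceed the broker cap.
    static boolean accepted(long producerTimeoutMs, long brokerMaxMs) {
        return producerTimeoutMs <= brokerMaxMs;
    }

    public static void main(String[] args) {
        System.out.println(accepted(PRODUCER_TIMEOUT_MS, BROKER_MAX_TIMEOUT_MS));
    }
}
```

A rejected timeout fails fast with a clear error, though, whereas the symptom here is a silent 60-second wait, which points more toward an unreachable or unhealthy transaction coordinator than toward this check.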

I'm considering shortening the cleanup interval so that any hanging transactions on the Kafka side can be garbage collected sooner. Or I might just wait it out and accept the downtime.
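To put rough numbers on the wait-it-out option: if a transaction only becomes eligible for cleanup once its timeout expires, and the cleanup task runs once per interval, then in the worst case a hanging transaction survives about one timeout plus one full interval. A back-of-the-envelope sketch using the values above (this assumes the cleanup interval config behaves as a periodic reaper, which is my reading, not something confirmed in the thread):

```java
public class HangingTxnWindow {
    // Worst case: the transaction expires just after a cleanup pass, so it
    // waits its full timeout plus nearly one whole cleanup interval.
    public static long worstCaseLingerMs(long txnTimeoutMs, long cleanupIntervalMs) {
        return txnTimeoutMs + cleanupIntervalMs;
    }

    public static void main(String[] args) {
        // Values from the broker configs in the post: 1h timeout, 24h interval.
        long worst = worstCaseLingerMs(3_600_000L, 86_400_000L);
        System.out.println(worst / 3_600_000L + " hours"); // 25 hours
    }
}
```

So with the current 24-hour interval the tail is on the order of a day, and shortening the interval shortens it roughly linearly.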

But otherwise, I am out of ideas and unsure how to proceed. Any help would be much appreciated.

RE: Unrecoverable apps due to timeouts on transaction state initialization

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi Chohan,

Which Kafka client version are you using? ... Considering that this started today, did you recently change the Kafka client version?

Giving a little more context (exception call stack / more log) might help in finding out what is going on ... 😊

Regards

Thias

-----Original Message-----
From: Shahid Chohan <ch...@stripe.com>
Sent: Wednesday, 1 September 2021 05:05
To: user@flink.apache.org
Subject: Unrecoverable apps due to timeouts on transaction state initialization
