You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Yordan Pavlov (Jira)" <ji...@apache.org> on 2022/07/07 07:58:00 UTC
[jira] [Commented] (FLINK-16419) Avoid to recommit transactions which are known committed successfully to Kafka upon recovery
[ https://issues.apache.org/jira/browse/FLINK-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563616#comment-17563616 ]
Yordan Pavlov commented on FLINK-16419:
---------------------------------------
[~martijnvisser]
I am still seeing the issue under some conditions. On trying to recover from Savepoint I am seeing A LOT (like thousands) of those logs:
{code:java}
2022-07-07 07:18:39.576 [kafka-producer-network-thread | producer-test_topic_v13.output-0-1-11937] INFO o.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-test_topic_v13.output-0-1-11937, transactionalId=test_topic_v13.output-0-1-11937] ProducerId set to 581196 with epoch 97
2022-07-07 07:18:39.576 [Order address balances ERC20 -> Sink Save to Kafka ERC20 test_topic_v13.output-0 (2/2)#0] INFO o.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-test_topic_v13.output-0-1-11937, transactionalId=test_topic_v13.output-0-1-11937] ProducerId set to -1 with epoch -1{code}
Followed by:
{code:java}
ERROR org.apache.flink.connector.kafka.sink.KafkaCommitter - Transaction (KafkaCommittable{producerId=484104, epoch=0, transactionalId=test_topic_v13.output-0-1-11936}) encountered error and data has been potentially lost.
org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
at java.base/java.lang.Thread.run(Unknown Source){code}
> How old was the checkpoint and what is your Kafka setting for transactional.id.expiration.ms
The checkpoint is 3 weeks old, it was created by a Flink 1.14 job. I haven't applied any custom value to 'transactional.id.expiration.ms' , I have modified only "transaction.timeout.ms" and have its value set to 24h.
> Avoid to recommit transactions which are known committed successfully to Kafka upon recovery
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-16419
> URL: https://issues.apache.org/jira/browse/FLINK-16419
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka, Runtime / Checkpointing
> Reporter: Jun Qin
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor, usability
>
> When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer tries to recommit all pre-committed transactions which are in the snapshot, even if those transactions were successfully committed before (i.e., the call to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} returns OK). This may lead to recovery failures when recovering from a very old snapshot because the transactional IDs in that snapshot may have been expired and removed from Kafka. For example the following scenario:
> # Start a Flink job with FlinkKafkaProducer sink with exactly-once
> # Suspend the Flink job with a savepoint A
> # Wait for time longer than {{transactional.id.expiration.ms}} + {{transaction.remove.expired.transaction.cleanup.interval.ms}}
> # Recover the job with savepoint A.
> # The recovery will fail with the following error:
> {noformat}
> 2020-02-26 14:33:25,817 INFO org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer - Attempting to resume transaction Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 1202020-02-26 14:33:25,914 INFO org.apache.kafka.clients.Metadata - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
> 2020-02-26 14:33:26,017 INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1, transactionalId=Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka producer with timeoutMillis = 92233720
> 36854775807 ms.
> 2020-02-26 14:33:26,019 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/1) (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
> at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
> at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
> at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
> at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For now, the workaround is to call {{producer.ignoreFailuresAfterTransactionTimeout()}}. This is a bit risky, as it may hide real transaction timeout errors.
> After discussed with [~becket_qin], [~pnowojski] and [~aljoscha], a possible way is to let JobManager, after successfully notifies all operators the completion of a snapshot (via {{notifyCheckpoingComplete}}), record the success, e.g., write the successful transactional IDs somewhere in the snapshot. Then those transactions need not recommit upon recovery.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)