Posted to issues@flink.apache.org by "Yordan Pavlov (Jira)" <ji...@apache.org> on 2022/10/20 16:19:00 UTC

[jira] [Commented] (FLINK-16419) Avoid to recommit transactions which are known committed successfully to Kafka upon recovery

    [ https://issues.apache.org/jira/browse/FLINK-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17621220#comment-17621220 ] 

Yordan Pavlov commented on FLINK-16419:
---------------------------------------

[~martijnvisser] This issue has plagued our Flink setup for quite some time now, across different versions. We are struggling to use savepoints with Kafka EOS. Could you elaborate on what our options are so that savepoints keep working as a recovery mechanism indefinitely? I would restate my previous questions:
 # Can we force Flink not to leave open transactions by using the 'stop with savepoint' mechanism, i.e. stopping the job with a savepoint? Is this a good approach?
 # If the answer to the above is 'yes', can we force Flink to generate the same type of savepoint without stopping the job and then having to recover the state?
 # Are there any alternatives for periodically backing up our progress while still using Kafka EOS?

Any help is highly appreciated.

> Avoid to recommit transactions which are known committed successfully to Kafka upon recovery
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-16419
>                 URL: https://issues.apache.org/jira/browse/FLINK-16419
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka, Runtime / Checkpointing
>            Reporter: Jun Qin
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor, usability
>
> When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer tries to recommit all pre-committed transactions which are in the snapshot, even if those transactions were successfully committed before (i.e., the call to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} returns OK). This may lead to recovery failures when recovering from a very old snapshot, because the transactional IDs in that snapshot may have expired and been removed from Kafka. For example, consider the following scenario:
>  # Start a Flink job with FlinkKafkaProducer sink with exactly-once
>  # Suspend the Flink job with a savepoint A
>  # Wait longer than {{transactional.id.expiration.ms}} + {{transaction.remove.expired.transaction.cleanup.interval.ms}}
>  # Recover the job with savepoint A.
>  # The recovery will fail with the following error:
> {noformat}
> 2020-02-26 14:33:25,817 INFO  org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer  - Attempting to resume transaction Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 120
> 2020-02-26 14:33:25,914 INFO  org.apache.kafka.clients.Metadata                            - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
> 2020-02-26 14:33:26,017 INFO  org.apache.kafka.clients.producer.KafkaProducer              - [Producer clientId=producer-1, transactionalId=Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2020-02-26 14:33:26,019 INFO  org.apache.flink.runtime.taskmanager.Task                    - Source: Custom Source -> Sink: Unnamed (1/1) (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
>         at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
>         at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
>         at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>         at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
>         at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For now, the workaround is to call {{producer.ignoreFailuresAfterTransactionTimeout()}}. This is a bit risky, as it may hide real transaction timeout errors. 
> After discussing this with [~becket_qin], [~pnowojski] and [~aljoscha], one possible approach is to let the JobManager, after it has successfully notified all operators of the completion of a snapshot (via {{notifyCheckpointComplete}}), record that success, e.g., by writing the successfully committed transactional IDs somewhere in the snapshot. Those transactions then need not be recommitted upon recovery.
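
To make the proposal in the issue description concrete, here is a minimal toy model in Python. It is only a sketch and does not use any Flink or Kafka API: `ToyBroker`, `Snapshot`, `known_committed`, `recover` and `UnknownProducerError` are all hypothetical names invented for illustration. It assumes the snapshot could carry a set of transactional IDs whose commit was already acknowledged, and that recovery could skip those instead of recommitting them.

```python
from dataclasses import dataclass, field


class UnknownProducerError(Exception):
    """Hypothetical stand-in for the broker rejecting an expired transactional ID."""


@dataclass
class ToyBroker:
    # Transactional IDs the toy broker still remembers; expired ones are absent.
    known_ids: set = field(default_factory=set)

    def commit(self, txn_id: str) -> None:
        # Committing an ID the broker has forgotten fails, as in the scenario above.
        if txn_id not in self.known_ids:
            raise UnknownProducerError(txn_id)


@dataclass
class Snapshot:
    # Transactions that were pre-committed when the snapshot was taken.
    pre_committed: list
    # Assumed addition: IDs whose commit was acknowledged after the snapshot completed.
    known_committed: set = field(default_factory=set)


def recover(snapshot: Snapshot, broker: ToyBroker, skip_known_committed: bool) -> list:
    """Recommit pre-committed transactions, optionally skipping known-committed ones."""
    recommitted = []
    for txn_id in snapshot.pre_committed:
        if skip_known_committed and txn_id in snapshot.known_committed:
            continue  # already committed before the failure, nothing to redo
        broker.commit(txn_id)
        recommitted.append(txn_id)
    return recommitted
```

In this model, a snapshot holding `txn-1` (committed, since expired on the broker) and `txn-2` (still known to the broker) recovers cleanly when `skip_known_committed` is true, and raises `UnknownProducerError` on `txn-1` otherwise. Where such a record would actually be stored, and who would write it, is exactly the open design question in this ticket.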



--
This message was sent by Atlassian Jira
(v8.20.10#820010)