You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by re...@apache.org on 2022/09/13 02:19:56 UTC

[flink] branch release-1.16 updated: [FLINK-29153][connector/kafka] Retry KafkaConsumer#commitAsync on WakeupException in KafkaConsumerThread

This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new dbcf949f89b [FLINK-29153][connector/kafka] Retry KafkaConsumer#commitAsync on WakeupException in KafkaConsumerThread
dbcf949f89b is described below

commit dbcf949f89b5bead6c2bec4de77ca68bc8614fe6
Author: Qingsheng Ren <re...@gmail.com>
AuthorDate: Tue Sep 6 17:01:45 2022 +0800

    [FLINK-29153][connector/kafka] Retry KafkaConsumer#commitAsync on WakeupException in KafkaConsumerThread
    
    KafkaConsumerThread makes a wakeup on the KafkaConsumer on offset commit to wakeup the potential blocking KafkaConsumer.poll(). However the wakeup might happen when the consumer is not polling. The wakeup will be remembered by the consumer and re-examined while committing the offset asynchronously, which leads to an unnecessary WakeupException.
---
 .../kafka/internals/KafkaConsumerThread.java         | 20 +++++++++++++++++---
 1 file changed, 17 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
index eba9f3b0387..4c4cc907944 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
@@ -223,9 +223,12 @@ public class KafkaConsumerThread<T> extends Thread {
                         // also record that a commit is already in progress
                         // the order here matters! first set the flag, then send the commit command.
                         commitInProgress = true;
-                        consumer.commitAsync(
-                                commitOffsetsAndCallback.f0,
-                                new CommitCallback(commitOffsetsAndCallback.f1));
+                        retryOnceOnWakeup(
+                                () ->
+                                        consumer.commitAsync(
+                                                commitOffsetsAndCallback.f0,
+                                                new CommitCallback(commitOffsetsAndCallback.f1)),
+                                "commitAsync");
                     }
                 }
 
@@ -503,6 +506,17 @@ public class KafkaConsumerThread<T> extends Thread {
         return new KafkaConsumer<>(kafkaProperties);
     }
 
+    private void retryOnceOnWakeup(Runnable consumerCall, String description) {
+        try {
+            consumerCall.run();
+        } catch (WakeupException we) {
+            log.info(
+                    "Caught WakeupException while executing Kafka consumer call for {}. Will retry it once.",
+                    description);
+            consumerCall.run();
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  Utilities
     // ------------------------------------------------------------------------