You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by le...@apache.org on 2022/06/16 08:29:48 UTC

[flink] branch release-1.15 updated: [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment

This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new a5c0c8776cb [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment
a5c0c8776cb is described below

commit a5c0c8776cb06507f12a00c93bd92a7ac6d18375
Author: Qingsheng Ren <re...@gmail.com>
AuthorDate: Thu Jun 16 16:29:40 2022 +0800

    [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment
    
    This closes #19981.
---
 .../source/reader/KafkaPartitionSplitReader.java   | 42 ++++++++++++++++++++--
 .../reader/KafkaPartitionSplitReaderTest.java      | 18 ++++++++++
 2 files changed, 57 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index d93e3e46a26..c1f85be1859 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -56,6 +56,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.StringJoiner;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 /** A {@link SplitReader} implementation that reads records from Kafka partitions. */
@@ -302,7 +303,9 @@ public class KafkaPartitionSplitReader
         Map<TopicPartition, Long> endOffset = consumer.endOffsets(partitionsStoppingAtLatest);
         stoppingOffsets.putAll(endOffset);
         if (!partitionsStoppingAtCommitted.isEmpty()) {
-            consumer.committed(partitionsStoppingAtCommitted)
+            retryOnWakeup(
+                            () -> consumer.committed(partitionsStoppingAtCommitted),
+                            "getting committed offset as stopping offsets")
                     .forEach(
                             (tp, offsetAndMetadata) -> {
                                 Preconditions.checkNotNull(
@@ -320,7 +323,10 @@ public class KafkaPartitionSplitReader
         List<TopicPartition> emptyPartitions = new ArrayList<>();
         // If none of the partitions have any records,
         for (TopicPartition tp : consumer.assignment()) {
-            if (consumer.position(tp) >= getStoppingOffset(tp)) {
+            if (retryOnWakeup(
+                            () -> consumer.position(tp),
+                            "getting starting offset to check if split is empty")
+                    >= getStoppingOffset(tp)) {
                 emptyPartitions.add(tp);
             }
         }
@@ -343,7 +349,10 @@ public class KafkaPartitionSplitReader
         if (LOG.isDebugEnabled()) {
             StringJoiner splitsInfo = new StringJoiner(",");
             for (KafkaPartitionSplit split : splitsChange.splits()) {
-                long startingOffset = consumer.position(split.getTopicPartition());
+                long startingOffset =
+                        retryOnWakeup(
+                                () -> consumer.position(split.getTopicPartition()),
+                                "logging starting position");
                 long stoppingOffset = getStoppingOffset(split.getTopicPartition());
                 splitsInfo.add(
                         String.format(
@@ -398,6 +407,33 @@ public class KafkaPartitionSplitReader
         }
     }
 
+    /**
+     * Catches a {@link WakeupException} thrown by a Kafka consumer call and retries the call once.
+     *
+     * <p>This helper function handles the following race condition:
+     *
+     * <ol>
+     *   <li>Fetcher thread finishes a {@link KafkaConsumer#poll(Duration)} call
+     *   <li>Task thread assigns new splits and thus invokes {@link #wakeUp()}; the wakeup is
+     *       recorded and held by the consumer
+     *   <li>Later the fetcher thread invokes {@link #handleSplitsChanges(SplitsChange)}, and
+     *       interactions with the consumer will throw {@link WakeupException} because of the
+     *       previously held wakeup in the consumer
+     * </ol>
+     *
+     * <p>In this case we need to catch the {@link WakeupException} and retry the operation.
+     */
+    private <V> V retryOnWakeup(Supplier<V> consumerCall, String description) {
+        try {
+            return consumerCall.get();
+        } catch (WakeupException we) {
+            LOG.info(
+                    "Caught WakeupException while executing Kafka consumer call for {}. Will retry the consumer call.",
+                    description);
+            return consumerCall.get(); // retried once only; a second WakeupException propagates
+        }
+    }
+
     // ---------------- private helper class ------------------------
 
     private static class KafkaPartitionSplitRecords
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index a34ed517e63..4a8482cab4f 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -140,6 +140,24 @@ public class KafkaPartitionSplitReaderTest {
         assertNull(error.get());
     }
 
+    @Test
+    public void testWakeupThenAssign() throws IOException {
+        KafkaPartitionSplitReader reader = createReader();
+        // Assign splits with records
+        assignSplits(reader, splitsByOwners.get(0));
+        // Run a fetch operation; it should not block
+        reader.fetch();
+        // Wake the reader up, then assign a new split. The pending wakeup must not make this
+        // assignment throw a WakeupException.
+        reader.wakeUp();
+        TopicPartition tp = new TopicPartition(TOPIC1, 0);
+        assignSplits(
+                reader,
+                Collections.singletonMap(
+                        KafkaPartitionSplit.toSplitId(tp),
+                        new KafkaPartitionSplit(tp, KafkaPartitionSplit.EARLIEST_OFFSET)));
+    }
+
     @Test
     public void testNumBytesInCounter() throws Exception {
         final OperatorMetricGroup operatorMetricGroup =