You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by le...@apache.org on 2022/06/16 08:29:48 UTC
[flink] branch release-1.15 updated: [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment
This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new a5c0c8776cb [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment
a5c0c8776cb is described below
commit a5c0c8776cb06507f12a00c93bd92a7ac6d18375
Author: Qingsheng Ren <re...@gmail.com>
AuthorDate: Thu Jun 16 16:29:40 2022 +0800
[FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment
This closes #19981.
---
.../source/reader/KafkaPartitionSplitReader.java | 42 ++++++++++++++++++++--
.../reader/KafkaPartitionSplitReaderTest.java | 18 ++++++++++
2 files changed, 57 insertions(+), 3 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index d93e3e46a26..c1f85be1859 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -56,6 +56,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
/** A {@link SplitReader} implementation that reads records from Kafka partitions. */
@@ -302,7 +303,9 @@ public class KafkaPartitionSplitReader
Map<TopicPartition, Long> endOffset = consumer.endOffsets(partitionsStoppingAtLatest);
stoppingOffsets.putAll(endOffset);
if (!partitionsStoppingAtCommitted.isEmpty()) {
- consumer.committed(partitionsStoppingAtCommitted)
+ retryOnWakeup(
+ () -> consumer.committed(partitionsStoppingAtCommitted),
+ "getting committed offset as stopping offsets")
.forEach(
(tp, offsetAndMetadata) -> {
Preconditions.checkNotNull(
@@ -320,7 +323,10 @@ public class KafkaPartitionSplitReader
List<TopicPartition> emptyPartitions = new ArrayList<>();
// If none of the partitions have any records,
for (TopicPartition tp : consumer.assignment()) {
- if (consumer.position(tp) >= getStoppingOffset(tp)) {
+ if (retryOnWakeup(
+ () -> consumer.position(tp),
+ "getting starting offset to check if split is empty")
+ >= getStoppingOffset(tp)) {
emptyPartitions.add(tp);
}
}
@@ -343,7 +349,10 @@ public class KafkaPartitionSplitReader
if (LOG.isDebugEnabled()) {
StringJoiner splitsInfo = new StringJoiner(",");
for (KafkaPartitionSplit split : splitsChange.splits()) {
- long startingOffset = consumer.position(split.getTopicPartition());
+ long startingOffset =
+ retryOnWakeup(
+ () -> consumer.position(split.getTopicPartition()),
+ "logging starting position");
long stoppingOffset = getStoppingOffset(split.getTopicPartition());
splitsInfo.add(
String.format(
@@ -398,6 +407,33 @@ public class KafkaPartitionSplitReader
}
}
+ /**
+ * Runs a Kafka consumer call and retries it once if it throws {@link WakeupException}.
+ * <p>This helper function handles a race condition as below:
+ * <ol>
+ * <li>Fetcher thread finishes a {@link KafkaConsumer#poll(Duration)} call
+ * <li>Task thread assigns new splits so invokes {@link #wakeUp()}, then the wakeup is
+ * recorded and held by the consumer
+ * <li>Later fetcher thread invokes {@link #handleSplitsChanges(SplitsChange)}, and
+ * interactions with consumer will throw {@link WakeupException} because of the previously
+ * held wakeup in the consumer
+ * </ol>
+ * @param consumerCall the consumer invocation to run; it is invoked a second time on wakeup
+ * @param description what the call is for; only used in the INFO log written before retrying
+ * @return the value of {@code consumerCall}; an exception from the retry is not caught here
+ */
+ private <V> V retryOnWakeup(Supplier<V> consumerCall, String description) {
+ try {
+ return consumerCall.get();
+ } catch (WakeupException we) {
+ LOG.info(
+ "Caught WakeupException while executing Kafka consumer call for {}. Will retry the consumer call.",
+ description);
+ return consumerCall.get();
+ }
+ }
+
// ---------------- private helper class ------------------------
private static class KafkaPartitionSplitRecords
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index a34ed517e63..4a8482cab4f 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -140,6 +140,24 @@ public class KafkaPartitionSplitReaderTest {
assertNull(error.get());
}
+ @Test
+ public void testWakeupThenAssign() throws IOException {
+ KafkaPartitionSplitReader reader = createReader();
+ // Assign splits with records
+ assignSplits(reader, splitsByOwners.get(0));
+ // Run a fetch operation, and it should not block
+ reader.fetch();
+ // Wake the reader up, then assign a new split. There is no explicit assertion: the test
+ // passes only if this assignment completes without throwing WakeupException.
+ reader.wakeUp();
+ TopicPartition tp = new TopicPartition(TOPIC1, 0);
+ assignSplits(
+ reader,
+ Collections.singletonMap(
+ KafkaPartitionSplit.toSplitId(tp),
+ new KafkaPartitionSplit(tp, KafkaPartitionSplit.EARLIEST_OFFSET)));
+ }
+
@Test
public void testNumBytesInCounter() throws Exception {
final OperatorMetricGroup operatorMetricGroup =