You are viewing a plain text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by le...@apache.org on 2022/06/15 12:36:14 UTC
[flink] branch master updated: [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment
This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 50c19d9f534 [FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment
50c19d9f534 is described below
commit 50c19d9f534f86e228f3e0937d92baf766a57165
Author: Qingsheng Ren <re...@gmail.com>
AuthorDate: Wed Jun 15 20:36:06 2022 +0800
[FLINK-27762][connector/kafka] Catch WakeupException and retry KafkaConsumer invocations in split assignment
This closes #19828.
---
.../source/reader/KafkaPartitionSplitReader.java | 42 ++++++++++++++++++++--
.../reader/KafkaPartitionSplitReaderTest.java | 18 ++++++++++
2 files changed, 57 insertions(+), 3 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index 90554177224..0078d78ca65 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -56,6 +56,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
/** A {@link SplitReader} implementation that reads records from Kafka partitions. */
@@ -313,7 +314,9 @@ public class KafkaPartitionSplitReader
Map<TopicPartition, Long> endOffset = consumer.endOffsets(partitionsStoppingAtLatest);
stoppingOffsets.putAll(endOffset);
if (!partitionsStoppingAtCommitted.isEmpty()) {
- consumer.committed(partitionsStoppingAtCommitted)
+ retryOnWakeup(
+ () -> consumer.committed(partitionsStoppingAtCommitted),
+ "getting committed offset as stopping offsets")
.forEach(
(tp, offsetAndMetadata) -> {
Preconditions.checkNotNull(
@@ -331,7 +334,10 @@ public class KafkaPartitionSplitReader
List<TopicPartition> emptyPartitions = new ArrayList<>();
// If none of the partitions have any records,
for (TopicPartition tp : consumer.assignment()) {
- if (consumer.position(tp) >= getStoppingOffset(tp)) {
+ if (retryOnWakeup(
+ () -> consumer.position(tp),
+ "getting starting offset to check if split is empty")
+ >= getStoppingOffset(tp)) {
emptyPartitions.add(tp);
}
}
@@ -354,7 +360,10 @@ public class KafkaPartitionSplitReader
if (LOG.isDebugEnabled()) {
StringJoiner splitsInfo = new StringJoiner(",");
for (KafkaPartitionSplit split : splitsChange.splits()) {
- long startingOffset = consumer.position(split.getTopicPartition());
+ long startingOffset =
+ retryOnWakeup(
+ () -> consumer.position(split.getTopicPartition()),
+ "logging starting position");
long stoppingOffset = getStoppingOffset(split.getTopicPartition());
splitsInfo.add(
String.format(
@@ -409,6 +418,33 @@ public class KafkaPartitionSplitReader
}
}
+ /**
+ * Invokes a Kafka consumer call and retries it once if it throws {@link WakeupException}.
+ *
+ * <p>This helper handles the following race condition:
+ *
+ * <ol>
+ * <li>Fetcher thread finishes a {@link KafkaConsumer#poll(Duration)} call
+ * <li>Task thread assigns new splits so invokes {@link #wakeUp()}, then the wakeup is
+ * recorded and held by the consumer
+ * <li>Later fetcher thread invokes {@link #handleSplitsChanges(SplitsChange)}, and
+ * interactions with consumer will throw {@link WakeupException} because of the previously
+ * held wakeup in the consumer
+ * </ol>
+ *
+ * <p>In this case we catch the {@link WakeupException} and retry the operation exactly once.
+ */
+ private <V> V retryOnWakeup(Supplier<V> consumerCall, String description) {
+ try {
+ return consumerCall.get();
+ } catch (WakeupException we) {
+ LOG.info(
+ "Caught WakeupException while executing Kafka consumer call for {}. Will retry the consumer call.",
+ description);
+ return consumerCall.get(); // Retried once; a repeated WakeupException propagates.
+ }
+ }
+
// ---------------- private helper class ------------------------
private static class KafkaPartitionSplitRecords
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index 6a9654adf52..c7df7945c44 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -132,6 +132,24 @@ public class KafkaPartitionSplitReaderTest {
assertThat(error.get()).isNull();
}
+ @Test
+ public void testWakeupThenAssign() throws IOException {
+ KafkaPartitionSplitReader reader = createReader();
+ // Assign splits with records
+ assignSplits(reader, splitsByOwners.get(0));
+ // Run a fetch operation, which should return without blocking
+ reader.fetch();
+ // Wake the reader up after the fetch has returned, then assign a new split. The pending
+ // wakeup must not surface as a WakeupException here; the test passes if nothing throws.
+ reader.wakeUp();
+ TopicPartition tp = new TopicPartition(TOPIC1, 0);
+ assignSplits(
+ reader,
+ Collections.singletonMap(
+ KafkaPartitionSplit.toSplitId(tp),
+ new KafkaPartitionSplit(tp, KafkaPartitionSplit.EARLIEST_OFFSET)));
+ }
+
@Test
public void testNumBytesInCounter() throws Exception {
final OperatorMetricGroup operatorMetricGroup =