You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ba...@apache.org on 2024/04/16 00:54:18 UTC
(pulsar) branch branch-3.2 updated: [fix][io] Kafka Source connector maybe stuck (#22511)
This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new df6a18241eb [fix][io] Kafka Source connector maybe stuck (#22511)
df6a18241eb is described below
commit df6a18241eb9f1fe72c5a871057d6176f91e490a
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Tue Apr 16 08:04:11 2024 +0800
[fix][io] Kafka Source connector maybe stuck (#22511)
(cherry picked from commit bbff29d8ecc2f6c7ec91e0a48085fe14c8ffd6b8)
---
.../pulsar/io/kafka/KafkaAbstractSource.java | 28 ++++++-
.../io/kafka/source/KafkaAbstractSourceTest.java | 89 ++++++++++++++++++++++
2 files changed, 116 insertions(+), 1 deletion(-)
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index 782f9d5d57d..7eba7438b2b 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -27,6 +27,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -63,6 +64,7 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
private volatile boolean running = false;
private KafkaSourceConfig kafkaSourceConfig;
private Thread runnerThread;
+ private long maxPollIntervalMs;
@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
@@ -126,6 +128,13 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaSourceConfig.getAutoOffsetReset());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getKeyDeserializationClass());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getValueDeserializationClass());
+ if (props.containsKey(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) {
+ maxPollIntervalMs = Long.parseLong(props.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG).toString());
+ } else {
+ maxPollIntervalMs = Long.parseLong(
+ ConsumerConfig.configDef().defaultValues().get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)
+ .toString());
+ }
try {
consumer = new KafkaConsumer<>(beforeCreateConsumer(props));
} catch (Exception ex) {
@@ -175,7 +184,9 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
index++;
}
if (!kafkaSourceConfig.isAutoCommitEnabled()) {
- CompletableFuture.allOf(futures).get();
+ // Wait at most about 2/3 of maxPollIntervalMs for the futures to complete,
+ // so that the consumer is not kicked out of the consumer group for exceeding the max poll interval.
+ CompletableFuture.allOf(futures).get(maxPollIntervalMs * 2 / 3, TimeUnit.MILLISECONDS);
consumer.commitSync();
}
} catch (Exception e) {
@@ -253,6 +264,21 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
completableFuture.complete(null);
}
+ @Override
+ public void fail() {
+ completableFuture.completeExceptionally(
+ new RuntimeException(
+ String.format(
+ "Failed to process record with kafka topic: %s partition: %d offset: %d key: %s",
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ getKey()
+ )
+ )
+ );
+ }
+
@Override
public Schema<V> getSchema() {
return schema;
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 7675de0636e..6b4719709a1 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -21,12 +21,18 @@ package org.apache.pulsar.io.kafka.source;
import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.util.Collections;
+import java.util.Arrays;
import java.lang.reflect.Field;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.KafkaAbstractSource;
import org.apache.pulsar.io.kafka.KafkaSourceConfig;
@@ -46,6 +52,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
import static org.testng.Assert.fail;
@@ -218,6 +225,88 @@ public class KafkaAbstractSourceTest {
source.read();
}
+ @Test
+ public final void throwExceptionBySendFail() throws Exception {
+ KafkaAbstractSource source = new DummySource();
+
+ KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
+ kafkaSourceConfig.setTopic("test-topic");
+ kafkaSourceConfig.setAutoCommitEnabled(false);
+ Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig");
+ kafkaSourceConfigField.setAccessible(true);
+ kafkaSourceConfigField.set(source, kafkaSourceConfig);
+
+ Field defaultMaxPollIntervalMsField = KafkaAbstractSource.class.getDeclaredField("maxPollIntervalMs");
+ defaultMaxPollIntervalMsField.setAccessible(true);
+ defaultMaxPollIntervalMsField.set(source, 300000);
+
+ Consumer consumer = mock(Consumer.class);
+ ConsumerRecord<String, byte[]> consumerRecord = new ConsumerRecord<>("topic", 0, 0,
+ "t-key", "t-value".getBytes(StandardCharsets.UTF_8));
+ ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(Collections.singletonMap(
+ new TopicPartition("topic", 0),
+ Arrays.asList(consumerRecord)));
+ Mockito.doReturn(consumerRecords).when(consumer).poll(Mockito.any(Duration.class));
+
+ Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
+ consumerField.setAccessible(true);
+ consumerField.set(source, consumer);
+ source.start();
+
+ // Simulate a failure to send the message.
+ Record record = source.read();
+ record.fail();
+
+ // Reading again throws an ExecutionException whose cause is a RuntimeException.
+ try {
+ source.read();
+ fail("Should throw exception");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof RuntimeException);
+ assertTrue(e.getCause().getMessage().contains("Failed to process record with kafka topic"));
+ }
+ }
+
+ @Test
+ public final void throwExceptionBySendTimeOut() throws Exception {
+ KafkaAbstractSource source = new DummySource();
+
+ KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
+ kafkaSourceConfig.setTopic("test-topic");
+ kafkaSourceConfig.setAutoCommitEnabled(false);
+ Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig");
+ kafkaSourceConfigField.setAccessible(true);
+ kafkaSourceConfigField.set(source, kafkaSourceConfig);
+
+ Field defaultMaxPollIntervalMsField = KafkaAbstractSource.class.getDeclaredField("maxPollIntervalMs");
+ defaultMaxPollIntervalMsField.setAccessible(true);
+ defaultMaxPollIntervalMsField.set(source, 1);
+
+ Consumer consumer = mock(Consumer.class);
+ ConsumerRecord<String, byte[]> consumerRecord = new ConsumerRecord<>("topic", 0, 0,
+ "t-key", "t-value".getBytes(StandardCharsets.UTF_8));
+ ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(Collections.singletonMap(
+ new TopicPartition("topic", 0),
+ Arrays.asList(consumerRecord)));
+ Mockito.doReturn(consumerRecords).when(consumer).poll(Mockito.any(Duration.class));
+
+ Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
+ consumerField.setAccessible(true);
+ consumerField.set(source, consumer);
+ source.start();
+
+ // Simulate a send that never completes: read the record but neither ack nor fail it.
+ source.read();
+
+ // Reading again throws a TimeoutException.
+ try {
+ source.read();
+ fail("Should throw exception");
+ } catch (Exception e) {
+ assertTrue(e instanceof TimeoutException);
+ }
+ }
+
private File getFile(String name) {
ClassLoader classLoader = getClass().getClassLoader();
return new File(classLoader.getResource(name).getFile());