You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/09 08:36:41 UTC
[incubator-seatunnel] branch api-draft updated: Fix the kafka consumer still getting consumption data when the client is closed (#2000)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 80794cf3 Fix the kafka consumer still getting consumption data when the client is closed (#2000)
80794cf3 is described below
commit 80794cf34ee826f1964df0fff604f63913c8ba3d
Author: Kerwin <37...@users.noreply.github.com>
AuthorDate: Thu Jun 9 16:36:36 2022 +0800
Fix the kafka consumer still getting consumption data when the client is closed (#2000)
---
.../connectors/seatunnel/kafka/source/KafkaSourceReader.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 797316e2..5536d1dc 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -60,6 +60,7 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
private final Map<TopicPartition, Long> endOffset;
// TODO support user custom type
private SeaTunnelRowTypeInfo typeInfo;
+ private volatile boolean isRunning;
KafkaSourceReader(ConsumerMetadata metadata, SeaTunnelRowTypeInfo typeInfo,
SourceReader.Context context) {
@@ -74,10 +75,12 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
public void open() {
this.consumer = initConsumer(this.metadata.getBootstrapServer(), this.metadata.getConsumerGroup(),
this.metadata.getProperties(), !this.metadata.isCommitOnCheckpoint());
+ isRunning = true;
}
@Override
public void close() throws IOException {
+ isRunning = false;
if (consumer != null) {
consumer.close();
}
@@ -93,7 +96,7 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
StringDeserializer stringDeserializer = new StringDeserializer();
stringDeserializer.configure(Maps.fromProperties(this.metadata.getProperties()), false);
consumer.assign(partitions);
- while (true) {
+ while (isRunning) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
for (TopicPartition partition : partitions) {
for (ConsumerRecord<byte[], byte[]> record : records.records(partition)) {