You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/09 08:36:41 UTC

[incubator-seatunnel] branch api-draft updated: Fix the kafka consumer still getting consumption data when the client is closed (#2000)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 80794cf3 Fix the kafka consumer still getting consumption data when the client is closed (#2000)
80794cf3 is described below

commit 80794cf34ee826f1964df0fff604f63913c8ba3d
Author: Kerwin <37...@users.noreply.github.com>
AuthorDate: Thu Jun 9 16:36:36 2022 +0800

    Fix the kafka consumer still getting consumption data when the client is closed (#2000)
---
 .../connectors/seatunnel/kafka/source/KafkaSourceReader.java         | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 797316e2..5536d1dc 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -60,6 +60,7 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
     private final Map<TopicPartition, Long> endOffset;
     // TODO support user custom type
     private SeaTunnelRowTypeInfo typeInfo;
+    private volatile boolean isRunning;
 
     KafkaSourceReader(ConsumerMetadata metadata, SeaTunnelRowTypeInfo typeInfo,
                       SourceReader.Context context) {
@@ -74,10 +75,12 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
     public void open() {
         this.consumer = initConsumer(this.metadata.getBootstrapServer(), this.metadata.getConsumerGroup(),
                 this.metadata.getProperties(), !this.metadata.isCommitOnCheckpoint());
+        isRunning = true;
     }
 
     @Override
     public void close() throws IOException {
+        isRunning = false;
         if (consumer != null) {
             consumer.close();
         }
@@ -93,7 +96,7 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
         StringDeserializer stringDeserializer = new StringDeserializer();
         stringDeserializer.configure(Maps.fromProperties(this.metadata.getProperties()), false);
         consumer.assign(partitions);
-        while (true) {
+        while (isRunning) {
             ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
             for (TopicPartition partition : partitions) {
                 for (ConsumerRecord<byte[], byte[]> record : records.records(partition)) {