You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/03/04 16:10:46 UTC

[incubator-seatunnel] branch dev updated: [Improve][Connector-V2][Kafka] Set kafka consumer default group (#4271)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 82c784a3e [Improve][Connector-V2][Kafka] Set kafka consumer default group (#4271)
82c784a3e is described below

commit 82c784a3eff4a7bc1766ad243b107561ec47f27a
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Sun Mar 5 00:10:40 2023 +0800

    [Improve][Connector-V2][Kafka] Set kafka consumer default group (#4271)
---
 .../apache/seatunnel/connectors/seatunnel/kafka/config/Config.java  | 2 +-
 .../seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java    | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 376e4f26f..547700220 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -71,7 +71,7 @@ public class Config {
     public static final Option<String> CONSUMER_GROUP =
             Options.key("consumer.group")
                     .stringType()
-                    .noDefaultValue()
+                    .defaultValue("SeaTunnel-Consumer-Group")
                     .withDescription(
                             "Kafka consumer group id, used to distinguish different consumer groups.");
 
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 5aeea30bd..b6cfc95d6 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -79,8 +79,6 @@ public class KafkaSource
         implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, KafkaSourceState>,
                 SupportParallelism {
 
-    private static final String DEFAULT_CONSUMER_GROUP = "SeaTunnel-Consumer-Group";
-
     private final ConsumerMetadata metadata = new ConsumerMetadata();
     private DeserializationSchema<SeaTunnelRow> deserializationSchema;
     private SeaTunnelRowType typeInfo;
@@ -113,6 +111,8 @@ public class KafkaSource
         this.metadata.setTopic(config.getString(TOPIC.key()));
         if (config.hasPath(PATTERN.key())) {
             this.metadata.setPattern(config.getBoolean(PATTERN.key()));
+        } else {
+            this.metadata.setPattern(PATTERN.defaultValue());
         }
         this.metadata.setBootstrapServers(config.getString(BOOTSTRAP_SERVERS.key()));
         this.metadata.setProperties(new Properties());
@@ -120,7 +120,7 @@ public class KafkaSource
         if (config.hasPath(CONSUMER_GROUP.key())) {
             this.metadata.setConsumerGroup(config.getString(CONSUMER_GROUP.key()));
         } else {
-            this.metadata.setConsumerGroup(DEFAULT_CONSUMER_GROUP);
+            this.metadata.setConsumerGroup(CONSUMER_GROUP.defaultValue());
         }
 
         if (config.hasPath(COMMIT_ON_CHECKPOINT.key())) {