You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/03/24 08:37:47 UTC

[incubator-seatunnel] branch dev updated: [Bug][Connector-v2][KafkaSink] Fix the permission problem caused by client.id. (#4246)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3cdb7cfa4 [Bug][Connector-v2][KafkaSink] Fix the permission problem caused by client.id. (#4246)
3cdb7cfa4 is described below

commit 3cdb7cfa4d045c6edb0eb43772613836c09e3fed
Author: lightzhao <40...@users.noreply.github.com>
AuthorDate: Fri Mar 24 16:37:39 2023 +0800

    [Bug][Connector-v2][KafkaSink] Fix the permission problem caused by client.id. (#4246)
---
 .../seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java  | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
index 7afa59e3f..4d62c00c6 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 
@@ -83,8 +82,6 @@ public class KafkaSinkCommitter implements SinkCommitter<KafkaCommitInfo> {
             this.kafkaProducer.setTransactionalId(commitInfo.getTransactionId());
         } else {
             Properties kafkaProperties = commitInfo.getKafkaProperties();
-            kafkaProperties.setProperty(
-                    ConsumerConfig.CLIENT_ID_CONFIG, "sink-committer-" + this.hashCode());
             kafkaProperties.setProperty(
                     ProducerConfig.TRANSACTIONAL_ID_CONFIG, commitInfo.getTransactionId());
             kafkaProducer =