You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/03/24 08:37:47 UTC
[incubator-seatunnel] branch dev updated: [Bug][Connector-v2][KafkaSink]Fix the permission problem caused by client.id. (#4246)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3cdb7cfa4 [Bug][Connector-v2][KafkaSink]Fix the permission problem caused by client.id. (#4246)
3cdb7cfa4 is described below
commit 3cdb7cfa4d045c6edb0eb43772613836c09e3fed
Author: lightzhao <40...@users.noreply.github.com>
AuthorDate: Fri Mar 24 16:37:39 2023 +0800
[Bug][Connector-v2][KafkaSink]Fix the permission problem caused by client.id. (#4246)
---
.../seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java | 3 ---
1 file changed, 3 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
index 7afa59e3f..4d62c00c6 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -83,8 +82,6 @@ public class KafkaSinkCommitter implements SinkCommitter<KafkaCommitInfo> {
this.kafkaProducer.setTransactionalId(commitInfo.getTransactionId());
} else {
Properties kafkaProperties = commitInfo.getKafkaProperties();
- kafkaProperties.setProperty(
- ConsumerConfig.CLIENT_ID_CONFIG, "sink-committer-" + this.hashCode());
kafkaProperties.setProperty(
ProducerConfig.TRANSACTIONAL_ID_CONFIG, commitInfo.getTransactionId());
kafkaProducer =