You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/09/26 08:48:16 UTC

[incubator-seatunnel] branch dev updated: [Imporve][Connector-V2]Parameter verification for connector V2 kafka sink (#2866)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 254223fdb [Imporve][Connector-V2]Parameter verification for connector V2 kafka sink (#2866)
254223fdb is described below

commit 254223fdb92dd83d72ea6439457068ca38cb3d6d
Author: TaoZex <45...@users.noreply.github.com>
AuthorDate: Mon Sep 26 16:48:09 2022 +0800

    [Imporve][Connector-V2]Parameter verification for connector V2 kafka sink (#2866)
    
    * parameter verification
    
    * update
    
    * update
---
 .../seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java   | 10 ++++++++++
 .../connectors/seatunnel/kafka/sink/KafkaSinkWriter.java       |  3 ++-
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index 1c37dfd77..ef92d4152 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
+
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
@@ -26,6 +29,9 @@ import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
@@ -50,6 +56,10 @@ public class KafkaSink implements SeaTunnelSink<SeaTunnelRow, KafkaSinkState, Ka
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, TOPIC, BOOTSTRAP_SERVERS);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+        }
         this.pluginConfig = pluginConfig;
     }
 
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 3f71c3085..b577067ef 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
 
 import org.apache.seatunnel.api.sink.SinkWriter;
@@ -136,7 +137,7 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
 
     // todo: parse the target field from config
     private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
-        return new DefaultSeaTunnelRowSerializer(pluginConfig.getString("topics"), seaTunnelRowType);
+        return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC), seaTunnelRowType);
     }
 
     private KafkaSemantics getKafkaSemantics(Config pluginConfig) {