You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/03/15 09:03:46 UTC

[incubator-seatunnel] branch dev updated: [Improve][CDC] Optimize options & add docs for compatible_debezium_json (#4351)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 336f59049 [Improve][CDC] Optimize options & add docs for compatible_debezium_json (#4351)
336f59049 is described below

commit 336f5904985249f28abce13d89786ebff2a39f91
Author: hailin0 <wa...@apache.org>
AuthorDate: Wed Mar 15 17:03:37 2023 +0800

    [Improve][CDC] Optimize options & add docs for compatible_debezium_json (#4351)
    
    * Using enum define format options
    * Rename canal-json to canal_json
    * Add docs
---
 docs/en/connector-v2/formats/canal-json.md         | 12 ++--
 .../formats/cdc-compatible-debezium-json.md        | 67 ++++++++++++++++++++++
 docs/en/connector-v2/source/MySQL-CDC.md           |  5 ++
 docs/en/connector-v2/source/SqlServer-CDC.md       |  5 ++
 .../connectors/cdc/base/option/SourceOptions.java  |  1 +
 .../source/source/SqlServerIncrementalSource.java  |  9 +++
 .../connectors/seatunnel/kafka/config/Config.java  | 17 +-----
 .../seatunnel/kafka/config/MessageFormat.java      | 25 ++++++++
 .../serialize/DefaultSeaTunnelRowSerializer.java   | 45 ++++++++-------
 .../connectors/seatunnel/kafka/sink/KafkaSink.java |  7 +--
 .../seatunnel/kafka/sink/KafkaSinkFactory.java     | 10 +++-
 .../seatunnel/kafka/sink/KafkaSinkWriter.java      | 18 +++---
 .../seatunnel/kafka/source/KafkaSource.java        | 16 ++----
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     |  3 +-
 .../resources/kafkasource_canal_cdc_to_pgsql.conf  |  2 +-
 .../test/resources/kafkasource_canal_to_kafka.conf |  4 +-
 .../format/json/canal/CanalJsonFormatFactory.java  |  2 +-
 .../format/json/canal/CanalJsonFormatOptions.java  |  2 +-
 18 files changed, 180 insertions(+), 70 deletions(-)

diff --git a/docs/en/connector-v2/formats/canal-json.md b/docs/en/connector-v2/formats/canal-json.md
index c0eac3252..ca762316b 100644
--- a/docs/en/connector-v2/formats/canal-json.md
+++ b/docs/en/connector-v2/formats/canal-json.md
@@ -17,10 +17,10 @@ Seatunnel also supports to encode the INSERT/UPDATE/DELETE messages in Seatunnel
 
 |             option             | default | required |                                                                                                Description                                                                                                 |
 |--------------------------------|---------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| format                         | (none)  | yes      | Specify what format to use, here should be 'canal-json'.                                                                                                                                                   |
-| canal-json.ignore-parse-errors | false   | no       | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors.                                                                                                       |
-| canal-json.database.include    | (none)  | no       | An optional regular expression to only read the specific databases changelog rows by regular matching the "database" meta field in the Canal record. The pattern string is compatible with Java's Pattern. |
-| canal-json.table.include       | (none)  | no       | An optional regular expression to only read the specific tables changelog rows by regular matching the "table" meta field in the Canal record. The pattern string is compatible with Java's Pattern.       |
+| format                         | (none)  | yes      | Specify what format to use, here should be 'canal_json'.                                                                                                                                                   |
+| canal_json.ignore-parse-errors | false   | no       | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors.                                                                                                       |
+| canal_json.database.include    | (none)  | no       | An optional regular expression to only read the specific databases changelog rows by regular matching the "database" meta field in the Canal record. The pattern string is compatible with Java's Pattern. |
+| canal_json.table.include       | (none)  | no       | An optional regular expression to only read the specific tables changelog rows by regular matching the "table" meta field in the Canal record. The pattern string is compatible with Java's Pattern.       |
 
 # How to use Canal format
 
@@ -95,7 +95,7 @@ source {
            weight = "string"
       }
     },
-    format = canal-json
+    format = canal_json
   }
 
 }
@@ -107,7 +107,7 @@ sink {
   Kafka {
     bootstrap.servers = "localhost:9092"
     topic = "consume-binlog"
-    format = canal-json
+    format = canal_json
   }
 }
 ```
diff --git a/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md b/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
new file mode 100644
index 000000000..8a433cd15
--- /dev/null
+++ b/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
@@ -0,0 +1,67 @@
+# CDC compatible debezium-json
+
+SeaTunnel supports interpreting CDC records as Debezium-JSON messages and publishing them to MQ (Kafka) systems.
+
+This feature is useful in many cases, such as maintaining compatibility with the Debezium ecosystem.
+
+# How to use
+
+## MySQL-CDC output to Kafka
+
+```bash
+env {
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 15000
+}
+
+source {
+  MySQL-CDC {
+    result_table_name = "table1"
+
+    hostname = localhost
+    base-url="jdbc:mysql://localhost:3306/test"
+    "startup.mode"=INITIAL
+    catalog {
+        factory=MySQL
+    }
+    table-names=[
+        "database1.t1",
+        "database1.t2",
+        "database2.t1"
+    ]
+
+    # compatible_debezium_json options
+    format = compatible_debezium_json
+    debezium = {
+        # include schema into kafka message
+        key.converter.schemas.enable = false
+        value.converter.schemas.enable = false
+        # include ddl
+        include.schema.changes = true
+        # topic prefix
+        database.server.name =  "mysql_cdc_1"
+    }
+    # compatible_debezium_json fixed schema
+    schema = {
+        fields = {
+            topic = string
+            key = string
+            value = string
+        }
+    }
+  }
+}
+
+sink {
+  Kafka {
+    source_table_name = "table1"
+
+    bootstrap.servers = "localhost:9092"
+
+    # compatible_debezium_json options
+    format = compatible_debezium_json
+  }
+}
+```
+
diff --git a/docs/en/connector-v2/source/MySQL-CDC.md b/docs/en/connector-v2/source/MySQL-CDC.md
index 926fdc43e..071bbbe0b 100644
--- a/docs/en/connector-v2/source/MySQL-CDC.md
+++ b/docs/en/connector-v2/source/MySQL-CDC.md
@@ -44,6 +44,7 @@ describes how to set up the MySQL CDC connector to run SQL queries against MySQL
 | chunk-key.even-distribution.factor.upper-bound | Double   | No       | 1000          |
 | chunk-key.even-distribution.factor.lower-bound | Double   | No       | 0.05          |
 | debezium.*                                     | config   | No       | -             |
+| format                                         | Enum     | No       | DEFAULT       |
 | common-options                                 |          | no       | -             |
 
 ### username [String]
@@ -156,6 +157,10 @@ Pass-through Debezium's properties to Debezium Embedded Engine which is used to
 See more about
 the [Debezium's MySQL Connector properties](https://debezium.io/documentation/reference/1.6/connectors/mysql.html#mysql-connector-properties)
 
+### format [Enum]
+
+Optional output format for MySQL CDC; valid enumerations are "DEFAULT" and "COMPATIBLE_DEBEZIUM_JSON".
+
 #### example
 
 ```conf
diff --git a/docs/en/connector-v2/source/SqlServer-CDC.md b/docs/en/connector-v2/source/SqlServer-CDC.md
index a07a23827..deb4f8574 100644
--- a/docs/en/connector-v2/source/SqlServer-CDC.md
+++ b/docs/en/connector-v2/source/SqlServer-CDC.md
@@ -44,6 +44,7 @@ describes how to setup the SqlServer CDC connector to run SQL queries against Sq
 | chunk-key.even-distribution.factor.upper-bound | Double   | No       | 1000          |
 | chunk-key.even-distribution.factor.lower-bound | Double   | No       | 0.05          |
 | debezium.*                                     | config   | No       | -             |
+| format                                         | Enum     | No       | DEFAULT       |
 | common-options                                 |          | no       | -             |
 
 ### hostname [String]
@@ -150,6 +151,10 @@ Pass-through Debezium's properties to Debezium Embedded Engine which is used to
 See more about
 the [Debezium's SqlServer Connector properties](https://debezium.io/documentation/reference/1.6/connectors/sqlserver.html#sqlserver-connector-properties)
 
+### format [Enum]
+
+Optional output format for SqlServer CDC; valid enumerations are "DEFAULT" and "COMPATIBLE_DEBEZIUM_JSON".
+
 #### example
 
 ```conf
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
index b7090e6fd..5a59b3734 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
@@ -121,6 +121,7 @@ public class SourceOptions {
 
     public static OptionRule.Builder getBaseRule() {
         return OptionRule.builder()
+                .optional(FORMAT)
                 .optional(SNAPSHOT_SPLIT_SIZE, SNAPSHOT_FETCH_SIZE)
                 .optional(INCREMENTAL_PARALLELISM)
                 .optional(STARTUP_MODE, STOP_MODE)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
index 14c4c21dd..43c8c06c0 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
@@ -28,6 +28,8 @@ import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
 import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
+import org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
 import org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema;
 import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfigFactory;
@@ -67,6 +69,13 @@ public class SqlServerIncrementalSource<T> extends IncrementalSource<T, JdbcSour
     @Override
     public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(
             ReadonlyConfig config) {
+        if (DeserializeFormat.COMPATIBLE_DEBEZIUM_JSON.equals(
+                config.get(JdbcSourceOptions.FORMAT))) {
+            return (DebeziumDeserializationSchema<T>)
+                    new DebeziumJsonDeserializeSchema(
+                            config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES));
+        }
+
         SqlServerSourceConfig sqlServerSourceConfig =
                 (SqlServerSourceConfig) this.configFactory.create(0);
         TableId tableId =
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 19a2a67c3..60ce08038 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;
 
 import java.util.List;
 import java.util.Map;
@@ -29,16 +28,6 @@ public class Config {
     public static final String CONNECTOR_IDENTITY = "Kafka";
     public static final String REPLICATION_FACTOR = "replication.factor";
 
-    /** The default data format is JSON */
-    public static final String DEFAULT_FORMAT = "json";
-
-    public static final String TEXT_FORMAT = "text";
-
-    public static final String CANAL_FORMAT = "canal-json";
-
-    public static final String COMPATIBLE_DEBEZIUM_JSON =
-            CompatibleDebeziumJsonSerializationSchema.IDENTIFIER;
-
     /** The default field delimiter is “,” */
     public static final String DEFAULT_FIELD_DELIMITER = ",";
 
@@ -102,10 +91,10 @@ public class Config {
                     .withDescription(
                             "The structure of the data, including field names and field types.");
 
-    public static final Option<String> FORMAT =
+    public static final Option<MessageFormat> FORMAT =
             Options.key("format")
-                    .stringType()
-                    .noDefaultValue()
+                    .enumType(MessageFormat.class)
+                    .defaultValue(MessageFormat.JSON)
                     .withDescription(
                             "Data format. The default format is json. Optional text format. The default field separator is \", \". "
                                     + "If you customize the delimiter, add the \"field_delimiter\" option.");
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
new file mode 100644
index 000000000..65b5cc276
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+
+public enum MessageFormat {
+    JSON,
+    TEXT,
+    CANAL_JSON,
+    COMPATIBLE_DEBEZIUM_JSON
+}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index ccf4879d4..06005de00 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -22,8 +22,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
 import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
+import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;
 import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 import org.apache.seatunnel.format.json.canal.CanalJsonSerializationSchema;
@@ -41,11 +42,6 @@ import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CANAL_FORMAT;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMPATIBLE_DEBEZIUM_JSON;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TEXT_FORMAT;
-
 @RequiredArgsConstructor
 public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
     private final Function<SeaTunnelRow, String> topicExtractor;
@@ -67,9 +63,9 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
     }
 
     public static DefaultSeaTunnelRowSerializer create(
-            String topic, SeaTunnelRowType rowType, String format, String delimiter) {
+            String topic, SeaTunnelRowType rowType, MessageFormat format, String delimiter) {
         return new DefaultSeaTunnelRowSerializer(
-                topicExtractor(topic, rowType),
+                topicExtractor(topic, rowType, format),
                 partitionExtractor(null),
                 timestampExtractor(),
                 keyExtractor(null, rowType, format, delimiter),
@@ -81,10 +77,10 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
             String topic,
             Integer partition,
             SeaTunnelRowType rowType,
-            String format,
+            MessageFormat format,
             String delimiter) {
         return new DefaultSeaTunnelRowSerializer(
-                topicExtractor(topic, rowType),
+                topicExtractor(topic, rowType, format),
                 partitionExtractor(partition),
                 timestampExtractor(),
                 keyExtractor(null, rowType, format, delimiter),
@@ -96,10 +92,10 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
             String topic,
             List<String> keyFields,
             SeaTunnelRowType rowType,
-            String format,
+            MessageFormat format,
             String delimiter) {
         return new DefaultSeaTunnelRowSerializer(
-                topicExtractor(topic, rowType),
+                topicExtractor(topic, rowType, format),
                 partitionExtractor(null),
                 timestampExtractor(),
                 keyExtractor(keyFields, rowType, format, delimiter),
@@ -120,7 +116,13 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
     }
 
     private static Function<SeaTunnelRow, String> topicExtractor(
-            String topic, SeaTunnelRowType rowType) {
+            String topic, SeaTunnelRowType rowType, MessageFormat format) {
+        if (MessageFormat.COMPATIBLE_DEBEZIUM_JSON.equals(format)) {
+            int topicFieldIndex =
+                    rowType.indexOf(CompatibleDebeziumJsonDeserializationSchema.FIELD_TOPIC);
+            return row -> row.getField(topicFieldIndex).toString();
+        }
+
         String regex = "\\$\\{(.*?)\\}";
         Pattern pattern = Pattern.compile(regex, Pattern.DOTALL);
         Matcher matcher = pattern.matcher(topic);
@@ -148,8 +150,11 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
     }
 
     private static Function<SeaTunnelRow, byte[]> keyExtractor(
-            List<String> keyFields, SeaTunnelRowType rowType, String format, String delimiter) {
-        if (Config.COMPATIBLE_DEBEZIUM_JSON.equals(format)) {
+            List<String> keyFields,
+            SeaTunnelRowType rowType,
+            MessageFormat format,
+            String delimiter) {
+        if (MessageFormat.COMPATIBLE_DEBEZIUM_JSON.equals(format)) {
             CompatibleDebeziumJsonSerializationSchema serializationSchema =
                     new CompatibleDebeziumJsonSerializationSchema(rowType, true);
             return row -> serializationSchema.serialize(row);
@@ -168,7 +173,7 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
     }
 
     private static Function<SeaTunnelRow, byte[]> valueExtractor(
-            SeaTunnelRowType rowType, String format, String delimiter) {
+            SeaTunnelRowType rowType, MessageFormat format, String delimiter) {
         SerializationSchema serializationSchema =
                 createSerializationSchema(rowType, format, delimiter, false);
         return row -> serializationSchema.serialize(row);
@@ -203,16 +208,16 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
     }
 
     private static SerializationSchema createSerializationSchema(
-            SeaTunnelRowType rowType, String format, String delimiter, boolean isKey) {
+            SeaTunnelRowType rowType, MessageFormat format, String delimiter, boolean isKey) {
         switch (format) {
-            case DEFAULT_FORMAT:
+            case JSON:
                 return new JsonSerializationSchema(rowType);
-            case TEXT_FORMAT:
+            case TEXT:
                 return TextSerializationSchema.builder()
                         .seaTunnelRowType(rowType)
                         .delimiter(delimiter)
                         .build();
-            case CANAL_FORMAT:
+            case CANAL_JSON:
                 return new CanalJsonSerializationSchema(rowType);
             case COMPATIBLE_DEBEZIUM_JSON:
                 return new CompatibleDebeziumJsonSerializationSchema(rowType, isKey);
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index 0e0ca3a4b..e09eb08e8 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -44,7 +44,6 @@ import java.util.List;
 import java.util.Optional;
 
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 
 /**
  * Kafka Sink implementation by using SeaTunnel sink API. This class contains the method to create
@@ -61,8 +60,7 @@ public class KafkaSink
     public KafkaSink() {}
 
     public KafkaSink(Config pluginConfig, SeaTunnelRowType rowType) {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(pluginConfig, TOPIC.key(), BOOTSTRAP_SERVERS.key());
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, BOOTSTRAP_SERVERS.key());
         if (!result.isSuccess()) {
             throw new KafkaConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -76,8 +74,7 @@ public class KafkaSink
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(pluginConfig, TOPIC.key(), BOOTSTRAP_SERVERS.key());
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, BOOTSTRAP_SERVERS.key());
         if (!result.isSuccess()) {
             throw new KafkaConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index 75db7ea3b..b0cadf736 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -25,9 +25,12 @@ import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
 
 import com.google.auto.service.AutoService;
 
+import java.util.Arrays;
+
 @AutoService(Factory.class)
 public class KafkaSinkFactory implements TableSinkFactory {
     @Override
@@ -38,7 +41,12 @@ public class KafkaSinkFactory implements TableSinkFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
+                .required(Config.FORMAT, Config.BOOTSTRAP_SERVERS)
+                .conditional(
+                        Config.FORMAT,
+                        Arrays.asList(
+                                MessageFormat.JSON, MessageFormat.CANAL_JSON, MessageFormat.TEXT),
+                        Config.TOPIC)
                 .optional(Config.KAFKA_CONFIG, Config.ASSIGN_PARTITIONS, Config.TRANSACTION_PREFIX)
                 .exclusive(Config.PARTITION, Config.PARTITION_KEY_FIELDS)
                 .build();
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 1cb0b2fd2..6ed287f2d 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -19,12 +19,14 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
 import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
 import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
@@ -44,7 +46,6 @@ import java.util.Random;
 
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
@@ -164,28 +165,29 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
 
     private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(
             Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
-        String format = DEFAULT_FORMAT;
-        if (pluginConfig.hasPath(FORMAT.key())) {
-            format = pluginConfig.getString(FORMAT.key());
-        }
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
+        MessageFormat messageFormat = readonlyConfig.get(FORMAT);
         String delimiter = DEFAULT_FIELD_DELIMITER;
         if (pluginConfig.hasPath(FIELD_DELIMITER.key())) {
             delimiter = pluginConfig.getString(FIELD_DELIMITER.key());
         }
-        String topic = pluginConfig.getString(TOPIC.key());
+        String topic = null;
+        if (pluginConfig.hasPath(TOPIC.key())) {
+            topic = pluginConfig.getString(TOPIC.key());
+        }
         if (pluginConfig.hasPath(PARTITION.key())) {
             return DefaultSeaTunnelRowSerializer.create(
                     topic,
                     pluginConfig.getInt(PARTITION.key()),
                     seaTunnelRowType,
-                    format,
+                    messageFormat,
                     delimiter);
         } else {
             return DefaultSeaTunnelRowSerializer.create(
                     topic,
                     getPartitionKeyFields(pluginConfig, seaTunnelRowType),
                     seaTunnelRowType,
-                    format,
+                    messageFormat,
                     delimiter);
         }
     }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index b6cfc95d6..e760dbe6e 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -39,6 +40,7 @@ import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
 import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
@@ -57,11 +59,9 @@ import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CANAL_FORMAT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
@@ -71,7 +71,6 @@ import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHE
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_OFFSETS;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_TIMESTAMP;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TEXT_FORMAT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 
 @AutoService(SeaTunnelSource.class)
@@ -227,15 +226,12 @@ public class KafkaSource
             Config schema = config.getConfig(SCHEMA.key());
             // todo: use KafkaDataTypeConvertor here?
             typeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
-            String format = DEFAULT_FORMAT;
-            if (config.hasPath(FORMAT.key())) {
-                format = config.getString(FORMAT.key());
-            }
+            MessageFormat format = ReadonlyConfig.fromConfig(config).get(FORMAT);
             switch (format) {
-                case DEFAULT_FORMAT:
+                case JSON:
                     deserializationSchema = new JsonDeserializationSchema(false, false, typeInfo);
                     break;
-                case TEXT_FORMAT:
+                case TEXT:
                     String delimiter = DEFAULT_FIELD_DELIMITER;
                     if (config.hasPath(FIELD_DELIMITER.key())) {
                         delimiter = config.getString(FIELD_DELIMITER.key());
@@ -246,7 +242,7 @@ public class KafkaSource
                                     .delimiter(delimiter)
                                     .build();
                     break;
-                case CANAL_FORMAT:
+                case CANAL_JSON:
                     deserializationSchema =
                             CanalJsonDeserializationSchema.builder(typeInfo)
                                     .setIgnoreParseErrors(true)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 3c73d51fc..f9fb72711 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
 import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
@@ -82,7 +83,7 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
 
     private static final String KAFKA_HOST = "kafkaCluster";
 
-    private static final String DEFAULT_FORMAT = "json";
+    private static final MessageFormat DEFAULT_FORMAT = MessageFormat.JSON;
 
     private static final String DEFAULT_FIELD_DELIMITER = ",";
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_cdc_to_pgsql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_cdc_to_pgsql.conf
index 57908ef4d..00bdcadbc 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_cdc_to_pgsql.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_cdc_to_pgsql.conf
@@ -44,7 +44,7 @@ source {
            weight = "string"
       }
     },
-    format = canal-json
+    format = canal_json
   }
 }
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
index dfb1a7066..2b8e219aa 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
@@ -36,7 +36,7 @@ source {
     topic = "test-cdc_mds"
     result_table_name = "kafka_name"
     start_mode = earliest
-    format = canal-json
+    format = canal_json
     schema = {
       fields {
            id = "int"
@@ -52,6 +52,6 @@ sink {
   Kafka {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test-canal-sink"
-    format = canal-json
+    format = canal_json
   }
 }
\ No newline at end of file
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatFactory.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatFactory.java
index 3a227cacf..096063ba0 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatFactory.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatFactory.java
@@ -36,7 +36,7 @@ import java.util.Map;
 public class CanalJsonFormatFactory
         implements DeserializationFormatFactory, SerializationFormatFactory {
 
-    public static final String IDENTIFIER = "canal-json";
+    public static final String IDENTIFIER = "canal_json";
 
     @Override
     public String factoryIdentifier() {
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatOptions.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatOptions.java
index df8951da6..3af85af1d 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatOptions.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatOptions.java
@@ -24,7 +24,7 @@ import org.apache.seatunnel.format.json.JsonFormatOptions;
 
 import java.util.Map;
 
-/** Option utils for canal-json format. */
+/** Option utils for canal_json format. */
 public class CanalJsonFormatOptions {
 
     public static final Option<Boolean> IGNORE_PARSE_ERRORS = JsonFormatOptions.IGNORE_PARSE_ERRORS;