You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/03/15 09:03:46 UTC
[incubator-seatunnel] branch dev updated: [Improve][CDC] Optimize options & add docs for compatible_debezium_json (#4351)
This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 336f59049 [Improve][CDC] Optimize options & add docs for compatible_debezium_json (#4351)
336f59049 is described below
commit 336f5904985249f28abce13d89786ebff2a39f91
Author: hailin0 <wa...@apache.org>
AuthorDate: Wed Mar 15 17:03:37 2023 +0800
[Improve][CDC] Optimize options & add docs for compatible_debezium_json (#4351)
* Using enum define format options
* Rename canal-json to canal_json
* Add docs
---
docs/en/connector-v2/formats/canal-json.md | 12 ++--
.../formats/cdc-compatible-debezium-json.md | 67 ++++++++++++++++++++++
docs/en/connector-v2/source/MySQL-CDC.md | 5 ++
docs/en/connector-v2/source/SqlServer-CDC.md | 5 ++
.../connectors/cdc/base/option/SourceOptions.java | 1 +
.../source/source/SqlServerIncrementalSource.java | 9 +++
.../connectors/seatunnel/kafka/config/Config.java | 17 +-----
.../seatunnel/kafka/config/MessageFormat.java | 25 ++++++++
.../serialize/DefaultSeaTunnelRowSerializer.java | 45 ++++++++-------
.../connectors/seatunnel/kafka/sink/KafkaSink.java | 7 +--
.../seatunnel/kafka/sink/KafkaSinkFactory.java | 10 +++-
.../seatunnel/kafka/sink/KafkaSinkWriter.java | 18 +++---
.../seatunnel/kafka/source/KafkaSource.java | 16 ++----
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 3 +-
.../resources/kafkasource_canal_cdc_to_pgsql.conf | 2 +-
.../test/resources/kafkasource_canal_to_kafka.conf | 4 +-
.../format/json/canal/CanalJsonFormatFactory.java | 2 +-
.../format/json/canal/CanalJsonFormatOptions.java | 2 +-
18 files changed, 180 insertions(+), 70 deletions(-)
diff --git a/docs/en/connector-v2/formats/canal-json.md b/docs/en/connector-v2/formats/canal-json.md
index c0eac3252..ca762316b 100644
--- a/docs/en/connector-v2/formats/canal-json.md
+++ b/docs/en/connector-v2/formats/canal-json.md
@@ -17,10 +17,10 @@ Seatunnel also supports to encode the INSERT/UPDATE/DELETE messages in Seatunnel
| option | default | required | Description |
|--------------------------------|---------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| format | (none) | yes | Specify what format to use, here should be 'canal-json'. |
-| canal-json.ignore-parse-errors | false | no | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
-| canal-json.database.include | (none) | no | An optional regular expression to only read the specific databases changelog rows by regular matching the "database" meta field in the Canal record. The pattern string is compatible with Java's Pattern. |
-| canal-json.table.include | (none) | no | An optional regular expression to only read the specific tables changelog rows by regular matching the "table" meta field in the Canal record. The pattern string is compatible with Java's Pattern. |
+| format | (none) | yes | Specify what format to use, here should be 'canal_json'. |
+| canal_json.ignore-parse-errors | false | no | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
+| canal_json.database.include | (none) | no | An optional regular expression to only read the specific databases changelog rows by regular matching the "database" meta field in the Canal record. The pattern string is compatible with Java's Pattern. |
+| canal_json.table.include | (none) | no | An optional regular expression to only read the specific tables changelog rows by regular matching the "table" meta field in the Canal record. The pattern string is compatible with Java's Pattern. |
# How to use Canal format
@@ -95,7 +95,7 @@ source {
weight = "string"
}
},
- format = canal-json
+ format = canal_json
}
}
@@ -107,7 +107,7 @@ sink {
Kafka {
bootstrap.servers = "localhost:9092"
topic = "consume-binlog"
- format = canal-json
+ format = canal_json
}
}
```
diff --git a/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md b/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
new file mode 100644
index 000000000..8a433cd15
--- /dev/null
+++ b/docs/en/connector-v2/formats/cdc-compatible-debezium-json.md
@@ -0,0 +1,67 @@
+# CDC compatible debezium-json
+
+SeaTunnel supports interpreting CDC records as Debezium-JSON messages and publishing them to an MQ (Kafka) system.
+
+This feature is useful in many cases, for example when you need to stay compatible with the Debezium ecosystem.
+
+# How to use
+
+## MySQL-CDC output to Kafka
+
+```bash
+env {
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 15000
+}
+
+source {
+ MySQL-CDC {
+ result_table_name = "table1"
+
+ hostname = localhost
+ base-url="jdbc:mysql://localhost:3306/test"
+ "startup.mode"=INITIAL
+ catalog {
+ factory=MySQL
+ }
+ table-names=[
+ "database1.t1",
+ "database1.t2",
+ "database2.t1"
+ ]
+
+ # compatible_debezium_json options
+ format = compatible_debezium_json
+ debezium = {
+ # whether to include the schema in the kafka message (disabled here)
+ key.converter.schemas.enable = false
+ value.converter.schemas.enable = false
+ # include ddl
+ include.schema.changes = true
+ # topic prefix
+ database.server.name = "mysql_cdc_1"
+ }
+ # compatible_debezium_json fixed schema
+ schema = {
+ fields = {
+ topic = string
+ key = string
+ value = string
+ }
+ }
+ }
+}
+
+sink {
+ Kafka {
+ source_table_name = "table1"
+
+ bootstrap.servers = "localhost:9092"
+
+ # compatible_debezium_json options
+ format = compatible_debezium_json
+ }
+}
+```
+
diff --git a/docs/en/connector-v2/source/MySQL-CDC.md b/docs/en/connector-v2/source/MySQL-CDC.md
index 926fdc43e..071bbbe0b 100644
--- a/docs/en/connector-v2/source/MySQL-CDC.md
+++ b/docs/en/connector-v2/source/MySQL-CDC.md
@@ -44,6 +44,7 @@ describes how to set up the MySQL CDC connector to run SQL queries against MySQL
| chunk-key.even-distribution.factor.upper-bound | Double | No | 1000 |
| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 |
| debezium.* | config | No | - |
+| format | Enum | No | DEFAULT |
| common-options | | no | - |
### username [String]
@@ -156,6 +157,10 @@ Pass-through Debezium's properties to Debezium Embedded Engine which is used to
See more about
the [Debezium's MySQL Connector properties](https://debezium.io/documentation/reference/1.6/connectors/mysql.html#mysql-connector-properties)
+### format [Enum]
+
+Optional output format for MySQL CDC; valid enumerations are "DEFAULT" and "COMPATIBLE_DEBEZIUM_JSON".
+
#### example
```conf
diff --git a/docs/en/connector-v2/source/SqlServer-CDC.md b/docs/en/connector-v2/source/SqlServer-CDC.md
index a07a23827..deb4f8574 100644
--- a/docs/en/connector-v2/source/SqlServer-CDC.md
+++ b/docs/en/connector-v2/source/SqlServer-CDC.md
@@ -44,6 +44,7 @@ describes how to setup the SqlServer CDC connector to run SQL queries against Sq
| chunk-key.even-distribution.factor.upper-bound | Double | No | 1000 |
| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 |
| debezium.* | config | No | - |
+| format | Enum | No | DEFAULT |
| common-options | | no | - |
### hostname [String]
@@ -150,6 +151,10 @@ Pass-through Debezium's properties to Debezium Embedded Engine which is used to
See more about
the [Debezium's SqlServer Connector properties](https://debezium.io/documentation/reference/1.6/connectors/sqlserver.html#sqlserver-connector-properties)
+### format [Enum]
+
+Optional output format for SqlServer CDC; valid enumerations are "DEFAULT" and "COMPATIBLE_DEBEZIUM_JSON".
+
#### example
```conf
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
index b7090e6fd..5a59b3734 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
@@ -121,6 +121,7 @@ public class SourceOptions {
public static OptionRule.Builder getBaseRule() {
return OptionRule.builder()
+ .optional(FORMAT)
.optional(SNAPSHOT_SPLIT_SIZE, SNAPSHOT_FETCH_SIZE)
.optional(INCREMENTAL_PARALLELISM)
.optional(STARTUP_MODE, STOP_MODE)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
index 14c4c21dd..43c8c06c0 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
@@ -28,6 +28,8 @@ import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
+import org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
import org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfigFactory;
@@ -67,6 +69,13 @@ public class SqlServerIncrementalSource<T> extends IncrementalSource<T, JdbcSour
@Override
public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(
ReadonlyConfig config) {
+ if (DeserializeFormat.COMPATIBLE_DEBEZIUM_JSON.equals(
+ config.get(JdbcSourceOptions.FORMAT))) {
+ return (DebeziumDeserializationSchema<T>)
+ new DebeziumJsonDeserializeSchema(
+ config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES));
+ }
+
SqlServerSourceConfig sqlServerSourceConfig =
(SqlServerSourceConfig) this.configFactory.create(0);
TableId tableId =
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 19a2a67c3..60ce08038 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;
import java.util.List;
import java.util.Map;
@@ -29,16 +28,6 @@ public class Config {
public static final String CONNECTOR_IDENTITY = "Kafka";
public static final String REPLICATION_FACTOR = "replication.factor";
- /** The default data format is JSON */
- public static final String DEFAULT_FORMAT = "json";
-
- public static final String TEXT_FORMAT = "text";
-
- public static final String CANAL_FORMAT = "canal-json";
-
- public static final String COMPATIBLE_DEBEZIUM_JSON =
- CompatibleDebeziumJsonSerializationSchema.IDENTIFIER;
-
/** The default field delimiter is “,” */
public static final String DEFAULT_FIELD_DELIMITER = ",";
@@ -102,10 +91,10 @@ public class Config {
.withDescription(
"The structure of the data, including field names and field types.");
- public static final Option<String> FORMAT =
+ public static final Option<MessageFormat> FORMAT =
Options.key("format")
- .stringType()
- .noDefaultValue()
+ .enumType(MessageFormat.class)
+ .defaultValue(MessageFormat.JSON)
.withDescription(
"Data format. The default format is json. Optional text format. The default field separator is \", \". "
+ "If you customize the delimiter, add the \"field_delimiter\" option.");
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
new file mode 100644
index 000000000..65b5cc276
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+
+public enum MessageFormat {
+ JSON,
+ TEXT,
+ CANAL_JSON,
+ COMPATIBLE_DEBEZIUM_JSON
+}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index ccf4879d4..06005de00 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -22,8 +22,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
+import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonSerializationSchema;
@@ -41,11 +42,6 @@ import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CANAL_FORMAT;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMPATIBLE_DEBEZIUM_JSON;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TEXT_FORMAT;
-
@RequiredArgsConstructor
public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
private final Function<SeaTunnelRow, String> topicExtractor;
@@ -67,9 +63,9 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
}
public static DefaultSeaTunnelRowSerializer create(
- String topic, SeaTunnelRowType rowType, String format, String delimiter) {
+ String topic, SeaTunnelRowType rowType, MessageFormat format, String delimiter) {
return new DefaultSeaTunnelRowSerializer(
- topicExtractor(topic, rowType),
+ topicExtractor(topic, rowType, format),
partitionExtractor(null),
timestampExtractor(),
keyExtractor(null, rowType, format, delimiter),
@@ -81,10 +77,10 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
String topic,
Integer partition,
SeaTunnelRowType rowType,
- String format,
+ MessageFormat format,
String delimiter) {
return new DefaultSeaTunnelRowSerializer(
- topicExtractor(topic, rowType),
+ topicExtractor(topic, rowType, format),
partitionExtractor(partition),
timestampExtractor(),
keyExtractor(null, rowType, format, delimiter),
@@ -96,10 +92,10 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
String topic,
List<String> keyFields,
SeaTunnelRowType rowType,
- String format,
+ MessageFormat format,
String delimiter) {
return new DefaultSeaTunnelRowSerializer(
- topicExtractor(topic, rowType),
+ topicExtractor(topic, rowType, format),
partitionExtractor(null),
timestampExtractor(),
keyExtractor(keyFields, rowType, format, delimiter),
@@ -120,7 +116,13 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
}
private static Function<SeaTunnelRow, String> topicExtractor(
- String topic, SeaTunnelRowType rowType) {
+ String topic, SeaTunnelRowType rowType, MessageFormat format) {
+ if (MessageFormat.COMPATIBLE_DEBEZIUM_JSON.equals(format)) {
+ int topicFieldIndex =
+ rowType.indexOf(CompatibleDebeziumJsonDeserializationSchema.FIELD_TOPIC);
+ return row -> row.getField(topicFieldIndex).toString();
+ }
+
String regex = "\\$\\{(.*?)\\}";
Pattern pattern = Pattern.compile(regex, Pattern.DOTALL);
Matcher matcher = pattern.matcher(topic);
@@ -148,8 +150,11 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
}
private static Function<SeaTunnelRow, byte[]> keyExtractor(
- List<String> keyFields, SeaTunnelRowType rowType, String format, String delimiter) {
- if (Config.COMPATIBLE_DEBEZIUM_JSON.equals(format)) {
+ List<String> keyFields,
+ SeaTunnelRowType rowType,
+ MessageFormat format,
+ String delimiter) {
+ if (MessageFormat.COMPATIBLE_DEBEZIUM_JSON.equals(format)) {
CompatibleDebeziumJsonSerializationSchema serializationSchema =
new CompatibleDebeziumJsonSerializationSchema(rowType, true);
return row -> serializationSchema.serialize(row);
@@ -168,7 +173,7 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
}
private static Function<SeaTunnelRow, byte[]> valueExtractor(
- SeaTunnelRowType rowType, String format, String delimiter) {
+ SeaTunnelRowType rowType, MessageFormat format, String delimiter) {
SerializationSchema serializationSchema =
createSerializationSchema(rowType, format, delimiter, false);
return row -> serializationSchema.serialize(row);
@@ -203,16 +208,16 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
}
private static SerializationSchema createSerializationSchema(
- SeaTunnelRowType rowType, String format, String delimiter, boolean isKey) {
+ SeaTunnelRowType rowType, MessageFormat format, String delimiter, boolean isKey) {
switch (format) {
- case DEFAULT_FORMAT:
+ case JSON:
return new JsonSerializationSchema(rowType);
- case TEXT_FORMAT:
+ case TEXT:
return TextSerializationSchema.builder()
.seaTunnelRowType(rowType)
.delimiter(delimiter)
.build();
- case CANAL_FORMAT:
+ case CANAL_JSON:
return new CanalJsonSerializationSchema(rowType);
case COMPATIBLE_DEBEZIUM_JSON:
return new CompatibleDebeziumJsonSerializationSchema(rowType, isKey);
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index 0e0ca3a4b..e09eb08e8 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -44,7 +44,6 @@ import java.util.List;
import java.util.Optional;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
/**
* Kafka Sink implementation by using SeaTunnel sink API. This class contains the method to create
@@ -61,8 +60,7 @@ public class KafkaSink
public KafkaSink() {}
public KafkaSink(Config pluginConfig, SeaTunnelRowType rowType) {
- CheckResult result =
- CheckConfigUtil.checkAllExists(pluginConfig, TOPIC.key(), BOOTSTRAP_SERVERS.key());
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, BOOTSTRAP_SERVERS.key());
if (!result.isSuccess()) {
throw new KafkaConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -76,8 +74,7 @@ public class KafkaSink
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(pluginConfig, TOPIC.key(), BOOTSTRAP_SERVERS.key());
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, BOOTSTRAP_SERVERS.key());
if (!result.isSuccess()) {
throw new KafkaConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index 75db7ea3b..b0cadf736 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -25,9 +25,12 @@ import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import com.google.auto.service.AutoService;
+import java.util.Arrays;
+
@AutoService(Factory.class)
public class KafkaSinkFactory implements TableSinkFactory {
@Override
@@ -38,7 +41,12 @@ public class KafkaSinkFactory implements TableSinkFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
+ .required(Config.FORMAT, Config.BOOTSTRAP_SERVERS)
+ .conditional(
+ Config.FORMAT,
+ Arrays.asList(
+ MessageFormat.JSON, MessageFormat.CANAL_JSON, MessageFormat.TEXT),
+ Config.TOPIC)
.optional(Config.KAFKA_CONFIG, Config.ASSIGN_PARTITIONS, Config.TRANSACTION_PREFIX)
.exclusive(Config.PARTITION, Config.PARTITION_KEY_FIELDS)
.build();
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 1cb0b2fd2..6ed287f2d 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -19,12 +19,14 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
@@ -44,7 +46,6 @@ import java.util.Random;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
@@ -164,28 +165,29 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(
Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
- String format = DEFAULT_FORMAT;
- if (pluginConfig.hasPath(FORMAT.key())) {
- format = pluginConfig.getString(FORMAT.key());
- }
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
+ MessageFormat messageFormat = readonlyConfig.get(FORMAT);
String delimiter = DEFAULT_FIELD_DELIMITER;
if (pluginConfig.hasPath(FIELD_DELIMITER.key())) {
delimiter = pluginConfig.getString(FIELD_DELIMITER.key());
}
- String topic = pluginConfig.getString(TOPIC.key());
+ String topic = null;
+ if (pluginConfig.hasPath(TOPIC.key())) {
+ topic = pluginConfig.getString(TOPIC.key());
+ }
if (pluginConfig.hasPath(PARTITION.key())) {
return DefaultSeaTunnelRowSerializer.create(
topic,
pluginConfig.getInt(PARTITION.key()),
seaTunnelRowType,
- format,
+ messageFormat,
delimiter);
} else {
return DefaultSeaTunnelRowSerializer.create(
topic,
getPartitionKeyFields(pluginConfig, seaTunnelRowType),
seaTunnelRowType,
- format,
+ messageFormat,
delimiter);
}
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index b6cfc95d6..e760dbe6e 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -39,6 +40,7 @@ import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
@@ -57,11 +59,9 @@ import java.util.Map;
import java.util.Properties;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CANAL_FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
@@ -71,7 +71,6 @@ import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHE
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_OFFSETS;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_TIMESTAMP;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TEXT_FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
@AutoService(SeaTunnelSource.class)
@@ -227,15 +226,12 @@ public class KafkaSource
Config schema = config.getConfig(SCHEMA.key());
// todo: use KafkaDataTypeConvertor here?
typeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
- String format = DEFAULT_FORMAT;
- if (config.hasPath(FORMAT.key())) {
- format = config.getString(FORMAT.key());
- }
+ MessageFormat format = ReadonlyConfig.fromConfig(config).get(FORMAT);
switch (format) {
- case DEFAULT_FORMAT:
+ case JSON:
deserializationSchema = new JsonDeserializationSchema(false, false, typeInfo);
break;
- case TEXT_FORMAT:
+ case TEXT:
String delimiter = DEFAULT_FIELD_DELIMITER;
if (config.hasPath(FIELD_DELIMITER.key())) {
delimiter = config.getString(FIELD_DELIMITER.key());
@@ -246,7 +242,7 @@ public class KafkaSource
.delimiter(delimiter)
.build();
break;
- case CANAL_FORMAT:
+ case CANAL_JSON:
deserializationSchema =
CanalJsonDeserializationSchema.builder(typeInfo)
.setIgnoreParseErrors(true)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 3c73d51fc..f9fb72711 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
@@ -82,7 +83,7 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
private static final String KAFKA_HOST = "kafkaCluster";
- private static final String DEFAULT_FORMAT = "json";
+ private static final MessageFormat DEFAULT_FORMAT = MessageFormat.JSON;
private static final String DEFAULT_FIELD_DELIMITER = ",";
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_cdc_to_pgsql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_cdc_to_pgsql.conf
index 57908ef4d..00bdcadbc 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_cdc_to_pgsql.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_cdc_to_pgsql.conf
@@ -44,7 +44,7 @@ source {
weight = "string"
}
},
- format = canal-json
+ format = canal_json
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
index dfb1a7066..2b8e219aa 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
@@ -36,7 +36,7 @@ source {
topic = "test-cdc_mds"
result_table_name = "kafka_name"
start_mode = earliest
- format = canal-json
+ format = canal_json
schema = {
fields {
id = "int"
@@ -52,6 +52,6 @@ sink {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "test-canal-sink"
- format = canal-json
+ format = canal_json
}
}
\ No newline at end of file
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatFactory.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatFactory.java
index 3a227cacf..096063ba0 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatFactory.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatFactory.java
@@ -36,7 +36,7 @@ import java.util.Map;
public class CanalJsonFormatFactory
implements DeserializationFormatFactory, SerializationFormatFactory {
- public static final String IDENTIFIER = "canal-json";
+ public static final String IDENTIFIER = "canal_json";
@Override
public String factoryIdentifier() {
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatOptions.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatOptions.java
index df8951da6..3af85af1d 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatOptions.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonFormatOptions.java
@@ -24,7 +24,7 @@ import org.apache.seatunnel.format.json.JsonFormatOptions;
import java.util.Map;
-/** Option utils for canal-json format. */
+/** Option utils for canal_json format. */
public class CanalJsonFormatOptions {
public static final Option<Boolean> IGNORE_PARSE_ERRORS = JsonFormatOptions.IGNORE_PARSE_ERRORS;