You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/21 11:35:08 UTC

[incubator-inlong] branch master updated: [INLONG-3860][Sort] Add json(avro/debezium-json/canal-json) format for ExtractNode and LoadNode (#3865)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d89fd1f31 [INLONG-3860][Sort] Add json(avro/debezium-json/canal-json) format for ExtractNode and LoadNode (#3865)
d89fd1f31 is described below

commit d89fd1f31062d3113a14e1e940494b5a9d45b170
Author: pacino <ge...@gmail.com>
AuthorDate: Thu Apr 21 19:35:01 2022 +0800

    [INLONG-3860][Sort] Add json(avro/debezium-json/canal-json) format for ExtractNode and LoadNode (#3865)
---
 .../sort/protocol/node/format/AvroFormat.java      |  76 +++++++++++++
 .../sort/protocol/node/format/CanalJsonFormat.java | 102 +++++++++++++++++
 .../protocol/node/format/DebeziumJsonFormat.java   | 109 ++++++++++++++++++
 .../inlong/sort/protocol/node/format/Format.java   |  51 +++++++++
 .../sort/protocol/node/format/JsonFormat.java      | 122 +++++++++++++++++++++
 .../sort/protocol/node/load/KafkaLoadNode.java     |  39 ++++++-
 .../apache/inlong/sort/protocol/GroupInfoTest.java |  23 ++--
 .../inlong/sort/protocol/StreamInfoTest.java       |  22 ++--
 .../sort/protocol/node/KafkaLoadNodeTest.java      |  10 +-
 .../sort/protocol/node/format/AvroFormatTest.java  |  32 ++++++
 .../protocol/node/format/CanalJsonFormatTest.java  |  32 ++++++
 .../node/format/DebeziumJsonFormatTest.java        |  32 ++++++
 .../sort/protocol/node/format/FormatBaseTest.java  |  55 ++++++++++
 .../sort/protocol/node/format/JsonFormatTest.java  |  32 ++++++
 14 files changed, 713 insertions(+), 24 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/AvroFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/AvroFormat.java
new file mode 100644
index 000000000..3a21df8cd
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/AvroFormat.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+import lombok.Data;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The Avro format, registered with Jackson under the type name {@code avroFormat}.
+ * NOTE(review): {@code codec} is accepted and stored but never emitted by {@code generateOptions()} - confirm.
+ * @see <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/avro/">Avro
+ *         Format</a>
+ */
+@JsonTypeName("avroFormat")
+@Data
+public class AvroFormat implements Format {
+
+    private static final long serialVersionUID = 1L;
+
+    @JsonProperty(value = "codec")
+    private String codec;
+
+    @JsonCreator
+    public AvroFormat(@JsonProperty(value = "codec") String codec) {
+        this.codec = codec;
+    }
+
+    @JsonCreator
+    public AvroFormat() {
+    }
+
+    /**
+     * Returns the format identifier of this format, the constant {@code "avro"}.
+     *
+     * @return the string {@code "avro"}
+     */
+    @JsonIgnore
+    @Override
+    public String getFormat() {
+        return "avro";
+    }
+
+    /**
+     * Generate options for connector: a new map holding {@code key.format} and {@code value.format},
+     * both set to the value of {@link #getFormat()}.
+     * @return options
+     */
+    @Override
+    public Map<String, String> generateOptions() {
+        Map<String, String> options = new HashMap<>(4);
+        options.put("key.format", getFormat());
+        options.put("value.format", getFormat());
+        return options;
+    }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CanalJsonFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CanalJsonFormat.java
new file mode 100644
index 000000000..717ca5b05
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CanalJsonFormat.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+import lombok.Data;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The Canal JSON format, registered with Jackson under the type name {@code canalJsonFormat}.
+ * The no-arg constructor delegates to the full constructor with (true, "SQL", "DROP", "null", true).
+ * @see <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/canal/">
+ *         Canal Format</a>
+ */
+@JsonTypeName("canalJsonFormat")
+@Data
+public class CanalJsonFormat implements Format {
+
+    private static final long serialVersionUID = 1L;
+
+    @JsonProperty(value = "ignoreParseErrors", defaultValue = "true")
+    private Boolean ignoreParseErrors;
+    @JsonProperty(value = "timestampFormatStandard", defaultValue = "SQL")
+    private String timestampFormatStandard;
+    @JsonProperty(value = "mapNullKeyMode", defaultValue = "DROP")
+    private String mapNullKeyMode;
+    @JsonProperty(value = "mapNullKeyLiteral", defaultValue = "null")
+    private String mapNullKeyLiteral;
+    @JsonProperty(value = "encodeDecimalAsPlainNumber", defaultValue = "true")
+    private Boolean encodeDecimalAsPlainNumber;
+
+    @JsonCreator
+    public CanalJsonFormat(@JsonProperty(value = "ignoreParseErrors", defaultValue = "true") Boolean ignoreParseErrors,
+            @JsonProperty(value = "timestampFormatStandard", defaultValue = "SQL") String timestampFormatStandard,
+            @JsonProperty(value = "mapNullKeyMode", defaultValue = "DROP") String mapNullKeyMode,
+            @JsonProperty(value = "mapNullKeyLiteral", defaultValue = "null") String mapNullKeyLiteral,
+            @JsonProperty(value = "encodeDecimalAsPlainNumber", defaultValue = "true")
+                    Boolean encodeDecimalAsPlainNumber) {
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.timestampFormatStandard = timestampFormatStandard;
+        this.mapNullKeyMode = mapNullKeyMode;
+        this.mapNullKeyLiteral = mapNullKeyLiteral;
+        this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
+    }
+
+    @JsonCreator
+    public CanalJsonFormat() {
+        this(true, "SQL", "DROP", "null", true);
+    }
+
+    /**
+     * Returns the format identifier of this format, the constant {@code "canal-json"}.
+     *
+     * @return the string {@code "canal-json"}
+     */
+    @JsonIgnore
+    @Override
+    public String getFormat() {
+        return "canal-json";
+    }
+
+    /**
+     * Generate options for connector: {@code format} plus the {@code canal-json.*} settings. The two Boolean
+     * options are skipped when null; the three String options are always put, even when their value is null.
+     * @return options
+     */
+    @Override
+    public Map<String, String> generateOptions() {
+        Map<String, String> options = new HashMap<>(16);
+        options.put("format", getFormat());
+        if (this.ignoreParseErrors != null) {
+            options.put("canal-json.ignore-parse-errors", this.ignoreParseErrors.toString());
+        }
+        options.put("canal-json.timestamp-format.standard", this.timestampFormatStandard);
+        options.put("canal-json.map-null-key.mode", this.mapNullKeyMode);
+        options.put("canal-json.map-null-key.literal", this.mapNullKeyLiteral);
+        if (this.encodeDecimalAsPlainNumber != null) {
+            options.put("canal-json.encode.decimal-as-plain-number", this.encodeDecimalAsPlainNumber.toString());
+        }
+        return options;
+    }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/DebeziumJsonFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/DebeziumJsonFormat.java
new file mode 100644
index 000000000..445ff93c5
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/DebeziumJsonFormat.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+import lombok.Data;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The Debezium JSON format, registered with Jackson under the type name {@code debeziumJsonFormat}.
+ * The no-arg constructor delegates to the full constructor with (false, true, "SQL", "DROP", "null", true).
+ * @see <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/debezium">
+ *         Debezium Format</a>
+ */
+@JsonTypeName("debeziumJsonFormat")
+@Data
+public class DebeziumJsonFormat implements Format {
+
+    private static final long serialVersionUID = 1L;
+
+    @JsonProperty(value = "schemaInclude", defaultValue = "false")
+    private Boolean schemaInclude;
+    @JsonProperty(value = "ignoreParseErrors", defaultValue = "true")
+    private Boolean ignoreParseErrors;
+    @JsonProperty(value = "timestampFormatStandard", defaultValue = "SQL")
+    private String timestampFormatStandard;
+    @JsonProperty(value = "mapNullKeyMode", defaultValue = "DROP")
+    private String mapNullKeyMode;
+    @JsonProperty(value = "mapNullKeyLiteral", defaultValue = "null")
+    private String mapNullKeyLiteral;
+    @JsonProperty(value = "encodeDecimalAsPlainNumber", defaultValue = "true")
+    private Boolean encodeDecimalAsPlainNumber;
+
+    @JsonCreator
+    public DebeziumJsonFormat(@JsonProperty(value = "schemaInclude", defaultValue = "false") Boolean schemaInclude,
+            @JsonProperty(value = "ignoreParseErrors", defaultValue = "true") Boolean ignoreParseErrors,
+            @JsonProperty(value = "timestampFormatStandard", defaultValue = "SQL") String timestampFormatStandard,
+            @JsonProperty(value = "mapNullKeyMode", defaultValue = "DROP") String mapNullKeyMode,
+            @JsonProperty(value = "mapNullKeyLiteral", defaultValue = "null") String mapNullKeyLiteral,
+            @JsonProperty(value = "encodeDecimalAsPlainNumber", defaultValue = "true")
+                    Boolean encodeDecimalAsPlainNumber) {
+        this.schemaInclude = schemaInclude;
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.timestampFormatStandard = timestampFormatStandard;
+        this.mapNullKeyMode = mapNullKeyMode;
+        this.mapNullKeyLiteral = mapNullKeyLiteral;
+        this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
+    }
+
+    @JsonCreator
+    public DebeziumJsonFormat() {
+        this(false, true, "SQL", "DROP", "null", true);
+    }
+
+    /**
+     * Returns the format identifier of this format, the constant {@code "debezium-json"}.
+     *
+     * @return the string {@code "debezium-json"}
+     */
+    @JsonIgnore
+    @Override
+    public String getFormat() {
+        return "debezium-json";
+    }
+
+    /**
+     * Generate options for connector: {@code format} plus the {@code debezium-json.*} settings. The three Boolean
+     * options are skipped when null; the three String options are always put, even when their value is null.
+     * @return options
+     */
+    @Override
+    public Map<String, String> generateOptions() {
+        Map<String, String> options = new HashMap<>(16);
+        options.put("format", getFormat());
+        if (this.schemaInclude != null) {
+            options.put("debezium-json.schema-include", this.schemaInclude.toString());
+        }
+        if (this.ignoreParseErrors != null) {
+            options.put("debezium-json.ignore-parse-errors", this.ignoreParseErrors.toString());
+        }
+        options.put("debezium-json.timestamp-format.standard", this.timestampFormatStandard);
+        options.put("debezium-json.map-null-key.mode", this.mapNullKeyMode);
+        options.put("debezium-json.map-null-key.literal", this.mapNullKeyLiteral);
+        if (this.encodeDecimalAsPlainNumber != null) {
+            options.put("debezium-json.encode.decimal-as-plain-number", this.encodeDecimalAsPlainNumber.toString());
+        }
+        return options;
+    }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java
new file mode 100644
index 000000000..5d3c2efd1
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import java.io.Serializable;
+import java.util.Map;
+
+@JsonTypeInfo(
+        use = JsonTypeInfo.Id.NAME,
+        include = JsonTypeInfo.As.PROPERTY,
+        property = "type")
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = JsonFormat.class, name = "jsonFormat"),
+        @JsonSubTypes.Type(value = AvroFormat.class, name = "avroFormat"),
+        @JsonSubTypes.Type(value = DebeziumJsonFormat.class, name = "debeziumJsonFormat"),
+        @JsonSubTypes.Type(value = CanalJsonFormat.class, name = "canalJsonFormat")
+})
+public interface Format extends Serializable {
+
+    /**
+     * Returns the format identifier, for example json/avro/debezium-json/canal-json.
+     *
+     * @return the format identifier string
+     */
+    String getFormat();
+
+    /**
+     * Generates the connector options contributed by this format.
+     *
+     * @return option keys mapped to their string values
+     */
+    Map<String, String> generateOptions();
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/JsonFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/JsonFormat.java
new file mode 100644
index 000000000..3a183cce9
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/JsonFormat.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+import lombok.Data;
+import lombok.ToString;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The Json format, registered with Jackson under the type name {@code jsonFormat}.
+ * The no-arg constructor delegates to the full constructor with (false, true, "SQL", "DROP", "null", true).
+ * @see <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/json/">
+ *         Json Format</a>
+ */
+
+@JsonTypeName("jsonFormat")
+@Data
+@ToString
+public class JsonFormat implements Format {
+
+    private static final long serialVersionUID = 1L;
+
+    @JsonProperty(value = "failOnMissingField", defaultValue = "false")
+    private Boolean failOnMissingField;
+    @JsonProperty(value = "ignoreParseErrors", defaultValue = "true")
+    private Boolean ignoreParseErrors;
+    @JsonProperty(value = "timestampFormatStandard", defaultValue = "SQL")
+    private String timestampFormatStandard;
+    @JsonProperty(value = "mapNullKeyMode", defaultValue = "DROP")
+    private String mapNullKeyMode;
+    @JsonProperty(value = "mapNullKeyLiteral", defaultValue = "null")
+    private String mapNullKeyLiteral;
+    @JsonProperty(value = "encodeDecimalAsPlainNumber", defaultValue = "true")
+    private Boolean encodeDecimalAsPlainNumber;
+
+    @JsonCreator
+    public JsonFormat(@JsonProperty(value = "failOnMissingField", defaultValue = "false") Boolean failOnMissingField,
+            @JsonProperty(value = "ignoreParseErrors", defaultValue = "true") Boolean ignoreParseErrors,
+            @JsonProperty(value = "timestampFormatStandard", defaultValue = "SQL") String timestampFormatStandard,
+            @JsonProperty(value = "mapNullKeyMode", defaultValue = "DROP") String mapNullKeyMode,
+            @JsonProperty(value = "mapNullKeyLiteral", defaultValue = "null") String mapNullKeyLiteral,
+            @JsonProperty(value = "encodeDecimalAsPlainNumber", defaultValue = "true")
+                    Boolean encodeDecimalAsPlainNumber) {
+        this.failOnMissingField = failOnMissingField;
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.timestampFormatStandard = timestampFormatStandard;
+        this.mapNullKeyMode = mapNullKeyMode;
+        this.mapNullKeyLiteral = mapNullKeyLiteral;
+        this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
+    }
+
+    @JsonCreator
+    public JsonFormat() {
+        this(false, true, "SQL", "DROP", "null", true);
+    }
+
+    /**
+     * Returns the format identifier of this format, the constant {@code "json"}.
+     *
+     * @return the string {@code "json"}
+     */
+    @JsonIgnore
+    @Override
+    public String getFormat() {
+        return "json";
+    }
+
+    /**
+     * Generate options for connector: {@code key.format}/{@code value.format} plus mirrored {@code key.json.*} and
+     * {@code value.json.*} settings. Boolean options are skipped when null; String options are always put.
+     * @return options
+     */
+    @Override
+    public Map<String, String> generateOptions() {
+        Map<String, String> options = new HashMap<>(32);
+        options.put("key.format", getFormat());
+        options.put("value.format", getFormat());
+        if (this.failOnMissingField != null) {
+            String failOnMissingField = this.failOnMissingField.toString();
+            options.put("value.json.fail-on-missing-field", failOnMissingField);
+            options.put("key.json.fail-on-missing-field", failOnMissingField);
+        }
+        if (this.ignoreParseErrors != null) {
+            String ignoreParseErrors = this.ignoreParseErrors.toString();
+            options.put("value.json.ignore-parse-errors", ignoreParseErrors);
+            options.put("key.json.ignore-parse-errors", ignoreParseErrors);
+        }
+        options.put("value.json.timestamp-format.standard", this.timestampFormatStandard);
+        options.put("value.json.map-null-key.mode", this.mapNullKeyMode);
+        options.put("value.json.map-null-key.literal", this.mapNullKeyLiteral);
+        if (this.encodeDecimalAsPlainNumber != null) {
+            String encodeDecimalAsPlainNumber = this.encodeDecimalAsPlainNumber.toString();
+            options.put("value.json.encode.decimal-as-plain-number", encodeDecimalAsPlainNumber);
+            options.put("key.json.encode.decimal-as-plain-number", encodeDecimalAsPlainNumber);
+        }
+        options.put("key.json.timestamp-format.standard", this.timestampFormatStandard);
+        options.put("key.json.map-null-key.mode", this.mapNullKeyMode);
+        options.put("key.json.map-null-key.literal", this.mapNullKeyLiteral);
+        return options;
+    }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
index 500f82c31..3483ee68a 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
@@ -21,10 +21,16 @@ import com.google.common.base.Preconditions;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.format.AvroFormat;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
@@ -51,7 +57,10 @@ public class KafkaLoadNode extends LoadNode implements Serializable {
     private String bootstrapServers;
     @Nonnull
     @JsonProperty("format")
-    private String format;
+    private Format format;
+
+    @JsonProperty("primaryKey")
+    private String primaryKey;
 
     public KafkaLoadNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
@@ -60,13 +69,20 @@ public class KafkaLoadNode extends LoadNode implements Serializable {
             @JsonProperty("filters") List<FilterFunction> filters,
             @Nonnull @JsonProperty("topic") String topic,
             @Nonnull @JsonProperty("bootstrapServers") String bootstrapServers,
-            @Nonnull @JsonProperty("format") String format,
+            @Nonnull @JsonProperty("format") Format format,
             @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
-            @JsonProperty("properties") Map<String, String> properties) {
+            @JsonProperty("properties") Map<String, String> properties,
+            @JsonProperty("primaryKey") String primaryKey) {
         super(id, name, fields, fieldRelationShips, filters, sinkParallelism, properties);
         this.topic = Preconditions.checkNotNull(topic, "topic is null");
         this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrapServers is null");
         this.format = Preconditions.checkNotNull(format, "format is null");
+        if (format instanceof JsonFormat && StringUtils.isEmpty(primaryKey)) {
+            throw new IllegalArgumentException("primaryKey is empty when format is json");
+        } else if (format instanceof AvroFormat && StringUtils.isEmpty(primaryKey)) {
+            throw new IllegalArgumentException("primaryKey is empty when format is avro");
+        }
+        this.primaryKey = primaryKey;
     }
 
     @Override
@@ -74,16 +90,29 @@ public class KafkaLoadNode extends LoadNode implements Serializable {
         return "node_" + super.getId() + "_" + topic;
     }
 
+    /**
+     * Generate options for kafka connector: json/avro formats use {@code upsert-kafka}, canal-json/debezium-json
+     * use {@code kafka}, any other format throws IllegalArgumentException; the format's options are merged in last.
+     * @return options
+     */
     @Override
     public Map<String, String> tableOptions() {
         Map<String, String> options = super.tableOptions();
-        options.put("connector", "kafka");
         options.put("topic", topic);
         options.put("properties.bootstrap.servers", bootstrapServers);
-        options.put("format", format);
         if (getSinkParallelism() != null) {
             options.put("sink.parallelism", getSinkParallelism().toString());
         }
+        if (format instanceof JsonFormat || format instanceof AvroFormat) {
+            options.put("connector", "upsert-kafka");
+        } else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat) {
+            options.put("connector", "kafka");
+        } else {
+            throw new IllegalArgumentException("kafka load Node format is IllegalArgument");
+        }
+        if (format.generateOptions() != null && !format.generateOptions().isEmpty()) {
+            options.putAll(format.generateOptions());
+        }
         return options;
     }
 }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
index 1a6805209..a14f0f515 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
@@ -26,6 +26,7 @@ import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.transformation.ConstantParam;
 import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
@@ -39,6 +40,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
+
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -78,8 +80,8 @@ public class GroupInfoTest {
                                 new FieldInfo("ts", new TimestampFormatInfo()))
                 );
         return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
-                "topic", "localhost:9092", "json",
-                1, null);
+                "topic", "localhost:9092", new JsonFormat(),
+                1, null, "id");
     }
 
     private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
@@ -135,9 +137,12 @@ public class GroupInfoTest {
                         + "\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
                         + "\"format\":\"yyyy-MM-dd HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
                         + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
-                        + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":\"json\","
-                        + "\"sinkParallelism\":1}],\"relations\":[{\"type\":\"baseRelation\",\"inputs\":[\"1\"],"
-                        + "\"outputs\":[\"2\"]}]}]}";
+                        + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\","
+                        + "\"format\":{\"type\":\"jsonFormat\",\"failOnMissingField\":false,\"ignoreParseErrors\":true,"
+                        + "\"timestampFormatStandard\":\"SQL\",\"mapNullKeyMode\":\"DROP\","
+                        + "\"mapNullKeyLiteral\":\"null\",\"encodeDecimalAsPlainNumber\":true},"
+                        + "\"sinkParallelism\":1,\"primaryKey\":\"id\"}],\"relations\":[{\"type\":\"baseRelation\",\""
+                        + "inputs\":[\"1\"],\"outputs\":[\"2\"]}]}]}";
         assertEquals(expected, objectMapper.writeValueAsString(groupInfo));
     }
 
@@ -185,9 +190,11 @@ public class GroupInfoTest {
                 + "\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
                 + "\"format\":\"yyyy-MM-dd HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
                 + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
-                + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":\"json\","
-                + "\"sinkParallelism\":1}],\"relations\":[{\"type\":\"baseRelation\",\"inputs\":[\"1\"],"
-                + "\"outputs\":[\"2\"]}]}]}";
+                + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":{\"type\":\"jsonFormat\","
+                + "\"failOnMissingField\":false,\"ignoreParseErrors\":true,\"timestampFormatStandard\":\"SQL\","
+                + "\"mapNullKeyMode\":\"DROP\",\"mapNullKeyLiteral\":\"null\",\"encodeDecimalAsPlainNumber\":true},"
+                + "\"sinkParallelism\":1,\"primaryKey\":\"id\"}],\"relations\":[{\"type\":\"baseRelation\","
+                + "\"inputs\":[\"1\"],\"outputs\":[\"2\"]}]}]}";
         GroupInfo expected = objectMapper.readValue(groupInfoStr, GroupInfo.class);
         assertEquals(expected, groupInfo);
     }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
index 4c65eda6e..5c0f630c6 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
@@ -26,6 +26,7 @@ import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.transformation.ConstantParam;
 import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
@@ -39,6 +40,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
+
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -78,8 +80,8 @@ public class StreamInfoTest {
                                 new FieldInfo("ts", new TimestampFormatInfo()))
                 );
         return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
-                "topic", "localhost:9092", "json",
-                1, null);
+                "topic", "localhost:9092", new JsonFormat(),
+                1, null, "id");
     }
 
     private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
@@ -130,9 +132,11 @@ public class StreamInfoTest {
                 + "\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
                 + "\"format\":\"yyyy-MM-dd HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
                 + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
-                + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":\"json\","
-                + "\"sinkParallelism\":1}],\"relations\":[{\"type\":\"baseRelation\",\"inputs\":[\"1\"],"
-                + "\"outputs\":[\"2\"]}]}";
+                + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":{\"type\":\"jsonFormat\","
+                + "\"failOnMissingField\":false,\"ignoreParseErrors\":true,\"timestampFormatStandard\":\"SQL\","
+                + "\"mapNullKeyMode\":\"DROP\",\"mapNullKeyLiteral\":\"null\",\"encodeDecimalAsPlainNumber\":true},"
+                + "\"sinkParallelism\":1,\"primaryKey\":\"id\"}],\"relations\":[{\"type\":\"baseRelation\","
+                + "\"inputs\":[\"1\"],\"outputs\":[\"2\"]}]}";
         assertEquals(expected, objectMapper.writeValueAsString(streamInfo));
     }
 
@@ -178,9 +182,11 @@ public class StreamInfoTest {
                 + "\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
                 + "\"format\":\"yyyy-MM-dd HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
                 + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
-                + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":\"json\","
-                + "\"sinkParallelism\":1}],\"relations\":[{\"type\":\"baseRelation\",\"inputs\":[\"1\"],"
-                + "\"outputs\":[\"2\"]}]}";
+                + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":{\"type\":\"jsonFormat\","
+                + "\"failOnMissingField\":false,\"ignoreParseErrors\":true,\"timestampFormatStandard\":\"SQL\","
+                + "\"mapNullKeyMode\":\"DROP\",\"mapNullKeyLiteral\":\"null\",\"encodeDecimalAsPlainNumber\":true},"
+                + "\"sinkParallelism\":1,\"primaryKey\":\"id\"}],\"relations\":[{\"type\":\"baseRelation\","
+                + "\"inputs\":[\"1\"],\"outputs\":[\"2\"]}]}";
         StreamInfo expected = objectMapper.readValue(streamInfoStr, StreamInfo.class);
         assertEquals(expected, streamInfo);
     }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/KafkaLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/KafkaLoadNodeTest.java
index b11956474..4064d177b 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/KafkaLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/KafkaLoadNodeTest.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.protocol.node;
 
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
 
@@ -33,8 +34,8 @@ public class KafkaLoadNodeTest extends NodeBaseTest {
                 Arrays.asList(new FieldInfo("field", new StringFormatInfo())),
                 Arrays.asList(new FieldRelationShip(new FieldInfo("field", new StringFormatInfo()),
                         new FieldInfo("field", new StringFormatInfo()))), null,
-                "topic", "localhost:9092", "json",
-                1, new TreeMap<>());
+                "topic", "localhost:9092", new CanalJsonFormat(),
+                1, new TreeMap<>(), null);
     }
 
     @Override
@@ -44,6 +45,9 @@ public class KafkaLoadNodeTest extends NodeBaseTest {
                 + "\"inputField\":{\"type\":\"base\",\"name\":\"field\",\"formatInfo\":{\"type\":\"string\"}},"
                 + "\"outputField\":{\"type\":\"base\",\"name\":\"field\",\"formatInfo\":{\"type\":\"string\"}}}],"
                 + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\","
-                + "\"format\":\"json\",\"sinkParallelism\":1,\"properties\":{}}";
+                + "\"format\":{\"type\":\"canalJsonFormat\",\"ignoreParseErrors\":true,"
+                + "\"timestampFormatStandard\":\"SQL\",\"mapNullKeyMode\":\"DROP\",\"mapNullKeyLiteral\":\"null\","
+                + "\"encodeDecimalAsPlainNumber\":true},\"sinkParallelism\":1,"
+                + "\"properties\":{}}";
     }
 }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/AvroFormatTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/AvroFormatTest.java
new file mode 100644
index 000000000..6d5de8df1
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/AvroFormatTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+public class AvroFormatTest extends FormatBaseTest {
+
+    /**
+     * Supplies the Avro format whose JSON round trip the inherited base test verifies.
+     *
+     * @return format
+     * @see Format
+     */
+    @Override
+    public Format getFormat() {
+        return new AvroFormat();
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/CanalJsonFormatTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/CanalJsonFormatTest.java
new file mode 100644
index 000000000..bdaf845cf
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/CanalJsonFormatTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+public class CanalJsonFormatTest extends FormatBaseTest {
+
+    /**
+     * Supplies the canal-json format whose JSON round trip the inherited base test verifies.
+     *
+     * @return format
+     * @see Format
+     */
+    @Override
+    public Format getFormat() {
+        return new CanalJsonFormat();
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/DebeziumJsonFormatTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/DebeziumJsonFormatTest.java
new file mode 100644
index 000000000..6793dde30
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/DebeziumJsonFormatTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+public class DebeziumJsonFormatTest extends FormatBaseTest {
+
+    /**
+     * Supplies the debezium-json format whose JSON round trip the inherited base test verifies.
+     *
+     * @return format
+     * @see Format
+     */
+    @Override
+    public Format getFormat() {
+        return new DebeziumJsonFormat();
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/FormatBaseTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/FormatBaseTest.java
new file mode 100644
index 000000000..e476ce96e
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/FormatBaseTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public abstract class FormatBaseTest {
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private Format format;
+
+    /**
+     * Initializes the format under test, failing fast if the subclass supplies null.
+     */
+    @Before
+    public void before() {
+        this.format = Preconditions.checkNotNull(getFormat());
+    }
+
+    public abstract Format getFormat();
+
+    /**
+     * Tests that serializing the format to JSON and deserializing it yields an equal object.
+     * @throws JsonProcessingException if the format cannot be written to or read from JSON
+     */
+    @Test
+    public void testSerializeAndDeserialize() throws JsonProcessingException {
+        String jsonStr = objectMapper.writeValueAsString(format);
+        Format deserializeFormat = objectMapper.readValue(jsonStr, Format.class);
+        assertEquals(format, deserializeFormat);
+    }
+
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/JsonFormatTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/JsonFormatTest.java
new file mode 100644
index 000000000..4b777a2a8
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/JsonFormatTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+public class JsonFormatTest extends FormatBaseTest {
+
+    /**
+     * Supplies the json format whose JSON round trip the inherited base test verifies.
+     *
+     * @return format
+     * @see Format
+     */
+    @Override
+    public Format getFormat() {
+        return new JsonFormat();
+    }
+}