Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/29 11:01:45 UTC

[incubator-inlong] branch master updated: [INLONG-4013][Sort] Support writing metadata in canal format (#4023)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bae36874 [INLONG-4013][Sort] Support writing metadata in canal format (#4023)
1bae36874 is described below

commit 1bae36874f35b428d516334ced82e7823bab0d89
Author: thexia <37...@users.noreply.github.com>
AuthorDate: Fri Apr 29 19:01:39 2022 +0800

    [INLONG-4013][Sort] Support writing metadata in canal format (#4023)
    
    * Support writing metadata in canal format
    
    * fix code style
    
    * fix license problem
    
    * fix ut error
    
    * fix the scala version to a variable
---
 inlong-sort/pom.xml                                |   6 +
 inlong-sort/sort-connectors/pom.xml                |  10 +
 .../kafka/DynamicKafkaSerializationSchema.java     | 205 +++++++++
 .../inlong/sort/flink/kafka/KafkaDynamicSink.java  | 471 +++++++++++++++++++++
 .../sort/flink/kafka/KafkaDynamicTableFactory.java | 372 ++++++++++++++++
 .../inlong/sort}/flink/kafka/KafkaSinkBuilder.java |   2 +-
 .../org.apache.flink.table.factories.Factory       |  16 +
 .../canal/CanalJsonEnhancedDecodingFormat.java     | 329 ++++++++++++++
 .../CanalJsonEnhancedDeserializationSchema.java    | 404 ++++++++++++++++++
 .../canal/CanalJsonEnhancedEncodingFormat.java     | 280 ++++++++++++
 .../json/canal/CanalJsonEnhancedFormatFactory.java | 113 +++++
 .../CanalJsonEnhancedSerializationSchema.java      | 191 +++++++++
 .../org.apache.flink.table.factories.Factory       |  16 +
 .../canal/CanalJsonEnhancedFormatFactoryTest.java  | 138 ++++++
 .../canal/CanalJsonEnhancedSerDeSchemaTest.java    | 214 ++++++++++
 .../src/test/resources/canal-json-inlong-data.txt  |   3 +
 .../src/test/resources/log4j-test.properties       |  29 ++
 inlong-sort/sort-formats/pom.xml                   |  14 +
 inlong-sort/sort-single-tenant/pom.xml             |   5 -
 .../inlong/sort/singletenant/flink/Entrance.java   |   2 +-
 .../flink/parser/impl/FlinkSqlParser.java          |  34 +-
 .../flink/kafka/KafkaSinkTestBase.java             |   4 +-
 .../flink/kafka/RowToAvroKafkaSinkTest.java        |   2 +-
 .../flink/kafka/RowToCanalKafkaSinkTest.java       |   2 +-
 .../kafka/RowToDebeziumJsonKafkaSinkTest.java      |   2 +-
 .../flink/kafka/RowToJsonKafkaSinkTest.java        |   2 +-
 .../flink/kafka/RowToStringKafkaSinkTest.java      |   2 +-
 .../flink/parser/FlinkSqlParserTest.java           |   1 -
 pom.xml                                            |   6 +
 29 files changed, 2842 insertions(+), 33 deletions(-)
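
The new sink exposes the connector metadata keys 'timestamp' and 'headers' (see WritableMetadata in KafkaDynamicSink below), plus whatever writable metadata the canal value format declares under a 'value.' prefix. A hedged sketch of how such a table might be declared through the Java Table API follows; the format identifier 'canal-json-inlong' is an assumption inferred from the test resource name in this patch, not something this excerpt confirms.

// Hedged usage sketch, not part of the commit: registers a sink table on the
// new 'kafka-inlong' connector and maps the connector metadata columns shown
// in WritableMetadata ('timestamp', 'headers'). The format name
// 'canal-json-inlong' is an assumption based on the test resource naming.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaInlongCanalSinkSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql(
                "CREATE TABLE canal_sink (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING,\n"
                        + "  record_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',\n"
                        + "  kafka_headers MAP<STRING, BYTES> METADATA FROM 'headers'\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka-inlong',\n"
                        + "  'topic' = 'canal_output',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'format' = 'canal-json-inlong'\n"
                        + ")");
    }
}
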

diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml
index 5844c4bc4..3d1f326e6 100644
--- a/inlong-sort/pom.xml
+++ b/inlong-sort/pom.xml
@@ -75,6 +75,12 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils-junit</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/inlong-sort/sort-connectors/pom.xml b/inlong-sort/sort-connectors/pom.xml
index c6d213ba8..f4713775c 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -59,6 +59,11 @@
             <artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_${flink.scala.binary.version}</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-common</artifactId>
@@ -98,6 +103,11 @@
             <groupId>org.antlr</groupId>
             <artifactId>ST4</artifactId>
         </dependency>
+        <!--for kafka-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka_${flink.scala.binary.version}</artifactId>
+        </dependency>
         <!--for pulsar-->
         <dependency>
             <groupId>org.apache.pulsar</groupId>
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/DynamicKafkaSerializationSchema.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/DynamicKafkaSerializationSchema.java
new file mode 100644
index 000000000..be3f26487
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/DynamicKafkaSerializationSchema.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.flink.kafka.KafkaDynamicSink.WritableMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+
+/**
+ * A specific {@link KafkaSerializationSchema} for {@link KafkaDynamicSink}.
+ */
+class DynamicKafkaSerializationSchema
+        implements KafkaSerializationSchema<RowData>, KafkaContextAware<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
+
+    private final String topic;
+
+    private final @Nullable SerializationSchema<RowData> keySerialization;
+
+    private final SerializationSchema<RowData> valueSerialization;
+
+    private final RowData.FieldGetter[] keyFieldGetters;
+
+    private final RowData.FieldGetter[] valueFieldGetters;
+
+    private final boolean hasMetadata;
+
+    private final boolean upsertMode;
+
+    /**
+     * Contains the position for each value of {@link WritableMetadata} in the
+     * consumed row or -1 if this metadata key is not used.
+     */
+    private final int[] metadataPositions;
+
+    private int[] partitions;
+
+    private int parallelInstanceId;
+
+    private int numParallelInstances;
+
+    DynamicKafkaSerializationSchema(
+            String topic,
+            @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+            @Nullable SerializationSchema<RowData> keySerialization,
+            SerializationSchema<RowData> valueSerialization,
+            RowData.FieldGetter[] keyFieldGetters,
+            RowData.FieldGetter[] valueFieldGetters,
+            boolean hasMetadata,
+            int[] metadataPositions,
+            boolean upsertMode) {
+        if (upsertMode) {
+            Preconditions.checkArgument(
+                    keySerialization != null && keyFieldGetters.length > 0,
+                    "Key must be set in upsert mode for serialization schema.");
+        }
+        this.topic = topic;
+        this.partitioner = partitioner;
+        this.keySerialization = keySerialization;
+        this.valueSerialization = valueSerialization;
+        this.keyFieldGetters = keyFieldGetters;
+        this.valueFieldGetters = valueFieldGetters;
+        this.hasMetadata = hasMetadata;
+        this.metadataPositions = metadataPositions;
+        this.upsertMode = upsertMode;
+    }
+
+    @Override
+    public void open(SerializationSchema.InitializationContext context) throws Exception {
+        if (keySerialization != null) {
+            keySerialization.open(context);
+        }
+        valueSerialization.open(context);
+        if (partitioner != null) {
+            partitioner.open(parallelInstanceId, numParallelInstances);
+        }
+    }
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable Long timestamp) {
+        // shortcut in case no input projection is required
+        if (keySerialization == null && !hasMetadata) {
+            final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
+            return new ProducerRecord<>(
+                    topic,
+                    extractPartition(consumedRow, null, valueSerialized),
+                    null,
+                    valueSerialized);
+        }
+
+        final byte[] keySerialized;
+        if (keySerialization == null) {
+            keySerialized = null;
+        } else {
+            final RowData keyRow = createProjectedRow(consumedRow, RowKind.INSERT, keyFieldGetters);
+            keySerialized = keySerialization.serialize(keyRow);
+        }
+
+        final byte[] valueSerialized;
+        final RowKind kind = consumedRow.getRowKind();
+        final RowData valueRow = createProjectedRow(consumedRow, kind, valueFieldGetters);
+        if (upsertMode) {
+            if (kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE) {
+                // transform the message into a tombstone message
+                valueSerialized = null;
+            } else {
+                // set the row kind to INSERT to be compliant with the INSERT-only format
+                valueRow.setRowKind(RowKind.INSERT);
+                valueSerialized = valueSerialization.serialize(valueRow);
+            }
+        } else {
+            valueSerialized = valueSerialization.serialize(valueRow);
+        }
+
+        return new ProducerRecord<>(
+                topic,
+                extractPartition(consumedRow, keySerialized, valueSerialized),
+                readMetadata(consumedRow, WritableMetadata.TIMESTAMP),
+                keySerialized,
+                valueSerialized,
+                readMetadata(consumedRow, WritableMetadata.HEADERS));
+    }
+
+    @Override
+    public void setParallelInstanceId(int parallelInstanceId) {
+        this.parallelInstanceId = parallelInstanceId;
+    }
+
+    @Override
+    public void setNumParallelInstances(int numParallelInstances) {
+        this.numParallelInstances = numParallelInstances;
+    }
+
+    @Override
+    public void setPartitions(int[] partitions) {
+        this.partitions = partitions;
+    }
+
+    @Override
+    public String getTargetTopic(RowData element) {
+        return topic;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> T readMetadata(RowData consumedRow, WritableMetadata metadata) {
+        final int pos = metadataPositions[metadata.ordinal()];
+        if (pos < 0) {
+            return null;
+        }
+        return (T) metadata.converter.read(consumedRow, pos);
+    }
+
+    private Integer extractPartition(
+            RowData consumedRow, @Nullable byte[] keySerialized, byte[] valueSerialized) {
+        if (partitioner != null) {
+            return partitioner.partition(
+                    consumedRow, keySerialized, valueSerialized, topic, partitions);
+        }
+        return null;
+    }
+
+    static RowData createProjectedRow(
+            RowData consumedRow, RowKind kind, RowData.FieldGetter[] fieldGetters) {
+        final int arity = fieldGetters.length;
+        final GenericRowData genericRowData = new GenericRowData(kind, arity);
+        for (int fieldPos = 0; fieldPos < arity; fieldPos++) {
+            genericRowData.setField(fieldPos, fieldGetters[fieldPos].getFieldOrNull(consumedRow));
+        }
+        return genericRowData;
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    interface MetadataConverter extends Serializable {
+        Object read(RowData consumedRow, int pos);
+    }
+}
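
The serializer above relies on a simple layout convention: the consumed row carries the physical fields first, with any requested connector metadata appended at the end, and metadataPositions[metadata.ordinal()] records each metadata column's index (or -1 when the key is not requested). A minimal standalone sketch of that lookup, with made-up field values, is shown here; it mirrors the convention rather than reusing the package-private classes.

// Minimal sketch (not the committed code) of the consumed-row layout the
// serializer assumes: physical fields first, connector metadata appended, and
// metadataPositions giving the index of each WritableMetadata value or -1.
// Field values are made up for illustration.
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;

public class MetadataPositionSketch {
    public static void main(String[] args) {
        // physical columns: id, name; connector metadata appended at the end: timestamp
        RowData consumedRow = GenericRowData.of(
                1L,
                StringData.fromString("alice"),
                TimestampData.fromEpochMillis(1651230099000L));

        int physicalArity = 2;
        // WritableMetadata ordinals: HEADERS = 0 (unused here), TIMESTAMP = 1
        int[] metadataPositions = {-1, physicalArity};

        int timestampPos = metadataPositions[1];
        Long recordTimestamp = timestampPos < 0
                ? null
                : consumedRow.getTimestamp(timestampPos, 3).getMillisecond();
        System.out.println(recordTimestamp); // prints 1651230099000
    }
}
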
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/KafkaDynamicSink.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/KafkaDynamicSink.java
new file mode 100644
index 000000000..cf2980b30
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/KafkaDynamicSink.java
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.table.BufferedUpsertSinkFunction;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic;
+import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import org.apache.inlong.sort.flink.kafka.DynamicKafkaSerializationSchema.MetadataConverter;
+import org.apache.kafka.common.header.Header;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A version-agnostic Kafka {@link DynamicTableSink}. */
+@Internal
+public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetadata {
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** Metadata that is appended at the end of a physical sink row. */
+    protected List<String> metadataKeys;
+
+    // --------------------------------------------------------------------------------------------
+    // Format attributes
+    // --------------------------------------------------------------------------------------------
+    private static final String VALUE_METADATA_PREFIX = "value.";
+
+    /** Data type of the consumed row. */
+    protected DataType consumedDataType;
+
+    /** Data type to configure the formats. */
+    protected final DataType physicalDataType;
+
+    /** Optional format for encoding keys to Kafka. */
+    protected final @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat;
+
+    /** Format for encoding values to Kafka. */
+    protected final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat;
+
+    /** Indices that determine the key fields and the source position in the consumed row. */
+    protected final int[] keyProjection;
+
+    /** Indices that determine the value fields and the source position in the consumed row. */
+    protected final int[] valueProjection;
+
+    /** Prefix that needs to be removed from fields when constructing the physical data type. */
+    protected final @Nullable String keyPrefix;
+
+    // --------------------------------------------------------------------------------------------
+    // Kafka-specific attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** The Kafka topic to write to. */
+    protected final String topic;
+
+    /** Properties for the Kafka producer. */
+    protected final Properties properties;
+
+    /** Partitioner to select Kafka partition for each item. */
+    protected final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
+
+    /** Sink commit semantic. */
+    protected final KafkaSinkSemantic semantic;
+
+    /**
+     * Flag to determine the sink mode. In upsert mode, the sink transforms delete/update-before
+     * messages into tombstone messages.
+     */
+    protected final boolean upsertMode;
+
+    /** Sink buffer flush config, currently only supported in upsert mode. */
+    protected final SinkBufferFlushMode flushMode;
+
+    /** Parallelism of the physical Kafka producer. */
+    protected final @Nullable Integer parallelism;
+
+    public KafkaDynamicSink(
+            DataType consumedDataType,
+            DataType physicalDataType,
+            @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat,
+            EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+            int[] keyProjection,
+            int[] valueProjection,
+            @Nullable String keyPrefix,
+            String topic,
+            Properties properties,
+            @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+            KafkaSinkSemantic semantic,
+            boolean upsertMode,
+            SinkBufferFlushMode flushMode,
+            @Nullable Integer parallelism) {
+        // Format attributes
+        this.consumedDataType =
+                checkNotNull(consumedDataType, "Consumed data type must not be null.");
+        this.physicalDataType =
+                checkNotNull(physicalDataType, "Physical data type must not be null.");
+        this.keyEncodingFormat = keyEncodingFormat;
+        this.valueEncodingFormat =
+                checkNotNull(valueEncodingFormat, "Value encoding format must not be null.");
+        this.keyProjection = checkNotNull(keyProjection, "Key projection must not be null.");
+        this.valueProjection = checkNotNull(valueProjection, "Value projection must not be null.");
+        this.keyPrefix = keyPrefix;
+        // Mutable attributes
+        this.metadataKeys = Collections.emptyList();
+        // Kafka-specific attributes
+        this.topic = checkNotNull(topic, "Topic must not be null.");
+        this.properties = checkNotNull(properties, "Properties must not be null.");
+        this.partitioner = partitioner;
+        this.semantic = checkNotNull(semantic, "Semantic must not be null.");
+        this.upsertMode = upsertMode;
+        this.flushMode = checkNotNull(flushMode);
+        if (flushMode.isEnabled() && !upsertMode) {
+            throw new IllegalArgumentException(
+                    "Sink buffer flush is only supported in upsert-kafka.");
+        }
+        this.parallelism = parallelism;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        return valueEncodingFormat.getChangelogMode();
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        final SerializationSchema<RowData> keySerialization =
+                createSerialization(context, keyEncodingFormat, keyProjection, keyPrefix);
+
+        final SerializationSchema<RowData> valueSerialization =
+                createSerialization(context, valueEncodingFormat, valueProjection, null);
+
+        final FlinkKafkaProducer<RowData> kafkaProducer =
+                createKafkaProducer(keySerialization, valueSerialization);
+
+        if (flushMode.isEnabled() && upsertMode) {
+            BufferedUpsertSinkFunction buffedSinkFunction =
+                    new BufferedUpsertSinkFunction(
+                            kafkaProducer,
+                            physicalDataType,
+                            keyProjection,
+                            context.createTypeInformation(consumedDataType),
+                            flushMode);
+            return SinkFunctionProvider.of(buffedSinkFunction, parallelism);
+        } else {
+            return SinkFunctionProvider.of(kafkaProducer, parallelism);
+        }
+    }
+
+    @Override
+    public Map<String, DataType> listWritableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+
+        // according to convention, the order of the final row must be
+        // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
+        // where the format metadata has highest precedence
+
+        // add value format metadata with prefix
+        valueEncodingFormat
+                .listWritableMetadata()
+                .forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX + key, value));
+
+        // add connector metadata
+        Stream.of(WritableMetadata.values())
+                .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+        return metadataMap;
+    }
+
+    @Override
+    public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
+        // separate connector and format metadata
+        final List<String> formatMetadataKeys =
+                metadataKeys.stream()
+                        .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
+                        .collect(Collectors.toList());
+        final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
+        connectorMetadataKeys.removeAll(formatMetadataKeys);
+
+        // push down format metadata
+        final Map<String, DataType> formatMetadata = valueEncodingFormat.listWritableMetadata();
+        if (formatMetadata.size() > 0) {
+            final List<String> requestedFormatMetadataKeys =
+                    formatMetadataKeys.stream()
+                            .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
+                            .collect(Collectors.toList());
+            valueEncodingFormat.applyWritableMetadata(requestedFormatMetadataKeys);
+        }
+
+        this.metadataKeys = connectorMetadataKeys;
+        this.consumedDataType = consumedDataType;
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        final KafkaDynamicSink copy =
+                new KafkaDynamicSink(
+                        consumedDataType,
+                        physicalDataType,
+                        keyEncodingFormat,
+                        valueEncodingFormat,
+                        keyProjection,
+                        valueProjection,
+                        keyPrefix,
+                        topic,
+                        properties,
+                        partitioner,
+                        semantic,
+                        upsertMode,
+                        flushMode,
+                        parallelism);
+        copy.metadataKeys = metadataKeys;
+        return copy;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "Kafka table sink";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final KafkaDynamicSink that = (KafkaDynamicSink) o;
+        return Objects.equals(metadataKeys, that.metadataKeys)
+                && Objects.equals(consumedDataType, that.consumedDataType)
+                && Objects.equals(physicalDataType, that.physicalDataType)
+                && Objects.equals(keyEncodingFormat, that.keyEncodingFormat)
+                && Objects.equals(valueEncodingFormat, that.valueEncodingFormat)
+                && Arrays.equals(keyProjection, that.keyProjection)
+                && Arrays.equals(valueProjection, that.valueProjection)
+                && Objects.equals(keyPrefix, that.keyPrefix)
+                && Objects.equals(topic, that.topic)
+                && Objects.equals(properties, that.properties)
+                && Objects.equals(partitioner, that.partitioner)
+                && Objects.equals(semantic, that.semantic)
+                && Objects.equals(upsertMode, that.upsertMode)
+                && Objects.equals(flushMode, that.flushMode)
+                && Objects.equals(parallelism, that.parallelism);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                metadataKeys,
+                consumedDataType,
+                physicalDataType,
+                keyEncodingFormat,
+                valueEncodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                topic,
+                properties,
+                partitioner,
+                semantic,
+                upsertMode,
+                flushMode,
+                parallelism);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    protected FlinkKafkaProducer<RowData> createKafkaProducer(
+            SerializationSchema<RowData> keySerialization,
+            SerializationSchema<RowData> valueSerialization) {
+        final List<LogicalType> physicalChildren = physicalDataType.getLogicalType().getChildren();
+
+        final RowData.FieldGetter[] keyFieldGetters =
+                Arrays.stream(keyProjection)
+                        .mapToObj(
+                                targetField ->
+                                        RowData.createFieldGetter(
+                                                physicalChildren.get(targetField), targetField))
+                        .toArray(RowData.FieldGetter[]::new);
+
+        final RowData.FieldGetter[] valueFieldGetters =
+                Arrays.stream(valueProjection)
+                        .mapToObj(
+                                targetField ->
+                                        RowData.createFieldGetter(
+                                                physicalChildren.get(targetField), targetField))
+                        .toArray(RowData.FieldGetter[]::new);
+
+        // determine the positions of metadata in the consumed row
+        final int[] metadataPositions =
+                Stream.of(WritableMetadata.values())
+                        .mapToInt(
+                                m -> {
+                                    final int pos = metadataKeys.indexOf(m.key);
+                                    if (pos < 0) {
+                                        return -1;
+                                    }
+                                    return physicalChildren.size() + pos;
+                                })
+                        .toArray();
+
+        // check if metadata is used at all
+        final boolean hasMetadata = metadataKeys.size() > 0;
+
+        final DynamicKafkaSerializationSchema kafkaSerializer =
+                new DynamicKafkaSerializationSchema(
+                        topic,
+                        partitioner,
+                        keySerialization,
+                        valueSerialization,
+                        keyFieldGetters,
+                        valueFieldGetters,
+                        hasMetadata,
+                        metadataPositions,
+                        upsertMode);
+
+        return new FlinkKafkaProducer<>(
+                topic,
+                kafkaSerializer,
+                properties,
+                FlinkKafkaProducer.Semantic.valueOf(semantic.toString()),
+                FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+    }
+
+    private @Nullable SerializationSchema<RowData> createSerialization(
+            DynamicTableSink.Context context,
+            @Nullable EncodingFormat<SerializationSchema<RowData>> format,
+            int[] projection,
+            @Nullable String prefix) {
+        if (format == null) {
+            return null;
+        }
+        DataType physicalFormatDataType =
+                DataTypeUtils.projectRow(this.physicalDataType, projection);
+        if (prefix != null) {
+            physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+        }
+        return format.createRuntimeEncoder(context, physicalFormatDataType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    enum WritableMetadata {
+        HEADERS(
+                "headers",
+                // key and value of the map are nullable to make handling easier in queries
+                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable())
+                        .nullable(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(RowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        final MapData map = row.getMap(pos);
+                        final ArrayData keyArray = map.keyArray();
+                        final ArrayData valueArray = map.valueArray();
+                        final List<Header> headers = new ArrayList<>();
+                        for (int i = 0; i < keyArray.size(); i++) {
+                            if (!keyArray.isNullAt(i) && !valueArray.isNullAt(i)) {
+                                final String key = keyArray.getString(i).toString();
+                                final byte[] value = valueArray.getBinary(i);
+                                headers.add(new KafkaHeader(key, value));
+                            }
+                        }
+                        return headers;
+                    }
+                }),
+
+        TIMESTAMP(
+                "timestamp",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(RowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return row.getTimestamp(pos, 3).getMillisecond();
+                    }
+                });
+
+        final String key;
+
+        final DataType dataType;
+
+        final MetadataConverter converter;
+
+        WritableMetadata(String key, DataType dataType, MetadataConverter converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private static class KafkaHeader implements Header {
+
+        private final String key;
+
+        private final byte[] value;
+
+        KafkaHeader(String key, byte[] value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public String key() {
+            return key;
+        }
+
+        @Override
+        public byte[] value() {
+            return value;
+        }
+    }
+}
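
applyWritableMetadata above routes the requested keys in two directions: keys carrying the 'value.' prefix are stripped and pushed down into the value encoding format, while the remainder ('headers', 'timestamp') stays with the sink and ends up in metadataPositions. A small standalone sketch of that split follows; the 'value.database' and 'value.table' keys are assumed examples, since the canal encoding format's key list is not part of this excerpt.

// Sketch of the metadata-key routing performed by applyWritableMetadata:
// "value."-prefixed keys belong to the value format (prefix stripped before
// push-down), everything else is connector metadata kept by the sink.
// The concrete format keys used here are assumptions for illustration.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MetadataKeySplitSketch {
    public static void main(String[] args) {
        final String valuePrefix = "value.";
        List<String> requested = Arrays.asList("value.database", "value.table", "headers");

        List<String> formatKeys = requested.stream()
                .filter(k -> k.startsWith(valuePrefix))
                .map(k -> k.substring(valuePrefix.length()))
                .collect(Collectors.toList());

        List<String> connectorKeys = new ArrayList<>(requested);
        connectorKeys.removeIf(k -> k.startsWith(valuePrefix));

        System.out.println(formatKeys);    // [database, table]
        System.out.println(connectorKeys); // [headers]
    }
}
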
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/KafkaDynamicTableFactory.java
new file mode 100644
index 000000000..0f74f04e0
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/KafkaDynamicTableFactory.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.kafka;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaOptions;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic;
+import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FORMAT;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_GROUP_ID;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_MODE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_PARTITIONER;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_SEMANTIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.StartupOptions;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC_PATTERN;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.VALUE_FIELDS_INCLUDE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.VALUE_FORMAT;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.autoCompleteSchemaRegistrySubject;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createKeyFormatProjection;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createValueFormatProjection;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getFlinkKafkaPartitioner;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getStartupOptions;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSinkOptions;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions;
+import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+
+/**
+ * Copied from org.apache.flink:flink-connector-kafka_2.11:1.13.5.
+ *
+ * Factory for creating configured instances of {@link KafkaDynamicSource} and {@link
+ * KafkaDynamicSink}. We modified {@link KafkaDynamicSink} to support writing format metadata.
+ */
+@Internal
+public class KafkaDynamicTableFactory
+        implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+    public static final String IDENTIFIER = "kafka-inlong";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(PROPS_BOOTSTRAP_SERVERS);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(FactoryUtil.FORMAT);
+        options.add(KEY_FORMAT);
+        options.add(KEY_FIELDS);
+        options.add(KEY_FIELDS_PREFIX);
+        options.add(VALUE_FORMAT);
+        options.add(VALUE_FIELDS_INCLUDE);
+        options.add(TOPIC);
+        options.add(TOPIC_PATTERN);
+        options.add(PROPS_GROUP_ID);
+        options.add(SCAN_STARTUP_MODE);
+        options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
+        options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
+        options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
+        options.add(SINK_PARTITIONER);
+        options.add(SINK_SEMANTIC);
+        options.add(SINK_PARALLELISM);
+        return options;
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+        final ReadableConfig tableOptions = helper.getOptions();
+
+        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
+                getKeyDecodingFormat(helper);
+
+        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+                getValueDecodingFormat(helper);
+
+        helper.validateExcept(PROPERTIES_PREFIX);
+
+        validateTableSourceOptions(tableOptions);
+
+        validatePKConstraints(
+                context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
+
+        final StartupOptions startupOptions = getStartupOptions(tableOptions);
+
+        final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
+
+        // add topic-partition discovery
+        properties.setProperty(
+                FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
+                String.valueOf(
+                        tableOptions
+                                .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
+                                .map(Duration::toMillis)
+                                .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));
+
+        final DataType physicalDataType =
+                context.getCatalogTable().getSchema().toPhysicalRowDataType();
+
+        final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
+
+        final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
+
+        final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+
+        return createKafkaTableSource(
+                physicalDataType,
+                keyDecodingFormat.orElse(null),
+                valueDecodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                KafkaOptions.getSourceTopics(tableOptions),
+                KafkaOptions.getSourceTopicPattern(tableOptions),
+                properties,
+                startupOptions.startupMode,
+                startupOptions.specificOffsets,
+                startupOptions.startupTimestampMillis);
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        final TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(
+                        this, autoCompleteSchemaRegistrySubject(context));
+
+        final ReadableConfig tableOptions = helper.getOptions();
+
+        final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
+                getKeyEncodingFormat(helper);
+
+        final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
+                getValueEncodingFormat(helper);
+
+        helper.validateExcept(PROPERTIES_PREFIX);
+
+        validateTableSinkOptions(tableOptions);
+
+        validatePKConstraints(
+                context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat);
+
+        final DataType physicalDataType =
+                context.getCatalogTable().getSchema().toPhysicalRowDataType();
+
+        final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
+
+        final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
+
+        final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+
+        final Integer parallelism = tableOptions.getOptional(SINK_PARALLELISM).orElse(null);
+
+        return createKafkaTableSink(
+                physicalDataType,
+                keyEncodingFormat.orElse(null),
+                valueEncodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                tableOptions.get(TOPIC).get(0),
+                getKafkaProperties(context.getCatalogTable().getOptions()),
+                getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null),
+                getSinkSemantic(tableOptions),
+                parallelism);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private static Optional<DecodingFormat<DeserializationSchema<RowData>>> getKeyDecodingFormat(
+            TableFactoryHelper helper) {
+        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
+                helper.discoverOptionalDecodingFormat(
+                        DeserializationFormatFactory.class, KEY_FORMAT);
+        keyDecodingFormat.ifPresent(
+                format -> {
+                    if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "A key format should only deal with INSERT-only records. "
+                                                + "But %s has a changelog mode of %s.",
+                                        helper.getOptions().get(KEY_FORMAT),
+                                        format.getChangelogMode()));
+                    }
+                });
+        return keyDecodingFormat;
+    }
+
+    private static Optional<EncodingFormat<SerializationSchema<RowData>>> getKeyEncodingFormat(
+            TableFactoryHelper helper) {
+        final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
+                helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT);
+        keyEncodingFormat.ifPresent(
+                format -> {
+                    if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "A key format should only deal with INSERT-only records. "
+                                                + "But %s has a changelog mode of %s.",
+                                        helper.getOptions().get(KEY_FORMAT),
+                                        format.getChangelogMode()));
+                    }
+                });
+        return keyEncodingFormat;
+    }
+
+    private static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
+            TableFactoryHelper helper) {
+        return helper.discoverOptionalDecodingFormat(
+                        DeserializationFormatFactory.class, FactoryUtil.FORMAT)
+                .orElseGet(
+                        () ->
+                                helper.discoverDecodingFormat(
+                                        DeserializationFormatFactory.class, VALUE_FORMAT));
+    }
+
+    private static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
+            TableFactoryHelper helper) {
+        return helper.discoverOptionalEncodingFormat(
+                        SerializationFormatFactory.class, FactoryUtil.FORMAT)
+                .orElseGet(
+                        () ->
+                                helper.discoverEncodingFormat(
+                                        SerializationFormatFactory.class, VALUE_FORMAT));
+    }
+
+    private static void validatePKConstraints(
+            ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
+        if (catalogTable.getSchema().getPrimaryKey().isPresent()
+                && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+            Configuration options = Configuration.fromMap(catalogTable.getOptions());
+            String formatName =
+                    options.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
+            throw new ValidationException(
+                    String.format(
+                            "The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
+                                    + " on the table, because it can't guarantee the semantic of primary key.",
+                            tableName.asSummaryString(), formatName));
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    protected KafkaDynamicSource createKafkaTableSource(
+            DataType physicalDataType,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
+            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+            int[] keyProjection,
+            int[] valueProjection,
+            @Nullable String keyPrefix,
+            @Nullable List<String> topics,
+            @Nullable Pattern topicPattern,
+            Properties properties,
+            StartupMode startupMode,
+            Map<KafkaTopicPartition, Long> specificStartupOffsets,
+            long startupTimestampMillis) {
+        return new KafkaDynamicSource(
+                physicalDataType,
+                keyDecodingFormat,
+                valueDecodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                topics,
+                topicPattern,
+                properties,
+                startupMode,
+                specificStartupOffsets,
+                startupTimestampMillis,
+                false);
+    }
+
+    protected KafkaDynamicSink createKafkaTableSink(
+            DataType physicalDataType,
+            @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat,
+            EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+            int[] keyProjection,
+            int[] valueProjection,
+            @Nullable String keyPrefix,
+            String topic,
+            Properties properties,
+            FlinkKafkaPartitioner<RowData> partitioner,
+            KafkaSinkSemantic semantic,
+            Integer parallelism) {
+        return new KafkaDynamicSink(
+                physicalDataType,
+                physicalDataType,
+                keyEncodingFormat,
+                valueEncodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                topic,
+                properties,
+                partitioner,
+                semantic,
+                false,
+                SinkBufferFlushMode.DISABLED,
+                parallelism);
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/KafkaSinkBuilder.java
similarity index 97%
rename from inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java
rename to inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/KafkaSinkBuilder.java
index f7b20ecc1..c7d08a7e1 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkBuilder.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/kafka/KafkaSinkBuilder.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.singletenant.flink.kafka;
+package org.apache.inlong.sort.flink.kafka;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
diff --git a/inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..9719b692b
--- /dev/null
+++ b/inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.flink.kafka.KafkaDynamicTableFactory
\ No newline at end of file
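
The service entry above is what lets Flink discover the new factory at runtime: FactoryUtil loads every registered org.apache.flink.table.factories.Factory and matches on factoryIdentifier(), so 'connector' = 'kafka-inlong' resolves to KafkaDynamicTableFactory. A hedged sketch of that lookup, assuming the sort-connectors jar is on the classpath:

// Sketch of how the service entry is resolved: FactoryUtil matches registered
// Factory implementations by factoryIdentifier(). Assumes the sort-connectors
// jar (with the service file above) is on the classpath.
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class FactoryDiscoverySketch {
    public static void main(String[] args) {
        DynamicTableSinkFactory factory = FactoryUtil.discoverFactory(
                Thread.currentThread().getContextClassLoader(),
                DynamicTableSinkFactory.class,
                "kafka-inlong");
        System.out.println(factory.getClass().getName()); // org.apache.inlong.sort.flink.kafka.KafkaDynamicTableFactory
    }
}
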
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
new file mode 100644
index 000000000..7e77e1eb0
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDeserializationSchema.MetadataConverter;
+
+/**
+ * {@link DecodingFormat} for Canal using JSON encoding.
+ * Different from Flink 1.13.5, it supports more metadata.
+ */
+public class CanalJsonEnhancedDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    private List<String> metadataKeys;
+
+    // --------------------------------------------------------------------------------------------
+    // Canal-specific attributes
+    // --------------------------------------------------------------------------------------------
+
+    private final @Nullable String database;
+
+    private final @Nullable String table;
+
+    private final boolean ignoreParseErrors;
+
+    private final TimestampFormat timestampFormat;
+
+    public CanalJsonEnhancedDecodingFormat(
+            String database,
+            String table,
+            boolean ignoreParseErrors,
+            TimestampFormat timestampFormat) {
+        this.database = database;
+        this.table = table;
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.timestampFormat = timestampFormat;
+        this.metadataKeys = Collections.emptyList();
+    }
+
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(
+            DynamicTableSource.Context context, DataType physicalDataType) {
+        final List<ReadableMetadata> readableMetadata =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                .<IllegalStateException>orElseThrow(IllegalStateException::new))
+                        .collect(Collectors.toList());
+        final List<DataTypes.Field> metadataFields =
+                readableMetadata.stream()
+                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                        .collect(Collectors.toList());
+        final DataType producedDataType =
+                DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+        return CanalJsonEnhancedDeserializationSchema.builder(
+                        physicalDataType, readableMetadata, producedTypeInfo)
+                .setDatabase(database)
+                .setTable(table)
+                .setIgnoreParseErrors(ignoreParseErrors)
+                .setTimestampFormat(timestampFormat)
+                .build();
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+        return metadataMap;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys) {
+        this.metadataKeys = metadataKeys;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.UPDATE_BEFORE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    /** List of metadata that can be read with this format. */
+    public enum ReadableMetadata {
+        DATABASE(
+                "database",
+                DataTypes.STRING().nullable(),
+                DataTypes.FIELD("database", DataTypes.STRING()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        return row.getString(pos);
+                    }
+                }),
+
+        TABLE(
+                "table",
+                DataTypes.STRING().nullable(),
+                DataTypes.FIELD("table", DataTypes.STRING()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        return row.getString(pos);
+                    }
+                }),
+
+        SQL_TYPE(
+                "sql-type",
+                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.INT().nullable()).nullable(),
+                DataTypes.FIELD(
+                        "sqlType",
+                        DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.INT().nullable())),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        return row.getMap(pos);
+                    }
+                }),
+
+        PK_NAMES(
+                "pk-names",
+                DataTypes.ARRAY(DataTypes.STRING()).nullable(),
+                DataTypes.FIELD("pkNames", DataTypes.ARRAY(DataTypes.STRING())),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        return row.getArray(pos);
+                    }
+                }),
+
+        INGESTION_TIMESTAMP(
+                "ingestion-timestamp",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+                DataTypes.FIELD("ts", DataTypes.BIGINT()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return TimestampData.fromEpochMillis(row.getLong(pos));
+                    }
+                }),
+
+        EVENT_TIMESTAMP(
+                "event-timestamp",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+                DataTypes.FIELD("es", DataTypes.BIGINT()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return TimestampData.fromEpochMillis(row.getLong(pos));
+                    }
+                }),
+        // additional metadata
+        OP_TYPE(
+                "op-type",
+                DataTypes.STRING().nullable(),
+                DataTypes.FIELD("opType", DataTypes.STRING()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return row.getString(pos);
+                    }
+                }),
+        IS_DDL(
+                "is-ddl",
+                DataTypes.BOOLEAN().nullable(),
+                DataTypes.FIELD("isDdl", DataTypes.BOOLEAN()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return row.getBoolean(pos);
+                    }
+                }),
+
+        MYSQL_TYPE(
+                "mysql-type",
+                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable()).nullable(),
+                DataTypes.FIELD("mysqlType", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return row.getMap(pos);
+                    }
+                }),
+        BATCH_ID(
+                "batch-id",
+                DataTypes.BIGINT().nullable(),
+                DataTypes.FIELD("batchId", DataTypes.BIGINT()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return row.getLong(pos);
+                    }
+                }),
+        UPDATE_BEFORE(
+                "update-before",
+                DataTypes.ARRAY(
+                        DataTypes.MAP(
+                                DataTypes.STRING().nullable(),
+                                DataTypes.STRING().nullable())
+                                .nullable())
+                        .nullable(),
+                DataTypes.FIELD("updateBefore", DataTypes.ARRAY(
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return row.getArray(pos);
+                    }
+                });
+
+        final String key;
+
+        final DataType dataType;
+
+        final DataTypes.Field requiredJsonField;
+
+        final MetadataConverter converter;
+
+        ReadableMetadata(
+                String key,
+                DataType dataType,
+                DataTypes.Field requiredJsonField,
+                MetadataConverter converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.requiredJsonField = requiredJsonField;
+            this.converter = converter;
+        }
+
+        public String getKey() {
+            return key;
+        }
+    }
+}
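For orientation, here is a minimal sketch (not part of this patch) of how the readable metadata keys above might be exposed as VIRTUAL columns of a Flink SQL source table. It assumes the stock Kafka connector's 'value.' prefix for value-format metadata; the table, topic, and server names are purely illustrative.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CanalJsonInlongSourceSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // Readable metadata is declared as VIRTUAL columns; each key matches one
            // ReadableMetadata entry above (e.g. 'value.batch-id' -> BATCH_ID).
            tEnv.executeSql(
                    "CREATE TABLE user_source (\n"
                            + "  id BIGINT,\n"
                            + "  name STRING,\n"
                            + "  origin_database STRING METADATA FROM 'value.database' VIRTUAL,\n"
                            + "  origin_table STRING METADATA FROM 'value.table' VIRTUAL,\n"
                            + "  batch_id BIGINT METADATA FROM 'value.batch-id' VIRTUAL\n"
                            + ") WITH (\n"
                            + "  'connector' = 'kafka',\n"
                            + "  'topic' = 'user_binlog',\n"
                            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                            + "  'scan.startup.mode' = 'earliest-offset',\n"
                            + "  'value.format' = 'canal-json-inlong'\n"
                            + ")");
        }
    }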
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDeserializationSchema.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDeserializationSchema.java
new file mode 100644
index 000000000..90696bc14
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDeserializationSchema.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import static java.lang.String.format;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDecodingFormat.ReadableMetadata;
+
+/**
+ * Deserialization schema from Canal JSON to Flink Table/SQL internal data structure {@link
+ * RowData}. The deserialization schema knows Canal's schema definition and can extract the database
+ * data and convert it into {@link RowData} with {@link RowKind}.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads the specified fields.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ *
+ * @see <a href="https://github.com/alibaba/canal">Alibaba Canal</a>
+ */
+public final class CanalJsonEnhancedDeserializationSchema implements DeserializationSchema<RowData> {
+    private static final long serialVersionUID = 1L;
+
+    private static final String FIELD_OLD = "old";
+    private static final String OP_INSERT = "INSERT";
+    private static final String OP_UPDATE = "UPDATE";
+    private static final String OP_DELETE = "DELETE";
+    private static final String OP_CREATE = "CREATE";
+
+    /** The deserializer to deserialize Canal JSON data. */
+    private final JsonRowDataDeserializationSchema jsonDeserializer;
+
+    /** Flag that indicates that an additional projection is required for metadata. */
+    private final boolean hasMetadata;
+
+    /** Metadata to be extracted for every record. */
+    private final MetadataConverter[] metadataConverters;
+
+    /** {@link TypeInformation} of the produced {@link RowData} (physical + metadata). */
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    /** Only read changelogs from the specific database. */
+    private final @Nullable String database;
+
+    /** Only read changelogs from the specific table. */
+    private final @Nullable String table;
+
+    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+    private final boolean ignoreParseErrors;
+
+    /** Names of fields. */
+    private final List<String> fieldNames;
+
+    /** Number of fields. */
+    private final int fieldCount;
+
+    /** Pattern of the specific database. */
+    private final Pattern databasePattern;
+
+    /** Pattern of the specific table. */
+    private final Pattern tablePattern;
+
+    private CanalJsonEnhancedDeserializationSchema(
+            DataType physicalDataType,
+            List<ReadableMetadata> requestedMetadata,
+            TypeInformation<RowData> producedTypeInfo,
+            @Nullable String database,
+            @Nullable String table,
+            boolean ignoreParseErrors,
+            TimestampFormat timestampFormat) {
+        final RowType jsonRowType = createJsonRowType(physicalDataType, requestedMetadata);
+        this.jsonDeserializer =
+                new JsonRowDataDeserializationSchema(
+                        jsonRowType,
+                        // the result type is never used, so it's fine to pass in the produced type
+                        // info
+                        producedTypeInfo,
+                        false, // ignoreParseErrors already contains the functionality of
+                        // failOnMissingField
+                        ignoreParseErrors,
+                        timestampFormat);
+        this.hasMetadata = requestedMetadata.size() > 0;
+        this.metadataConverters = createMetadataConverters(jsonRowType, requestedMetadata);
+        this.producedTypeInfo = producedTypeInfo;
+        this.database = database;
+        this.table = table;
+        this.ignoreParseErrors = ignoreParseErrors;
+        final RowType physicalRowType = ((RowType) physicalDataType.getLogicalType());
+        this.fieldNames = physicalRowType.getFieldNames();
+        this.fieldCount = physicalRowType.getFieldCount();
+        this.databasePattern = database == null ? null : Pattern.compile(database);
+        this.tablePattern = table == null ? null : Pattern.compile(table);
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Builder
+    // ------------------------------------------------------------------------------------------
+
+    /** Creates a builder for building a {@link CanalJsonEnhancedDeserializationSchema}. */
+    public static Builder builder(
+            DataType physicalDataType,
+            List<ReadableMetadata> requestedMetadata,
+            TypeInformation<RowData> producedTypeInfo) {
+        return new Builder(physicalDataType, requestedMetadata, producedTypeInfo);
+    }
+
+    /** A builder for creating a {@link CanalJsonEnhancedDeserializationSchema}. */
+    @Internal
+    public static final class Builder {
+        private final DataType physicalDataType;
+        private final List<ReadableMetadata> requestedMetadata;
+        private final TypeInformation<RowData> producedTypeInfo;
+        private String database = null;
+        private String table = null;
+        private boolean ignoreParseErrors = false;
+        private TimestampFormat timestampFormat = TimestampFormat.SQL;
+
+        private Builder(
+                DataType physicalDataType,
+                List<ReadableMetadata> requestedMetadata,
+                TypeInformation<RowData> producedTypeInfo) {
+            this.physicalDataType = physicalDataType;
+            this.requestedMetadata = requestedMetadata;
+            this.producedTypeInfo = producedTypeInfo;
+        }
+
+        public Builder setDatabase(String database) {
+            this.database = database;
+            return this;
+        }
+
+        public Builder setTable(String table) {
+            this.table = table;
+            return this;
+        }
+
+        public Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
+            this.ignoreParseErrors = ignoreParseErrors;
+            return this;
+        }
+
+        public Builder setTimestampFormat(TimestampFormat timestampFormat) {
+            this.timestampFormat = timestampFormat;
+            return this;
+        }
+
+        public CanalJsonEnhancedDeserializationSchema build() {
+            return new CanalJsonEnhancedDeserializationSchema(
+                    physicalDataType,
+                    requestedMetadata,
+                    producedTypeInfo,
+                    database,
+                    table,
+                    ignoreParseErrors,
+                    timestampFormat);
+        }
+    }
+
+    // ------------------------------------------------------------------------------------------
+
+    @Override
+    public RowData deserialize(byte[] message) throws IOException {
+        throw new RuntimeException(
+                "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
+    }
+
+    @Override
+    public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws IOException {
+        if (message == null || message.length == 0) {
+            return;
+        }
+        try {
+            final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
+            if (database != null) {
+                if (!databasePattern
+                        .matcher(root.get(ReadableMetadata.DATABASE.key).asText())
+                        .matches()) {
+                    return;
+                }
+            }
+            if (table != null) {
+                if (!tablePattern
+                        .matcher(root.get(ReadableMetadata.TABLE.key).asText())
+                        .matches()) {
+                    return;
+                }
+            }
+            final GenericRowData row = (GenericRowData) jsonDeserializer.convertToRowData(root);
+            String type = row.getString(2).toString(); // "type" field
+            if (OP_INSERT.equals(type)) {
+                // "data" field is an array of row, contains inserted rows
+                ArrayData data = row.getArray(0);
+                for (int i = 0; i < data.size(); i++) {
+                    GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
+                    insert.setRowKind(RowKind.INSERT);
+                    emitRow(row, insert, out);
+                }
+            } else if (OP_UPDATE.equals(type)) {
+                // "data" field is an array of row, contains new rows
+                ArrayData data = row.getArray(0);
+                // "old" field is an array of row, contains old values
+                ArrayData old = row.getArray(1);
+                for (int i = 0; i < data.size(); i++) {
+                    // the underlying JSON deserialization schema always produce GenericRowData.
+                    GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
+                    GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
+                    final JsonNode oldField = root.get(FIELD_OLD);
+                    for (int f = 0; f < fieldCount; f++) {
+                        if (before.isNullAt(f) && oldField.findValue(fieldNames.get(f)) == null) {
+                            // fields in "old" (before) means the fields are changed
+                            // fields not in "old" (before) means the fields are not changed
+                            // so we just copy the not changed fields into before
+                            before.setField(f, after.getField(f));
+                        }
+                    }
+                    before.setRowKind(RowKind.UPDATE_BEFORE);
+                    after.setRowKind(RowKind.UPDATE_AFTER);
+                    emitRow(row, before, out);
+                    emitRow(row, after, out);
+                }
+            } else if (OP_DELETE.equals(type)) {
+                // "data" field is an array of row, contains deleted rows
+                ArrayData data = row.getArray(0);
+                for (int i = 0; i < data.size(); i++) {
+                    GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
+                    insert.setRowKind(RowKind.DELETE);
+                    emitRow(row, insert, out);
+                }
+            } else if (OP_CREATE.equals(type)) {
+                // "data" field is null and "type" is "CREATE" which means
+                // this is a DDL change event, and we should skip it.
+                return;
+            } else {
+                if (!ignoreParseErrors) {
+                    throw new IOException(
+                            format(
+                                    "Unknown \"type\" value \"%s\". The Canal JSON message is '%s'",
+                                    type, new String(message)));
+                }
+            }
+        } catch (Throwable t) {
+            // a big try catch to protect the processing.
+            if (!ignoreParseErrors) {
+                throw new IOException(
+                        format("Corrupt Canal JSON message '%s'.", new String(message)), t);
+            }
+        }
+    }
+
+    private void emitRow(
+            GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
+        // shortcut in case no output projection is required
+        if (!hasMetadata) {
+            out.collect(physicalRow);
+            return;
+        }
+        final int physicalArity = physicalRow.getArity();
+        final int metadataArity = metadataConverters.length;
+        final GenericRowData producedRow =
+                new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
+        for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
+            producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
+        }
+        for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+            producedRow.setField(
+                    physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
+        }
+        out.collect(producedRow);
+    }
+
+    @Override
+    public boolean isEndOfStream(RowData nextElement) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return producedTypeInfo;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CanalJsonEnhancedDeserializationSchema that = (CanalJsonEnhancedDeserializationSchema) o;
+        return Objects.equals(jsonDeserializer, that.jsonDeserializer)
+                && hasMetadata == that.hasMetadata
+                && Objects.equals(producedTypeInfo, that.producedTypeInfo)
+                && Objects.equals(database, that.database)
+                && Objects.equals(table, that.table)
+                && ignoreParseErrors == that.ignoreParseErrors
+                && fieldCount == that.fieldCount;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                jsonDeserializer,
+                hasMetadata,
+                producedTypeInfo,
+                database,
+                table,
+                ignoreParseErrors,
+                fieldCount);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private static RowType createJsonRowType(
+            DataType physicalDataType, List<ReadableMetadata> readableMetadata) {
+        // Canal JSON contains other information, e.g. "ts", "sql", but we don't need them
+        DataType root =
+                DataTypes.ROW(
+                        DataTypes.FIELD("data", DataTypes.ARRAY(physicalDataType)),
+                        DataTypes.FIELD("old", DataTypes.ARRAY(physicalDataType)),
+                        DataTypes.FIELD("type", DataTypes.STRING()),
+                        ReadableMetadata.DATABASE.requiredJsonField,
+                        ReadableMetadata.TABLE.requiredJsonField);
+        // append fields that are required for reading metadata in the root
+        final List<DataTypes.Field> rootMetadataFields =
+                readableMetadata.stream()
+                        .filter(m -> m != ReadableMetadata.DATABASE && m != ReadableMetadata.TABLE)
+                        .map(m -> m.requiredJsonField)
+                        .distinct()
+                        .collect(Collectors.toList());
+        return (RowType) DataTypeUtils.appendRowFields(root, rootMetadataFields).getLogicalType();
+    }
+
+    private static MetadataConverter[] createMetadataConverters(
+            RowType jsonRowType, List<ReadableMetadata> requestedMetadata) {
+        return requestedMetadata.stream()
+                .map(m -> convert(jsonRowType, m))
+                .toArray(MetadataConverter[]::new);
+    }
+
+    private static MetadataConverter convert(RowType jsonRowType, ReadableMetadata metadata) {
+        final int pos = jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
+        return new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(GenericRowData root, int unused) {
+                return metadata.converter.convert(root, pos);
+            }
+        };
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Converter that extracts a metadata field from the row that comes out of the JSON schema and
+     * converts it to the desired data type.
+     */
+    interface MetadataConverter extends Serializable {
+
+        // Method for top-level access.
+        default Object convert(GenericRowData row) {
+            return convert(row, -1);
+        }
+
+        Object convert(GenericRowData row, int pos);
+    }
+}
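As a hedged sketch of the builder above (not part of this patch): constructing the deserialization schema directly, assuming the produced TypeInformation would normally come from DynamicTableSource.Context. The column names and regex filters are illustrative.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.formats.common.TimestampFormat;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.utils.DataTypeUtils;
    import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDecodingFormat.ReadableMetadata;
    import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDeserializationSchema;

    public class CanalJsonInlongDeserializationSketch {
        public static void main(String[] args) {
            // Physical columns of the table (illustrative).
            DataType physicalDataType = DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.BIGINT()),
                    DataTypes.FIELD("name", DataTypes.STRING()));
            // Two readable metadata fields, appended after the physical columns.
            List<ReadableMetadata> requestedMetadata =
                    Arrays.asList(ReadableMetadata.DATABASE, ReadableMetadata.TABLE);
            DataType producedDataType = DataTypeUtils.appendRowFields(
                    physicalDataType,
                    Arrays.asList(
                            DataTypes.FIELD("database", DataTypes.STRING()),
                            DataTypes.FIELD("table", DataTypes.STRING())));
            // In a real source this TypeInformation is created by the planner context.
            TypeInformation<RowData> producedTypeInfo =
                    InternalTypeInfo.of((RowType) producedDataType.getLogicalType());

            CanalJsonEnhancedDeserializationSchema schema =
                    CanalJsonEnhancedDeserializationSchema.builder(
                                    physicalDataType, requestedMetadata, producedTypeInfo)
                            .setDatabase("inventory")   // regex filter on the "database" field
                            .setTable("user.*")         // regex filter on the "table" field
                            .setIgnoreParseErrors(true)
                            .setTimestampFormat(TimestampFormat.ISO_8601)
                            .build();
        }
    }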
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
new file mode 100644
index 000000000..a74d9ecf0
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink.Context;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedSerializationSchema.MetadataConverter;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link org.apache.flink.table.connector.format.EncodingFormat} for Canal using JSON encoding.
+ *
+ * Different from flink:1.13.5: this can apply writable metadata and sink it into the Canal format.
+ */
+public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<SerializationSchema<RowData>> {
+
+    private List<String> metadataKeys;
+
+    private final TimestampFormat timestampFormat;
+
+    private final JsonOptions.MapNullKeyMode mapNullKeyMode;
+
+    private final String mapNullKeyLiteral;
+
+    private boolean encodeDecimalAsPlainNumber;
+
+    public CanalJsonEnhancedEncodingFormat(
+            TimestampFormat timestampFormat,
+            JsonOptions.MapNullKeyMode mapNullKeyMode,
+            String mapNullKeyLiteral,
+            boolean encodeDecimalAsPlainNumber) {
+        this.timestampFormat = timestampFormat;
+        this.mapNullKeyMode = mapNullKeyMode;
+        this.mapNullKeyLiteral = mapNullKeyLiteral;
+        this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
+        this.metadataKeys = Collections.emptyList();
+    }
+
+    @Override
+    public SerializationSchema<RowData> createRuntimeEncoder(Context context, DataType physicalDataType) {
+        final List<WriteableMetadata> writeableMetadata =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(WriteableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                .orElseThrow(IllegalStateException::new))
+                        .collect(Collectors.toList());
+        return new CanalJsonEnhancedSerializationSchema(
+                physicalDataType,
+                writeableMetadata,
+                timestampFormat,
+                mapNullKeyMode,
+                mapNullKeyLiteral,
+                encodeDecimalAsPlainNumber);
+    }
+
+    @Override
+    public Map<String, DataType> listWritableMetadata() {
+        return Arrays.stream(WriteableMetadata.values())
+                .collect(
+                        Collectors.toMap(m -> m.key, m -> m.dataType));
+    }
+
+    @Override
+    public void applyWritableMetadata(List<String> metadataKeys) {
+        this.metadataKeys = metadataKeys;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.UPDATE_BEFORE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * List of metadata that can be written into this format.
+     * These correspond to Canal JSON's inner fields.
+     */
+    enum WriteableMetadata {
+        DATABASE(
+                "database",
+                DataTypes.STRING().nullable(),
+                DataTypes.FIELD("database", DataTypes.STRING()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(RowData row, int pos) {
+                        return row.getString(pos);
+                    }
+                }),
+        TABLE(
+                "table",
+                DataTypes.STRING().nullable(),
+                DataTypes.FIELD("table", DataTypes.STRING()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(RowData row, int pos) {
+                        return row.getString(pos);
+                    }
+                }),
+        SQL_TYPE(
+                "sql-type",
+                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.INT().nullable()).nullable(),
+                DataTypes.FIELD(
+                        "sqlType",
+                        DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.INT().nullable())),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(RowData row, int pos) {
+                        return row.getMap(pos);
+                    }
+                }),
+        PK_NAMES(
+                "pk-names",
+                DataTypes.ARRAY(DataTypes.STRING()).nullable(),
+                DataTypes.FIELD("pkNames", DataTypes.ARRAY(DataTypes.STRING())),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(RowData row, int pos) {
+                        return row.getArray(pos);
+                    }
+                }),
+        INGESTION_TIMESTAMP(
+                "ingestion-timestamp",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+                DataTypes.FIELD("ts", DataTypes.BIGINT()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(RowData row, int pos) {
+                        return row.getTimestamp(pos, 3).getMillisecond();
+                    }
+                }),
+        EVENT_TIMESTAMP(
+                "event-timestamp",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+                DataTypes.FIELD("es", DataTypes.BIGINT()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(RowData row, int pos) {
+                        return row.getTimestamp(pos, 3).getMillisecond();
+                    }
+                }),
+        // additional metadata
+        OP_TYPE(
+                "op-type",
+                DataTypes.STRING().nullable(),
+                DataTypes.FIELD("opType", DataTypes.STRING()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(RowData row, int pos) {
+                        return row.getString(pos);
+                    }
+                }),
+        IS_DDL(
+                "is-ddl",
+                DataTypes.BOOLEAN().nullable(),
+                DataTypes.FIELD("isDdl", DataTypes.BOOLEAN()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(RowData row, int pos) {
+                        return row.getBoolean(pos);
+                    }
+                }),
+
+        MYSQL_TYPE(
+                "mysql-type",
+                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable()).nullable(),
+                DataTypes.FIELD("mysqlType", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(RowData row, int pos) {
+                        return row.getMap(pos);
+                    }
+                }),
+        BATCH_ID(
+                "batch-id",
+                DataTypes.BIGINT().nullable(),
+                DataTypes.FIELD("batchId", DataTypes.BIGINT()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(RowData row, int pos) {
+                        return row.getLong(pos);
+                    }
+                }),
+        UPDATE_BEFORE(
+                "update-before",
+                DataTypes.ARRAY(
+                        DataTypes.MAP(
+                                DataTypes.STRING().nullable(), DataTypes.STRING().nullable()).nullable())
+                        .nullable(),
+                DataTypes.FIELD("updateBefore", DataTypes.ARRAY(
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(RowData row, int pos) {
+                        return row.getArray(pos);
+                    }
+                });
+
+        final String key;
+
+        final DataType dataType;
+
+        final DataTypes.Field requiredJsonField;
+
+        final MetadataConverter converter;
+
+        WriteableMetadata(
+                String key,
+                DataType dataType,
+                DataTypes.Field requiredJsonField,
+                MetadataConverter converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.requiredJsonField = requiredJsonField;
+            this.converter = converter;
+        }
+    }
+}
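On the sink side, a hedged sketch (not part of this patch) of a table that persists some of the writable metadata listed above. It assumes a connector that forwards writable format metadata under a 'value.' prefix, such as the Kafka table factory added earlier in this commit; 'kafka' is used here only as a placeholder connector identifier, and all names are illustrative.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CanalJsonInlongSinkSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // Writable metadata columns are declared without VIRTUAL so they are persisted
            // into the produced Canal JSON records.
            tEnv.executeSql(
                    "CREATE TABLE user_sink (\n"
                            + "  id BIGINT,\n"
                            + "  name STRING,\n"
                            + "  origin_database STRING METADATA FROM 'value.database',\n"
                            + "  origin_table STRING METADATA FROM 'value.table',\n"
                            + "  op_type STRING METADATA FROM 'value.op-type',\n"
                            + "  pk_names ARRAY<STRING> METADATA FROM 'value.pk-names'\n"
                            + ") WITH (\n"
                            + "  'connector' = 'kafka',\n"
                            + "  'topic' = 'user_binlog_out',\n"
                            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                            + "  'value.format' = 'canal-json-inlong'\n"
                            + ")");
        }
    }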
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedFormatFactory.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedFormatFactory.java
new file mode 100644
index 000000000..f43d9cea7
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedFormatFactory.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.formats.json.JsonOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static org.apache.flink.formats.json.canal.CanalJsonOptions.DATABASE_INCLUDE;
+import static org.apache.flink.formats.json.canal.CanalJsonOptions.IGNORE_PARSE_ERRORS;
+import static org.apache.flink.formats.json.canal.CanalJsonOptions.JSON_MAP_NULL_KEY_LITERAL;
+import static org.apache.flink.formats.json.canal.CanalJsonOptions.JSON_MAP_NULL_KEY_MODE;
+import static org.apache.flink.formats.json.canal.CanalJsonOptions.TABLE_INCLUDE;
+import static org.apache.flink.formats.json.canal.CanalJsonOptions.TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.canal.CanalJsonOptions.validateDecodingFormatOptions;
+import static org.apache.flink.formats.json.canal.CanalJsonOptions.validateEncodingFormatOptions;
+
+/**
+ * Format factory for providing configured instances of Canal JSON to RowData {@link
+ * DeserializationSchema}.
+ * Different from flink:1.13.5: this can also sink metadata.
+ */
+public class CanalJsonEnhancedFormatFactory
+        implements DeserializationFormatFactory, SerializationFormatFactory {
+
+    public static final String IDENTIFIER = "canal-json-inlong";
+
+    @Override
+    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
+            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+        FactoryUtil.validateFactoryOptions(this, formatOptions);
+        validateDecodingFormatOptions(formatOptions);
+
+        final String database = formatOptions.getOptional(DATABASE_INCLUDE).orElse(null);
+        final String table = formatOptions.getOptional(TABLE_INCLUDE).orElse(null);
+        final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
+        final TimestampFormat timestampFormat = JsonOptions.getTimestampFormat(formatOptions);
+
+        return new CanalJsonEnhancedDecodingFormat(database, table, ignoreParseErrors, timestampFormat);
+    }
+
+    @Override
+    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
+            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+
+        FactoryUtil.validateFactoryOptions(this, formatOptions);
+        validateEncodingFormatOptions(formatOptions);
+
+        TimestampFormat timestampFormat = JsonOptions.getTimestampFormat(formatOptions);
+        JsonOptions.MapNullKeyMode mapNullKeyMode = JsonOptions.getMapNullKeyMode(formatOptions);
+        String mapNullKeyLiteral = formatOptions.get(JSON_MAP_NULL_KEY_LITERAL);
+
+        final boolean encodeDecimalAsPlainNumber =
+                formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+
+        return new CanalJsonEnhancedEncodingFormat(timestampFormat, mapNullKeyMode,
+                mapNullKeyLiteral, encodeDecimalAsPlainNumber);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(IGNORE_PARSE_ERRORS);
+        options.add(TIMESTAMP_FORMAT);
+        options.add(DATABASE_INCLUDE);
+        options.add(TABLE_INCLUDE);
+        options.add(JSON_MAP_NULL_KEY_MODE);
+        options.add(JSON_MAP_NULL_KEY_LITERAL);
+        options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+        return options;
+    }
+}
\ No newline at end of file
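To make the factory's option surface concrete, a hedged sketch (not part of this patch) of the format options as they would typically be spelled once 'canal-json-inlong' is used as the option prefix. The option names mirror the stock canal-json options imported above; the values are illustrative.

    import java.util.HashMap;
    import java.util.Map;

    public class CanalJsonInlongOptionsSketch {
        public static void main(String[] args) {
            // Keys as they would appear in a table's WITH clause; all format options are optional.
            Map<String, String> withOptions = new HashMap<>();
            withOptions.put("format", "canal-json-inlong");
            withOptions.put("canal-json-inlong.ignore-parse-errors", "true");
            withOptions.put("canal-json-inlong.timestamp-format.standard", "ISO-8601");
            withOptions.put("canal-json-inlong.database.include", "inventory");
            withOptions.put("canal-json-inlong.table.include", "user.*");
            withOptions.put("canal-json-inlong.map-null-key.mode", "LITERAL");
            withOptions.put("canal-json-inlong.map-null-key.literal", "null");
            withOptions.put("canal-json-inlong.encode.decimal-as-plain-number", "true");
            withOptions.forEach((k, v) -> System.out.println("'" + k + "' = '" + v + "'"));
        }
    }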
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java
new file mode 100644
index 000000000..c13c3f23d
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions;
+import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedEncodingFormat.WriteableMetadata;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Serialization schema that serializes an object of Flink Table/SQL internal data structure {@link
+ * RowData} into Canal JSON bytes.
+ * Different from flink:1.13.5: this can write metadata.
+ *
+ * @see <a href="https://github.com/alibaba/canal">Alibaba Canal</a>
+ */
+public class CanalJsonEnhancedSerializationSchema implements SerializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final StringData OP_INSERT = StringData.fromString("INSERT");
+    private static final StringData OP_DELETE = StringData.fromString("DELETE");
+
+    private transient GenericRowData reuse;
+
+    /** The serializer to serialize Canal JSON data. */
+    private final JsonRowDataSerializationSchema jsonSerializer;
+
+    private final RowData.FieldGetter[] physicalFieldGetter;
+
+    private final RowData.FieldGetter[] writeableMetadataFieldGetter;
+
+    /** Row schema that the JSON serializer uses to convert the output row into Canal JSON format. */
+    private final RowType jsonRowType;
+
+    public CanalJsonEnhancedSerializationSchema(
+            DataType physicalDataType,
+            List<WriteableMetadata> writeableMetadata,
+            TimestampFormat timestampFormat,
+            JsonOptions.MapNullKeyMode mapNullKeyMode,
+            String mapNullKeyLiteral,
+            boolean encodeDecimalAsPlainNumber) {
+        final List<LogicalType> physicalChildren = physicalDataType.getLogicalType().getChildren();
+        this.jsonRowType = createJsonRowType(physicalDataType, writeableMetadata);
+        this.physicalFieldGetter = IntStream.range(0, physicalChildren.size())
+                .mapToObj(targetField ->
+                        RowData.createFieldGetter(physicalChildren.get(targetField), targetField))
+                .toArray(RowData.FieldGetter[]::new);
+        this.writeableMetadataFieldGetter =
+                IntStream.range(physicalChildren.size(), physicalChildren.size() + writeableMetadata.size())
+                        .mapToObj(targetField -> new RowData.FieldGetter() {
+                            @Nullable
+                            @Override
+                            public Object getFieldOrNull(RowData row) {
+                                WriteableMetadata curWriteableMetadata = writeableMetadata
+                                        .get(targetField - physicalChildren.size());
+                                return curWriteableMetadata.converter.convert(row, targetField);
+                            }
+                        }).toArray(RowData.FieldGetter[]::new);
+
+        this.jsonSerializer =
+                new JsonRowDataSerializationSchema(
+                        jsonRowType,
+                        timestampFormat,
+                        mapNullKeyMode,
+                        mapNullKeyLiteral,
+                        encodeDecimalAsPlainNumber);
+    }
+
+    @Override
+    public void open(InitializationContext context) {
+        reuse = new GenericRowData(2 + writeableMetadataFieldGetter.length);
+    }
+
+    @Override
+    public byte[] serialize(RowData row) {
+        try {
+            // physical data injection
+            GenericRowData physicalData = new GenericRowData(physicalFieldGetter.length);
+            IntStream.range(0, physicalFieldGetter.length)
+                    .forEach(targetField ->
+                            physicalData.setField(targetField, physicalFieldGetter[targetField].getFieldOrNull(row)));
+            ArrayData arrayData = new GenericArrayData(new RowData[] {physicalData});
+            reuse.setField(0, arrayData);
+
+            // metadata injection
+            StringData opType = rowKind2String(row.getRowKind());
+            reuse.setField(1, opType);
+            IntStream.range(0, writeableMetadataFieldGetter.length)
+                    .forEach(targetField ->
+                            reuse.setField(2 + targetField,
+                                    writeableMetadataFieldGetter[targetField].getFieldOrNull(row)));
+            return jsonSerializer.serialize(reuse);
+        } catch (Throwable t) {
+            throw new RuntimeException("Could not serialize row '" + row + "'.", t);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CanalJsonEnhancedSerializationSchema that = (CanalJsonEnhancedSerializationSchema) o;
+        return Objects.equals(jsonSerializer, that.jsonSerializer);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(jsonSerializer);
+    }
+
+    private StringData rowKind2String(RowKind rowKind) {
+        switch (rowKind) {
+            case INSERT:
+            case UPDATE_AFTER:
+                return OP_INSERT;
+            case UPDATE_BEFORE:
+            case DELETE:
+                return OP_DELETE;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported operation '" + rowKind + "' for row kind.");
+        }
+    }
+
+    private static RowType createJsonRowType(DataType physicalDataType, List<WriteableMetadata> writeableMetadata) {
+        // Canal JSON contains other information, e.g. "database" and "ts", but we don't need them here.
+        // We also don't write the "old" field, because UPDATE_BEFORE/UPDATE_AFTER rows are
+        // serialized as separate DELETE/INSERT messages (see rowKind2String).
+        DataType root =
+                DataTypes.ROW(
+                                DataTypes.FIELD("data", DataTypes.ARRAY(physicalDataType)),
+                                DataTypes.FIELD("type", DataTypes.STRING()));
+        // append fields that are required for writing metadata into the root
+        final List<DataTypes.Field> metadataFields =
+                writeableMetadata.stream()
+                        .map(m -> m.requiredJsonField)
+                        .distinct()
+                        .collect(Collectors.toList());
+        return (RowType) DataTypeUtils.appendRowFields(root, metadataFields).getLogicalType();
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Converter that extracts a metadata field from the input {@link RowData}.
+     * All metadata fields are spliced into a {@link GenericRowData}, which the JSON serializer then writes out as a JSON string.
+     */
+    interface MetadataConverter extends Serializable {
+        Object convert(RowData inputRow, int pos);
+    }
+}
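As a rough illustration of the output shape (hedged, and not taken from the commit's test data): for an INSERT row with the database, table, and pk-names metadata requested, serialize() nests the physical row under "data", maps the RowKind to "type", and appends the metadata fields, yielding JSON along these lines.

    public class CanalJsonInlongOutputSketch {
        public static void main(String[] args) {
            // Illustrative only: actual values depend on the input row and the requested metadata.
            String example = "{"
                    + "\"data\":[{\"id\":1,\"name\":\"jacket\"}],"
                    + "\"type\":\"INSERT\","
                    + "\"database\":\"inventory\","
                    + "\"table\":\"user\","
                    + "\"pkNames\":[\"id\"]"
                    + "}";
            System.out.println(example);
        }
    }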
diff --git a/inlong-sort/sort-formats/format-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-formats/format-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..c728ddd5c
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedFormatFactory
diff --git a/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedFormatFactoryTest.java b/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedFormatFactoryTest.java
new file mode 100644
index 000000000..97375283e
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedFormatFactoryTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_DATA_TYPE;
+import static org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_TYPE;
+import static org.apache.flink.table.factories.utils.FactoryMocks.SCHEMA;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.junit.Assert.assertEquals;
+
+public class CanalJsonEnhancedFormatFactoryTest {
+    private static final InternalTypeInfo<RowData> ROW_TYPE_INFO =
+            InternalTypeInfo.of(PHYSICAL_TYPE);
+
+    @Test
+    public void testUserDefinedOptions() {
+        final Map<String, String> tableOptions =
+                getModifiedOptions(opts -> {
+                    opts.put("canal-json-inlong.map-null-key.mode", "LITERAL");
+                    opts.put("canal-json-inlong.map-null-key.literal", "nullKey");
+                    opts.put("canal-json-inlong.ignore-parse-errors", "true");
+                    opts.put("canal-json-inlong.timestamp-format.standard", "ISO-8601");
+                    opts.put("canal-json-inlong.database.include", "mydb");
+                    opts.put("canal-json-inlong.table.include", "mytable");
+                    opts.put("canal-json-inlong.map-null-key.mode", "LITERAL");
+                    opts.put("canal-json-inlong.map-null-key.literal", "nullKey");
+                    opts.put("canal-json-inlong.encode.decimal-as-plain-number", "true");
+                });
+
+        // test Deser
+        CanalJsonEnhancedDeserializationSchema expectedDeser =
+                CanalJsonEnhancedDeserializationSchema.builder(
+                                PHYSICAL_DATA_TYPE, Collections.emptyList(), ROW_TYPE_INFO)
+                        .setIgnoreParseErrors(true)
+                        .setTimestampFormat(TimestampFormat.ISO_8601)
+                        .setDatabase("mydb")
+                        .setTable("mytable")
+                        .build();
+        DeserializationSchema<RowData> actualDeser = createDeserializationSchema(tableOptions);
+        assertEquals(expectedDeser, actualDeser);
+
+        // test Ser
+        CanalJsonEnhancedSerializationSchema expectedSer =
+                new CanalJsonEnhancedSerializationSchema(
+                        PHYSICAL_DATA_TYPE,
+                        new ArrayList<>(),
+                        TimestampFormat.ISO_8601,
+                        JsonOptions.MapNullKeyMode.LITERAL,
+                        "nullKey",
+                        true);
+        SerializationSchema<RowData> actualSer = createSerializationSchema(tableOptions);
+        assertEquals(expectedSer, actualSer);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Public Tools
+    // ------------------------------------------------------------------------
+
+    public static DeserializationSchema<RowData> createDeserializationSchema(
+            Map<String, String> options) {
+        DynamicTableSource source = createTableSource(SCHEMA, options);
+
+        assert source instanceof TestDynamicTableFactory.DynamicTableSourceMock;
+        TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+                (TestDynamicTableFactory.DynamicTableSourceMock) source;
+
+        return scanSourceMock.valueFormat.createRuntimeDecoder(
+                ScanRuntimeProviderContext.INSTANCE, PHYSICAL_DATA_TYPE);
+    }
+
+    public static SerializationSchema<RowData> createSerializationSchema(
+            Map<String, String> options) {
+        DynamicTableSink sink = createTableSink(SCHEMA, options);
+
+        assert sink instanceof TestDynamicTableFactory.DynamicTableSinkMock;
+        TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+                (TestDynamicTableFactory.DynamicTableSinkMock) sink;
+
+        return sinkMock.valueFormat.createRuntimeEncoder(
+                new SinkRuntimeProviderContext(false), PHYSICAL_DATA_TYPE);
+    }
+
+    /**
+     * Returns the full options modified by the given consumer {@code optionModifier}.
+     *
+     * @param optionModifier Consumer to modify the options
+     */
+    public static Map<String, String> getModifiedOptions(Consumer<Map<String, String>> optionModifier) {
+        Map<String, String> options = getAllOptions();
+        optionModifier.accept(options);
+        return options;
+    }
+
+    private static Map<String, String> getAllOptions() {
+        final Map<String, String> options = new HashMap<>();
+        options.put("connector", TestDynamicTableFactory.IDENTIFIER);
+        options.put("target", "MyTarget");
+        options.put("buffer-size", "1000");
+        options.put("format", "canal-json-inlong");
+        return options;
+    }
+}
diff --git a/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerDeSchemaTest.java b/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerDeSchemaTest.java
new file mode 100644
index 000000000..e04bd46f9
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/test/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerDeSchemaTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.canal;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions;
+import org.apache.flink.formats.json.canal.CanalJsonOptions;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDecodingFormat.ReadableMetadata;
+import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedEncodingFormat.WriteableMetadata;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+
+public class CanalJsonEnhancedSerDeSchemaTest {
+    public static final String DATABASE = "TEST";
+
+    public static final String TABLE = "TEST";
+
+    public static final ResolvedSchema SCHEMA =
+            ResolvedSchema.of(
+                    Column.metadata("database", DataTypes.BOOLEAN(), "database", false),
+                    Column.physical("id", DataTypes.BIGINT()),
+                    Column.physical("name", DataTypes.STRING()),
+                    Column.metadata("table", DataTypes.BOOLEAN(), "table", false),
+                    Column.metadata("sql_type",
+                            DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()), "sql-type", false),
+                    Column.metadata("pk_names",
+                            DataTypes.ARRAY(DataTypes.STRING()), "pk-names", false),
+                    Column.metadata("ingestion_timestamp",
+                            DataTypes.TIMESTAMP_LTZ(3), "ingestion-timestamp", false),
+                    Column.metadata("event_timestamp",
+                            DataTypes.TIMESTAMP_LTZ(3), "event-timestamp", false),
+                    Column.metadata("op_type", DataTypes.STRING(), "op-type", false),
+                    Column.metadata("is_ddl", DataTypes.BOOLEAN(), "is-ddl", false),
+                    Column.metadata("mysql_type",
+                            DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()), "mysql-type", false),
+                    Column.metadata("batch_id", DataTypes.BIGINT(), "batch-id", false),
+                    Column.metadata("update_before",
+                            DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+                            "update-before", false));
+
+    public static final DataType PHYSICAL_DATA_TYPE = SCHEMA.toPhysicalRowDataType();
+
+    public static final List<ReadableMetadata> READABLE_METADATA =
+            Stream.of(
+                    ReadableMetadata.DATABASE,
+                    ReadableMetadata.TABLE,
+                    ReadableMetadata.SQL_TYPE,
+                    ReadableMetadata.PK_NAMES,
+                    ReadableMetadata.INGESTION_TIMESTAMP,
+                    ReadableMetadata.EVENT_TIMESTAMP,
+                    ReadableMetadata.OP_TYPE,
+                    ReadableMetadata.IS_DDL,
+                    ReadableMetadata.MYSQL_TYPE,
+                    ReadableMetadata.BATCH_ID,
+                    ReadableMetadata.UPDATE_BEFORE
+            ).collect(Collectors.toList());
+
+    public static final List<WriteableMetadata> WRITEABLE_METADATA =
+            Stream.of(
+                    WriteableMetadata.DATABASE,
+                    WriteableMetadata.TABLE,
+                    WriteableMetadata.SQL_TYPE,
+                    WriteableMetadata.PK_NAMES,
+                    WriteableMetadata.INGESTION_TIMESTAMP,
+                    WriteableMetadata.EVENT_TIMESTAMP,
+                    WriteableMetadata.OP_TYPE,
+                    WriteableMetadata.IS_DDL,
+                    WriteableMetadata.MYSQL_TYPE,
+                    WriteableMetadata.BATCH_ID,
+                    WriteableMetadata.UPDATE_BEFORE
+            ).collect(Collectors.toList());
+
+    @Test
+    public void testSerDeWithMetadata() throws Exception {
+        List<String> lines = readLines("canal-json-inlong-data.txt");
+        DeserializationSchema<RowData> deserializationSchema = createCanalJsonDeserializationSchema(
+                PHYSICAL_DATA_TYPE, READABLE_METADATA);
+        // deserialize
+        SimpleCollector out = new SimpleCollector();
+        for (String line : lines) {
+            deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), out);
+        }
+        List<RowData> res = out.result();
+
+        // serialize
+        SerializationSchema<RowData> serializationSchema = createCanalJsonSerializationSchema(
+                PHYSICAL_DATA_TYPE, WRITEABLE_METADATA);
+        serializationSchema.open(null);
+        for (int i = 0; i < lines.size(); i++) {
+            String json = new String(serializationSchema.serialize(res.get(i)), StandardCharsets.UTF_8);
+            compareJson(json, lines.get(i));
+        }
+    }
+
+    // =======================================Utils=======================================================
+
+    private CanalJsonEnhancedDeserializationSchema createCanalJsonDeserializationSchema(
+            DataType physicalDataType, List<ReadableMetadata> requestedMetadata) {
+        final DataType producedDataType =
+                DataTypeUtils.appendRowFields(
+                        physicalDataType,
+                        requestedMetadata.stream()
+                                .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                                .collect(Collectors.toList()));
+        return CanalJsonEnhancedDeserializationSchema.builder(
+                        PHYSICAL_DATA_TYPE,
+                        requestedMetadata,
+                        InternalTypeInfo.of(producedDataType.getLogicalType()))
+                .setDatabase(DATABASE)
+                .setTable(TABLE)
+                .setIgnoreParseErrors(JsonOptions.IGNORE_PARSE_ERRORS.defaultValue())
+                .setTimestampFormat(TimestampFormat.valueOf(CanalJsonOptions.TIMESTAMP_FORMAT.defaultValue()))
+                .build();
+    }
+
+    private CanalJsonEnhancedSerializationSchema createCanalJsonSerializationSchema(
+            DataType physicalDataType, List<WriteableMetadata> requestedMetadata) {
+        return new CanalJsonEnhancedSerializationSchema(
+                physicalDataType,
+                requestedMetadata,
+                TimestampFormat.valueOf(CanalJsonOptions.TIMESTAMP_FORMAT.defaultValue()),
+                JsonOptions.MapNullKeyMode.valueOf(CanalJsonOptions.JSON_MAP_NULL_KEY_MODE.defaultValue()),
+                CanalJsonOptions.JSON_MAP_NULL_KEY_LITERAL.defaultValue(),
+                JsonOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER.defaultValue());
+    }
+
+    private static List<String> readLines(String resource) throws IOException {
+        final URL url = CanalJsonEnhancedSerDeSchemaTest.class.getClassLoader().getResource(resource);
+        assert url != null;
+        Path path = new File(url.getFile()).toPath();
+        return Files.readAllLines(path);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static List<RowData> readRowDatas(String resource) throws IOException, ClassNotFoundException {
+        final URL url = CanalJsonEnhancedSerDeSchemaTest.class.getClassLoader().getResource(resource);
+        assert url != null;
+        Path path = new File(url.getFile()).toPath();
+        try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(path.toString()))) {
+            return (List<RowData>) in.readObject();
+        }
+    }
+
+    public void compareJson(String json1, String json2) throws JsonProcessingException {
+        ObjectMapper objectMapper = new ObjectMapper();
+        JsonNode node1 = objectMapper.readTree(json1);
+        JsonNode node2 = objectMapper.readTree(json2);
+        assertEquals(node1, node2);
+    }
+
+    private static class SimpleCollector implements Collector<RowData> {
+
+        private List<RowData> list = new ArrayList<>();
+
+        @Override
+        public void collect(RowData record) {
+            list.add(record);
+        }
+
+        @Override
+        public void close() {
+            // do nothing
+        }
+
+        public List<RowData> result() {
+            List<RowData> newList = new ArrayList<>(list);
+            list.clear();
+            return newList;
+        }
+    }
+
+}
diff --git a/inlong-sort/sort-formats/format-json/src/test/resources/canal-json-inlong-data.txt b/inlong-sort/sort-formats/format-json/src/test/resources/canal-json-inlong-data.txt
new file mode 100644
index 000000000..0181fa17b
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/test/resources/canal-json-inlong-data.txt
@@ -0,0 +1,3 @@
+{"data":[{"id":2,"name":"xixi"}],"type":"INSERT","pkNames":["id"],"database":"TEST","mysqlType":{"name":"VARCHAR(63)","id":"BIGINT(20)"},"opType":"INSERT","es":0,"batchId":0,"sqlType":{"name":12,"id":-5},"updateBefore":null,"ts":1651208731718,"isDdl":false,"table":"TEST"}
+{"data":[{"id":1,"name":"oooooo"}],"type":"INSERT","pkNames":["id"],"database":"TEST","mysqlType":{"name":"VARCHAR(63)","id":"BIGINT(20)"},"opType":"INSERT","es":0,"batchId":1,"sqlType":{"name":12,"id":-5},"updateBefore":null,"ts":1651208731717,"isDdl":false,"table":"TEST"}
+{"data":[{"id":3,"name":"HAHA"}],"type":"INSERT","pkNames":["id"],"database":"TEST","mysqlType":{"name":"VARCHAR(63)","id":"BIGINT(20)"},"opType":"INSERT","es":1651208797000,"batchId":2,"sqlType":{"name":12,"id":-5},"updateBefore":null,"ts":1651208799752,"isDdl":false,"table":"TEST"}
\ No newline at end of file
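For readability, the first record of this fixture reformatted with indentation (the content is identical to the single-line form above; these are the fields the enhanced format maps to its readable/writeable metadata):

    {
      "data": [{"id": 2, "name": "xixi"}],
      "type": "INSERT",
      "pkNames": ["id"],
      "database": "TEST",
      "mysqlType": {"name": "VARCHAR(63)", "id": "BIGINT(20)"},
      "opType": "INSERT",
      "es": 0,
      "batchId": 0,
      "sqlType": {"name": 12, "id": -5},
      "updateBefore": null,
      "ts": 1651208731718,
      "isDdl": false,
      "table": "TEST"
    }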
diff --git a/inlong-sort/sort-formats/format-json/src/test/resources/log4j-test.properties b/inlong-sort/sort-formats/format-json/src/test/resources/log4j-test.properties
new file mode 100644
index 000000000..a4640f721
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/test/resources/log4j-test.properties
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=INFO, testlogger
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-4r [%t] %-5p %c %x - %m%n
+# Suppress the irrelevant (wrong) warnings from the Netty channel handler
+log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+# Resource leak detector only works with logging enabled at error level
+log4j.logger.org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector=ERROR, testlogger
diff --git a/inlong-sort/sort-formats/pom.xml b/inlong-sort/sort-formats/pom.xml
index 648bb9c84..d95ff8439 100644
--- a/inlong-sort/sort-formats/pom.xml
+++ b/inlong-sort/sort-formats/pom.xml
@@ -130,6 +130,20 @@
             <scope>test</scope>
         </dependency>
 
+        <!-- Flink dependencies -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index 726fed249..ec69e894c 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -123,11 +123,6 @@
         </dependency>
 
         <!--flink-->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-kafka_${flink.scala.binary.version}</artifactId>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-hive_${flink.scala.binary.version}</artifactId>
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
index 7f3d5b23e..948f08868 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
@@ -63,7 +63,7 @@ import java.io.IOException;
 import java.util.Map;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
-import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
+import static org.apache.inlong.sort.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
 import static org.apache.inlong.sort.singletenant.flink.pulsar.PulsarSourceBuilder.buildPulsarSource;
 import static org.apache.inlong.sort.singletenant.flink.pulsar.PulsarSourceBuilder.buildTDMQPulsarSource;
 
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
index c8294ebd9..e481b09ff 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
@@ -663,35 +663,33 @@ public class FlinkSqlParser implements Parser {
             case MYSQL_METADATA_DATABASE:
                 metaType = "STRING METADATA FROM 'value.database'";
                 break;
-            case MYSQL_METADATA_EVENT_TIME:
-                metaType = "TIMESTAMP(3) METADATA FROM 'value.op_ts'";
+            case METADATA_SQL_TYPE:
+                metaType = "MAP<STRING, INT> METADATA FROM 'value.sql-type'";
                 break;
-            case MYSQL_METADATA_EVENT_TYPE:
-                metaType = "STRING METADATA FROM 'value.op_type'";
+            case METADATA_PK_NAMES:
+                metaType = "ARRAY<STRING> METADATA FROM 'value.pk-names'";
                 break;
-            case MYSQL_METADATA_DATA:
-                metaType = "STRING METADATA FROM 'value.data'";
+            case METADATA_TS:
+                metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp'";
                 break;
-            case MYSQL_METADATA_IS_DDL:
-                metaType = "BOOLEAN METADATA FROM 'value.is_ddl'";
+            case MYSQL_METADATA_EVENT_TIME:
+                metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.event-timestamp'";
                 break;
-            case METADATA_TS:
-                metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ts'";
+            // additional metadata
+            case MYSQL_METADATA_EVENT_TYPE:
+                metaType = "STRING METADATA FROM 'value.op-type'";
                 break;
-            case METADATA_SQL_TYPE:
-                metaType = "MAP<STRING, INT> METADATA FROM 'value.sql_type'";
+            case MYSQL_METADATA_IS_DDL:
+                metaType = "BOOLEAN METADATA FROM 'value.is-ddl'";
                 break;
             case METADATA_MYSQL_TYPE:
-                metaType = "MAP<STRING, STRING> METADATA FROM 'value.mysql_type'";
-                break;
-            case METADATA_PK_NAMES:
-                metaType = "ARRAY<STRING> METADATA FROM 'value.pk_names'";
+                metaType = "MAP<STRING, STRING> METADATA FROM 'value.mysql-type'";
                 break;
             case METADATA_BATCH_ID:
-                metaType = "BIGINT METADATA FROM 'value.batch_id'";
+                metaType = "BIGINT METADATA FROM 'value.batch-id'";
                 break;
             case METADATA_UPDATE_BEFORE:
-                metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update_before'";
+                metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before'";
                 break;
             default:
                 metaType = TableFormatUtils.deriveLogicalType(metaField.getFormatInfo()).asSummaryString();
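
For context, the metadata keys above are emitted verbatim into the DDL that FlinkSqlParser generates. A rough sketch of two such column declarations as Java strings (the backtick-quoted column names and the surrounding CREATE TABLE are assumptions; only the types and 'value.*' keys come from the mapping above):

    // Hypothetical fragments of the generated DDL for metadata fields that the
    // canal-json-inlong format exposes; the keys match the switch cases above.
    String sqlTypeColumn = "`sql_type` MAP<STRING, INT> METADATA FROM 'value.sql-type'";
    String batchIdColumn = "`batch_id` BIGINT METADATA FROM 'value.batch-id'";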
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/KafkaSinkTestBase.java
similarity index 98%
rename from inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
rename to inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/KafkaSinkTestBase.java
index 1b9834669..fcd0279ff 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/KafkaSinkTestBase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.singletenant.flink.kafka;
+package org.apache.inlong.sort.flink.kafka;
 
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
@@ -64,7 +64,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
-import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
+import static org.apache.inlong.sort.flink.kafka.KafkaSinkBuilder.buildKafkaSink;
 import static org.apache.inlong.sort.singletenant.flink.utils.NetUtils.getUnusedLocalPort;
 import static org.junit.Assert.assertNull;
 
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToAvroKafkaSinkTest.java
similarity index 99%
rename from inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java
rename to inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToAvroKafkaSinkTest.java
index 2a084170c..634cf489a 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToAvroKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToAvroKafkaSinkTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.singletenant.flink.kafka;
+package org.apache.inlong.sort.flink.kafka;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToCanalKafkaSinkTest.java
similarity index 98%
rename from inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java
rename to inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToCanalKafkaSinkTest.java
index ebc8e062e..6e5cc6965 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToCanalKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToCanalKafkaSinkTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.singletenant.flink.kafka;
+package org.apache.inlong.sort.flink.kafka;
 
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToDebeziumJsonKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToDebeziumJsonKafkaSinkTest.java
similarity index 98%
rename from inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToDebeziumJsonKafkaSinkTest.java
rename to inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToDebeziumJsonKafkaSinkTest.java
index c587acbf6..085585d41 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToDebeziumJsonKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToDebeziumJsonKafkaSinkTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.singletenant.flink.kafka;
+package org.apache.inlong.sort.flink.kafka;
 
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToJsonKafkaSinkTest.java
similarity index 98%
rename from inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
rename to inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToJsonKafkaSinkTest.java
index 66c964b8f..4197da0a6 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToJsonKafkaSinkTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.singletenant.flink.kafka;
+package org.apache.inlong.sort.flink.kafka;
 
 import org.apache.flink.types.Row;
 import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToStringKafkaSinkTest.java
similarity index 97%
rename from inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
rename to inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToStringKafkaSinkTest.java
index 999f8b511..46374bf0b 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/flink/kafka/RowToStringKafkaSinkTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.singletenant.flink.kafka;
+package org.apache.inlong.sort.flink.kafka;
 
 import org.apache.flink.types.Row;
 import org.apache.inlong.sort.formats.common.DoubleFormatInfo;
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
index 14ca70205..ece7f5ce3 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
@@ -166,5 +166,4 @@ public class FlinkSqlParserTest extends AbstractTestBase {
         FlinkSqlParseResult result = parser.parse();
         Assert.assertTrue(result.tryExecute());
     }
-
 }
diff --git a/pom.xml b/pom.xml
index 722852659..d24341113 100644
--- a/pom.xml
+++ b/pom.xml
@@ -959,6 +959,12 @@
                 <artifactId>flink-connector-mysql-cdc</artifactId>
                 <version>${flink.connector.mysql.cdc.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-test-utils-junit</artifactId>
+                <version>${flink.version}</version>
+                <scope>test</scope>
+            </dependency>
 
             <!-- qcloud -->
             <dependency>