Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/01 05:56:13 UTC

[incubator-inlong] 01/01: Revert "[INLONG-2785][Sort]Support extract metadata from data with debezium format and write them to data with canal format (#2796)"

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch revert-2796-master_debezium_to_canal
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 5db5df127429cfdc0b90c80b9f600ee1fb003589
Author: healchow <he...@gmail.com>
AuthorDate: Tue Mar 1 13:56:10 2022 +0800

    Revert "[INLONG-2785][Sort]Support extract metadata from data with debezium format and write them to data with canal format (#2796)"
    
    This reverts commit 265f129e0bcd26ffe92322876089293f86a44610.
---
 .../inlong/sort/protocol/BuiltInFieldInfo.java     |   7 +-
 inlong-sort/sort-formats/format-json/pom.xml       |  78 ----
 .../inlong/sort/formats/json/MysqlBinLogData.java  |  60 ---
 .../json/canal/CanalJsonDecodingFormat.java        | 293 ---------------
 .../json/canal/CanalJsonDeserializationSchema.java | 411 ---------------------
 .../json/canal/CanalJsonSerializationSchema.java   | 195 ----------
 .../json/debezium/DebeziumJsonDecodingFormat.java  | 286 --------------
 .../DebeziumJsonDeserializationSchema.java         | 366 ------------------
 .../sort/formats/json/debezium/DebeziumUtils.java  |  37 --
 inlong-sort/sort-formats/pom.xml                   |   1 -
 inlong-sort/sort-single-tenant/pom.xml             |  11 +-
 .../inlong/sort/singletenant/flink/Entrance.java   |  16 +-
 .../DebeziumDeserializationSchemaBuilder.java      |  93 +----
 .../deserialization/DeserializationFunction.java   |  17 +-
 .../DeserializationSchemaFactory.java              |  14 +-
 .../deserialization/FieldMappingTransformer.java   |  30 +-
 .../CanalSerializationSchemaBuilder.java           |  70 +---
 .../sort/singletenant/flink/utils/CommonUtils.java |   8 -
 .../singletenant/flink/DebeziumToCanalITCase.java  | 208 -----------
 .../DebeziumDeserializationTest.java               | 111 +++---
 .../FieldMappingTransformerTest.java               |   3 +-
 .../serialization/CanalSerializationTest.java      |  83 -----
 22 files changed, 103 insertions(+), 2295 deletions(-)
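
For context, the reverted feature let inlong-sort read Debezium-format change
records, expose their metadata (database, table, event time, is-DDL flag,
event type) as built-in fields, and write that metadata back out in Canal
format. A rough sketch of the intended mapping, with hypothetical payloads
(the Canal field names match the deleted classes below; the Debezium envelope
is abridged):

    Debezium-format input (metadata carried in "source" and "ts_ms"):
        {"before": null, "after": {"id": 1, "name": "jo"}, "op": "c",
         "ts_ms": 1596684883000,
         "source": {"db": "inventory", "table": "products"}}

    Canal-format output (metadata written to top-level fields):
        {"data": [{"id": 1, "name": "jo"}], "type": "INSERT",
         "database": "inventory", "table": "products",
         "es": 1596684883000, "isDdl": false}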

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
index 6a25d62..45291fa 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
@@ -27,12 +27,7 @@ import org.apache.inlong.sort.formats.common.FormatInfo;
 public class BuiltInFieldInfo extends FieldInfo {
 
     public enum BuiltInField {
-        DATA_TIME,
-        MYSQL_METADATA_DATABASE,
-        MYSQL_METADATA_TABLE,
-        MYSQL_METADATA_EVENT_TIME,
-        MYSQL_METADATA_IS_DDL,
-        MYSQL_METADATA_EVENT_TYPE
+        DATA_TIME
     }
 
     private static final long serialVersionUID = -3436204467879205139L;
diff --git a/inlong-sort/sort-formats/format-json/pom.xml b/inlong-sort/sort-formats/format-json/pom.xml
deleted file mode 100644
index ab4db83..0000000
--- a/inlong-sort/sort-formats/format-json/pom.xml
+++ /dev/null
@@ -1,78 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <artifactId>sort-formats</artifactId>
-        <groupId>org.apache.inlong</groupId>
-        <version>1.1.0-incubating-SNAPSHOT</version>
-        <relativePath>..</relativePath>
-    </parent>
-
-    <artifactId>sort-format-json</artifactId>
-    <name>Apache InLong - Sort Format-json</name>
-    <packaging>jar</packaging>
-
-    <dependencies>
-
-        <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>sort-format-common</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-shaded-jackson</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-json</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-common</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
-    </dependencies>
-
-</project>
\ No newline at end of file
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/MysqlBinLogData.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/MysqlBinLogData.java
deleted file mode 100644
index eaaa9ea..0000000
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/MysqlBinLogData.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.json;
-
-import java.io.Serializable;
-import java.util.Map;
-import org.apache.flink.table.data.RowData;
-
-public class MysqlBinLogData implements Serializable {
-
-    public static final String MYSQL_METADATA_DATABASE = "mysql_metadata_database";
-
-    public static final String MYSQL_METADATA_TABLE = "mysql_metadata_table";
-
-    public static final String MYSQL_METADATA_EVENT_TIME = "mysql_metadata_event_time";
-
-    public static final String MYSQL_METADATA_IS_DDL = "mysql_metadata_is_ddl";
-
-    public static final String MYSQL_METADATA_EVENT_TYPE = "mysql_metadata_event_type";
-
-    private RowData physicalData;
-
-    private Map<String, Object> metadataMap;
-
-    public MysqlBinLogData(RowData physicalData, Map<String, Object> metadataMap) {
-        this.physicalData = physicalData;
-        this.metadataMap = metadataMap;
-    }
-
-    public RowData getPhysicalData() {
-        return physicalData;
-    }
-
-    public void setPhysicalData(RowData physicalData) {
-        this.physicalData = physicalData;
-    }
-
-    public Map<String, Object> getMetadataMap() {
-        return metadataMap;
-    }
-
-    public void setMetadataMap(Map<String, Object> metadataMap) {
-        this.metadataMap = metadataMap;
-    }
-}
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDecodingFormat.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDecodingFormat.java
deleted file mode 100644
index 343f4e9..0000000
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDecodingFormat.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.json.canal;
-
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import javax.annotation.Nullable;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.DecodingFormat;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.data.GenericArrayData;
-import org.apache.flink.table.data.GenericMapData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.types.RowKind;
-import org.apache.inlong.sort.formats.json.canal.CanalJsonDeserializationSchema.MetadataConverter;
-
-/** {@link DecodingFormat} for Canal using JSON encoding. */
-public class CanalJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
-
-    // --------------------------------------------------------------------------------------------
-    // Mutable attributes
-    // --------------------------------------------------------------------------------------------
-
-    private List<String> metadataKeys;
-
-    // --------------------------------------------------------------------------------------------
-    // Canal-specific attributes
-    // --------------------------------------------------------------------------------------------
-
-    private final @Nullable String database;
-
-    private final @Nullable String table;
-
-    private final boolean ignoreParseErrors;
-
-    private final TimestampFormat timestampFormat;
-
-    public CanalJsonDecodingFormat(
-            String database,
-            String table,
-            boolean ignoreParseErrors,
-            TimestampFormat timestampFormat) {
-        this.database = database;
-        this.table = table;
-        this.ignoreParseErrors = ignoreParseErrors;
-        this.timestampFormat = timestampFormat;
-        this.metadataKeys = Collections.emptyList();
-    }
-
-    @Override
-    public DeserializationSchema<RowData> createRuntimeDecoder(
-            DynamicTableSource.Context context, DataType physicalDataType) {
-        final List<ReadableMetadata> readableMetadata =
-                metadataKeys.stream()
-                        .map(
-                                k ->
-                                        Stream.of(ReadableMetadata.values())
-                                                .filter(rm -> rm.key.equals(k))
-                                                .findFirst()
-                                                .orElseThrow(IllegalStateException::new))
-                        .collect(Collectors.toList());
-        final List<DataTypes.Field> metadataFields =
-                readableMetadata.stream()
-                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
-                        .collect(Collectors.toList());
-        final DataType producedDataType =
-                DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
-        final TypeInformation<RowData> producedTypeInfo =
-                context.createTypeInformation(producedDataType);
-        return CanalJsonDeserializationSchema.builder(
-                        physicalDataType, readableMetadata, producedTypeInfo)
-                .setDatabase(database)
-                .setTable(table)
-                .setIgnoreParseErrors(ignoreParseErrors)
-                .setTimestampFormat(timestampFormat)
-                .build();
-    }
-
-    @Override
-    public Map<String, DataType> listReadableMetadata() {
-        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
-        Stream.of(ReadableMetadata.values())
-                .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
-        return metadataMap;
-    }
-
-    @Override
-    public void applyReadableMetadata(List<String> metadataKeys) {
-        this.metadataKeys = metadataKeys;
-    }
-
-    @Override
-    public ChangelogMode getChangelogMode() {
-        return ChangelogMode.newBuilder()
-                .addContainedKind(RowKind.INSERT)
-                .addContainedKind(RowKind.UPDATE_BEFORE)
-                .addContainedKind(RowKind.UPDATE_AFTER)
-                .addContainedKind(RowKind.DELETE)
-                .build();
-    }
-
-    // --------------------------------------------------------------------------------------------
-    // Metadata handling
-    // --------------------------------------------------------------------------------------------
-
-    /** List of metadata that can be read with this format. */
-    public enum ReadableMetadata {
-        DATABASE(
-                "database",
-                DataTypes.STRING().nullable(),
-                DataTypes.FIELD("database", DataTypes.STRING()),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        return row.getString(pos);
-                    }
-
-                    @Override
-                    public Object convert(Object in) {
-                        return StringData.fromString(in.toString());
-                    }
-                }),
-
-        TABLE(
-                "table",
-                DataTypes.STRING().nullable(),
-                DataTypes.FIELD("table", DataTypes.STRING()),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        return row.getString(pos);
-                    }
-
-                    @Override
-                    public Object convert(Object in) {
-                        return StringData.fromString(in.toString());
-                    }
-                }),
-
-        SQL_TYPE(
-                "sql-type",
-                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.INT().nullable()).nullable(),
-                DataTypes.FIELD(
-                        "sqlType",
-                        DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.INT().nullable())),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        return row.getMap(pos);
-                    }
-
-                    @Override
-                    public Object convert(Object in) {
-                        return new GenericMapData((Map<String, Integer>) in);
-                    }
-                }),
-
-        PK_NAMES(
-                "pk-names",
-                DataTypes.ARRAY(DataTypes.STRING()).nullable(),
-                DataTypes.FIELD("pkNames", DataTypes.ARRAY(DataTypes.STRING())),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        return row.getArray(pos);
-                    }
-
-                    @Override
-                    public Object convert(Object in) {
-                        return new GenericArrayData((Object[]) in);
-                    }
-                }),
-
-        INGESTION_TIMESTAMP(
-                "ingestion-timestamp",
-                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
-                DataTypes.FIELD("ts", DataTypes.BIGINT()),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        if (row.isNullAt(pos)) {
-                            return null;
-                        }
-                        return TimestampData.fromEpochMillis(row.getLong(pos));
-                    }
-
-                    @Override
-                    public Object convert(Object in) {
-                        return in;
-                    }
-                }),
-
-        EVENT_TIMESTAMP(
-                "event-timestamp",
-                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
-                DataTypes.FIELD("es", DataTypes.BIGINT()),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        if (row.isNullAt(pos)) {
-                            return null;
-                        }
-                        return TimestampData.fromEpochMillis(row.getLong(pos));
-                    }
-
-                    @Override
-                    public Object convert(Object in) {
-                        return in;
-                    }
-                }),
-
-        IS_DDL(
-                "is-ddl",
-                DataTypes.BOOLEAN().nullable(),
-                DataTypes.FIELD("isDdl", DataTypes.BOOLEAN()),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        if (row.isNullAt(pos)) {
-                            return null;
-                        }
-                        return row.getBoolean(pos);
-                    }
-
-                    @Override
-                    public Object convert(Object in) {
-                        return in;
-                    }
-                });
-
-        final String key;
-
-        final DataType dataType;
-
-        final DataTypes.Field requiredJsonField;
-
-        final MetadataConverter converter;
-
-        ReadableMetadata(
-                String key,
-                DataType dataType,
-                DataTypes.Field requiredJsonField,
-                MetadataConverter converter) {
-            this.key = key;
-            this.dataType = dataType;
-            this.requiredJsonField = requiredJsonField;
-            this.converter = converter;
-        }
-    }
-}
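
The decoding format above advertises its metadata through
listReadableMetadata() and receives the planner's selection through
applyReadableMetadata(). A minimal sketch of that handshake (standalone
usage for illustration only; in practice the table source factory drives it):

    import java.util.Arrays;
    import org.apache.flink.formats.common.TimestampFormat;
    import org.apache.inlong.sort.formats.json.canal.CanalJsonDecodingFormat;

    CanalJsonDecodingFormat format =
            new CanalJsonDecodingFormat(null, null, true, TimestampFormat.SQL);
    // Advertised keys: database, table, sql-type, pk-names,
    // ingestion-timestamp, event-timestamp, is-ddl
    format.listReadableMetadata().keySet();
    // The planner pushes down the requested subset; createRuntimeDecoder()
    // then appends these fields after the physical columns.
    format.applyReadableMetadata(Arrays.asList("database", "table", "event-timestamp"));
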
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDeserializationSchema.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDeserializationSchema.java
deleted file mode 100644
index 089debc..0000000
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonDeserializationSchema.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.json.canal;
-
-import static java.lang.String.format;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Objects;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.Collector;
-import org.apache.inlong.sort.formats.json.canal.CanalJsonDecodingFormat.ReadableMetadata;
-
-/**
- * Deserialization schema from Canal JSON to Flink Table/SQL internal data structure {@link
- * RowData}. The deserialization schema knows Canal's schema definition and can extract the database
- * data and convert it into {@link RowData} with {@link RowKind}.
- *
- * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads the specified fields.
- *
- * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
- *
- * @see <a href="https://github.com/alibaba/canal">Alibaba Canal</a>
- */
-public final class CanalJsonDeserializationSchema implements DeserializationSchema<RowData> {
-    private static final long serialVersionUID = 1L;
-
-    private static final String FIELD_OLD = "old";
-    private static final String OP_INSERT = "INSERT";
-    private static final String OP_UPDATE = "UPDATE";
-    private static final String OP_DELETE = "DELETE";
-    private static final String OP_CREATE = "CREATE";
-
-    /** The deserializer to deserialize Canal JSON data. */
-    private final JsonRowDataDeserializationSchema jsonDeserializer;
-
-    /** Flag that indicates that an additional projection is required for metadata. */
-    private final boolean hasMetadata;
-
-    /** Metadata to be extracted for every record. */
-    private final MetadataConverter[] metadataConverters;
-
-    /** {@link TypeInformation} of the produced {@link RowData} (physical + meta data). */
-    private final TypeInformation<RowData> producedTypeInfo;
-
-    /** Only read changelogs from the specific database. */
-    private final @Nullable String database;
-
-    /** Only read changelogs from the specific table. */
-    private final @Nullable String table;
-
-    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
-    private final boolean ignoreParseErrors;
-
-    /** Names of fields. */
-    private final List<String> fieldNames;
-
-    /** Number of fields. */
-    private final int fieldCount;
-
-    /** Pattern of the specific database. */
-    private final Pattern databasePattern;
-
-    /** Pattern of the specific table. */
-    private final Pattern tablePattern;
-
-    private CanalJsonDeserializationSchema(
-            DataType physicalDataType,
-            List<ReadableMetadata> requestedMetadata,
-            TypeInformation<RowData> producedTypeInfo,
-            @Nullable String database,
-            @Nullable String table,
-            boolean ignoreParseErrors,
-            TimestampFormat timestampFormat) {
-        final RowType jsonRowType = createJsonRowType(physicalDataType, requestedMetadata);
-        this.jsonDeserializer =
-                new JsonRowDataDeserializationSchema(
-                        jsonRowType,
-                        // the result type is never used, so it's fine to pass in the produced type
-                        // info
-                        producedTypeInfo,
-                        false, // ignoreParseErrors already contains the functionality of
-                        // failOnMissingField
-                        ignoreParseErrors,
-                        timestampFormat);
-        this.hasMetadata = requestedMetadata.size() > 0;
-        this.metadataConverters = createMetadataConverters(jsonRowType, requestedMetadata);
-        this.producedTypeInfo = producedTypeInfo;
-        this.database = database;
-        this.table = table;
-        this.ignoreParseErrors = ignoreParseErrors;
-        final RowType physicalRowType = ((RowType) physicalDataType.getLogicalType());
-        this.fieldNames = physicalRowType.getFieldNames();
-        this.fieldCount = physicalRowType.getFieldCount();
-        this.databasePattern = database == null ? null : Pattern.compile(database);
-        this.tablePattern = table == null ? null : Pattern.compile(table);
-    }
-
-    // ------------------------------------------------------------------------------------------
-    // Builder
-    // ------------------------------------------------------------------------------------------
-
-    /** Creates a builder for building a {@link CanalJsonDeserializationSchema}. */
-    public static Builder builder(
-            DataType physicalDataType,
-            List<ReadableMetadata> requestedMetadata,
-            TypeInformation<RowData> producedTypeInfo) {
-        return new Builder(physicalDataType, requestedMetadata, producedTypeInfo);
-    }
-
-    /** A builder for creating a {@link CanalJsonDeserializationSchema}. */
-    @Internal
-    public static final class Builder {
-        private final DataType physicalDataType;
-        private final List<ReadableMetadata> requestedMetadata;
-        private final TypeInformation<RowData> producedTypeInfo;
-        private String database = null;
-        private String table = null;
-        private boolean ignoreParseErrors = false;
-        private TimestampFormat timestampFormat = TimestampFormat.SQL;
-
-        private Builder(
-                DataType physicalDataType,
-                List<ReadableMetadata> requestedMetadata,
-                TypeInformation<RowData> producedTypeInfo) {
-            this.physicalDataType = physicalDataType;
-            this.requestedMetadata = requestedMetadata;
-            this.producedTypeInfo = producedTypeInfo;
-        }
-
-        public Builder setDatabase(String database) {
-            this.database = database;
-            return this;
-        }
-
-        public Builder setTable(String table) {
-            this.table = table;
-            return this;
-        }
-
-        public Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
-            this.ignoreParseErrors = ignoreParseErrors;
-            return this;
-        }
-
-        public Builder setTimestampFormat(TimestampFormat timestampFormat) {
-            this.timestampFormat = timestampFormat;
-            return this;
-        }
-
-        public CanalJsonDeserializationSchema build() {
-            return new CanalJsonDeserializationSchema(
-                    physicalDataType,
-                    requestedMetadata,
-                    producedTypeInfo,
-                    database,
-                    table,
-                    ignoreParseErrors,
-                    timestampFormat);
-        }
-    }
-
-    // ------------------------------------------------------------------------------------------
-
-    @Override
-    public RowData deserialize(byte[] message) throws IOException {
-        throw new RuntimeException(
-                "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
-    }
-
-    @Override
-    public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws IOException {
-        if (message == null || message.length == 0) {
-            return;
-        }
-        try {
-            final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
-            if (database != null) {
-                if (!databasePattern
-                        .matcher(root.get(ReadableMetadata.DATABASE.key).asText())
-                        .matches()) {
-                    return;
-                }
-            }
-            if (table != null) {
-                if (!tablePattern
-                        .matcher(root.get(ReadableMetadata.TABLE.key).asText())
-                        .matches()) {
-                    return;
-                }
-            }
-            final GenericRowData row = (GenericRowData) jsonDeserializer.convertToRowData(root);
-            String type = row.getString(2).toString(); // "type" field
-            if (OP_INSERT.equals(type)) {
-                // "data" field is an array of row, contains inserted rows
-                ArrayData data = row.getArray(0);
-                for (int i = 0; i < data.size(); i++) {
-                    GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
-                    insert.setRowKind(RowKind.INSERT);
-                    emitRow(row, insert, out);
-                }
-            } else if (OP_UPDATE.equals(type)) {
-                // "data" field is an array of row, contains new rows
-                ArrayData data = row.getArray(0);
-                // "old" field is an array of row, contains old values
-                ArrayData old = row.getArray(1);
-                for (int i = 0; i < data.size(); i++) {
-                    // the underlying JSON deserialization schema always produce GenericRowData.
-                    GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
-                    GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
-                    final JsonNode oldField = root.get(FIELD_OLD);
-                    for (int f = 0; f < fieldCount; f++) {
-                        if (before.isNullAt(f) && oldField.findValue(fieldNames.get(f)) == null) {
-                            // fields in "old" (before) means the fields are changed
-                            // fields not in "old" (before) means the fields are not changed
-                            // so we just copy the not changed fields into before
-                            before.setField(f, after.getField(f));
-                        }
-                    }
-                    before.setRowKind(RowKind.UPDATE_BEFORE);
-                    after.setRowKind(RowKind.UPDATE_AFTER);
-                    emitRow(row, before, out);
-                    emitRow(row, after, out);
-                }
-            } else if (OP_DELETE.equals(type)) {
-                // "data" field is an array of row, contains deleted rows
-                ArrayData data = row.getArray(0);
-                for (int i = 0; i < data.size(); i++) {
-                    GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
-                    insert.setRowKind(RowKind.DELETE);
-                    emitRow(row, insert, out);
-                }
-            } else if (OP_CREATE.equals(type)) {
-                // "data" field is null and "type" is "CREATE" which means
-                // this is a DDL change event, and we should skip it.
-                return;
-            } else {
-                if (!ignoreParseErrors) {
-                    throw new IOException(
-                            format(
-                                    "Unknown \"type\" value \"%s\". The Canal JSON message is '%s'",
-                                    type, new String(message)));
-                }
-            }
-        } catch (Throwable t) {
-            // a big try catch to protect the processing.
-            if (!ignoreParseErrors) {
-                throw new IOException(
-                        format("Corrupt Canal JSON message '%s'.", new String(message)), t);
-            }
-        }
-    }
-
-    private void emitRow(
-            GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
-        // shortcut in case no output projection is required
-        if (!hasMetadata) {
-            out.collect(physicalRow);
-            return;
-        }
-        final int physicalArity = physicalRow.getArity();
-        final int metadataArity = metadataConverters.length;
-        final GenericRowData producedRow =
-                new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
-        for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
-            producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
-        }
-        for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
-            producedRow.setField(
-                    physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
-        }
-        out.collect(producedRow);
-    }
-
-    @Override
-    public boolean isEndOfStream(RowData nextElement) {
-        return false;
-    }
-
-    @Override
-    public TypeInformation<RowData> getProducedType() {
-        return producedTypeInfo;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        CanalJsonDeserializationSchema that = (CanalJsonDeserializationSchema) o;
-        return Objects.equals(jsonDeserializer, that.jsonDeserializer)
-                && hasMetadata == that.hasMetadata
-                && Objects.equals(producedTypeInfo, that.producedTypeInfo)
-                && Objects.equals(database, that.database)
-                && Objects.equals(table, that.table)
-                && ignoreParseErrors == that.ignoreParseErrors
-                && fieldCount == that.fieldCount;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(
-                jsonDeserializer,
-                hasMetadata,
-                producedTypeInfo,
-                database,
-                table,
-                ignoreParseErrors,
-                fieldCount);
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    private static RowType createJsonRowType(
-            DataType physicalDataType, List<ReadableMetadata> readableMetadata) {
-        // Canal JSON contains other information, e.g. "ts", "sql", but we don't need them
-        DataType root =
-                DataTypes.ROW(
-                        DataTypes.FIELD("data", DataTypes.ARRAY(physicalDataType)),
-                        DataTypes.FIELD("old", DataTypes.ARRAY(physicalDataType)),
-                        DataTypes.FIELD("type", DataTypes.STRING()),
-                        ReadableMetadata.DATABASE.requiredJsonField,
-                        ReadableMetadata.TABLE.requiredJsonField);
-        // append fields that are required for reading metadata in the root
-        final List<DataTypes.Field> rootMetadataFields =
-                readableMetadata.stream()
-                        .filter(m -> m != ReadableMetadata.DATABASE && m != ReadableMetadata.TABLE)
-                        .map(m -> m.requiredJsonField)
-                        .distinct()
-                        .collect(Collectors.toList());
-        return (RowType) DataTypeUtils.appendRowFields(root, rootMetadataFields).getLogicalType();
-    }
-
-    private static MetadataConverter[] createMetadataConverters(
-            RowType jsonRowType, List<ReadableMetadata> requestedMetadata) {
-        return requestedMetadata.stream()
-                .map(m -> convert(jsonRowType, m))
-                .toArray(MetadataConverter[]::new);
-    }
-
-    private static MetadataConverter convert(RowType jsonRowType, ReadableMetadata metadata) {
-        final int pos = jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
-        return new MetadataConverter() {
-            private static final long serialVersionUID = 1L;
-
-            @Override
-            public Object convert(GenericRowData root, int unused) {
-                return metadata.converter.convert(root, pos);
-            }
-
-            @Override
-            public Object convert(Object in) {
-                return metadata.converter.convert(in);
-            }
-        };
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    /**
-     * Converter that extracts a metadata field from the row that comes out of the JSON schema and
-     * converts it to the desired data type.
-     */
-    interface MetadataConverter extends Serializable {
-
-        // Method for top-level access.
-        default Object convert(GenericRowData row) {
-            return convert(row, -1);
-        }
-
-        Object convert(GenericRowData row, int pos);
-
-        Object convert(Object in);
-    }
-}
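
For reference, the schema above is assembled through its builder. A minimal
sketch, assuming a two-column physical row type; producedTypeInfo is a
placeholder for the TypeInformation a real table source context supplies:

    import java.util.Collections;
    import org.apache.flink.formats.common.TimestampFormat;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;
    import org.apache.inlong.sort.formats.json.canal.CanalJsonDecodingFormat.ReadableMetadata;

    DataType physicalDataType = DataTypes.ROW(
            DataTypes.FIELD("id", DataTypes.BIGINT()),
            DataTypes.FIELD("name", DataTypes.STRING()));
    CanalJsonDeserializationSchema schema =
            CanalJsonDeserializationSchema.builder(
                            physicalDataType,
                            Collections.singletonList(ReadableMetadata.DATABASE),
                            producedTypeInfo)   // supplied by the planner context
                    .setDatabase("inventory")   // compiled to a regex filter
                    .setTable("products_\\d+")  // ditto; null disables filtering
                    .setIgnoreParseErrors(true)
                    .setTimestampFormat(TimestampFormat.SQL)
                    .build();
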
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonSerializationSchema.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonSerializationSchema.java
deleted file mode 100644
index faf1cba..0000000
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonSerializationSchema.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.json.canal;
-
-import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonOptions;
-import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.DataTypes.Field;
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.GenericArrayData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-import org.apache.inlong.sort.formats.json.MysqlBinLogData;
-import org.apache.inlong.sort.formats.json.canal.CanalJsonDecodingFormat.ReadableMetadata;
-
-/**
- * Copied from the Apache Flink project with a little change.
- *
- * Serialization schema that serializes an object of Flink Table/SQL internal data structure {@link
- * RowData} into Canal JSON bytes.
- *
- * @see <a href="https://github.com/alibaba/canal">Alibaba Canal</a>
- */
-public class CanalJsonSerializationSchema implements SerializationSchema<RowData> {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final StringData OP_INSERT = StringData.fromString("INSERT");
-    private static final StringData OP_DELETE = StringData.fromString("DELETE");
-
-    private transient GenericRowData reuse;
-
-    /** The serializer to serialize Canal JSON data. */
-    private final JsonRowDataSerializationSchema jsonSerializer;
-
-    private final DataFormatConverters.RowConverter consumedRowConverter;
-
-    private final DataFormatConverters.RowConverter physicalRowConverter;
-
-    private final Map<Integer, ReadableMetadata> fieldIndexToMetadata;
-
-    public CanalJsonSerializationSchema(
-            RowType physicalRowType,
-            Map<Integer, ReadableMetadata> fieldIndexToMetadata,
-            DataFormatConverters.RowConverter consumedRowConverter,
-            DataFormatConverters.RowConverter physicalRowConverter,
-            TimestampFormat timestampFormat,
-            JsonOptions.MapNullKeyMode mapNullKeyMode,
-            String mapNullKeyLiteral,
-            boolean encodeDecimalAsPlainNumber) {
-        jsonSerializer =
-                new JsonRowDataSerializationSchema(
-                        createJsonRowType(fromLogicalToDataType(physicalRowType), fieldIndexToMetadata.values()),
-                        timestampFormat,
-                        mapNullKeyMode,
-                        mapNullKeyLiteral,
-                        encodeDecimalAsPlainNumber);
-
-        this.fieldIndexToMetadata = fieldIndexToMetadata;
-        this.consumedRowConverter = consumedRowConverter;
-        this.physicalRowConverter = physicalRowConverter;
-    }
-
-    @Override
-    public void open(InitializationContext context) {
-        reuse = new GenericRowData(2 + fieldIndexToMetadata.size());
-    }
-
-    @Override
-    public byte[] serialize(RowData row) {
-        try {
-            MysqlBinLogData mysqlBinLogData = getMysqlBinLogData(row);
-
-            ArrayData arrayData = new GenericArrayData(new RowData[] {mysqlBinLogData.getPhysicalData()});
-            reuse.setField(0, arrayData);
-            reuse.setField(1, rowKind2String(row.getRowKind()));
-
-            // Set metadata
-            Map<String, Object> metadataMap = mysqlBinLogData.getMetadataMap();
-            int index = 2;
-            for (ReadableMetadata readableMetadata : fieldIndexToMetadata.values()) {
-                reuse.setField(index, metadataMap.get(readableMetadata.key));
-                index++;
-            }
-
-            return jsonSerializer.serialize(reuse);
-        } catch (Throwable t) {
-            throw new RuntimeException("Could not serialize row '" + row + "'.", t);
-        }
-    }
-
-    private StringData rowKind2String(RowKind rowKind) {
-        switch (rowKind) {
-            case INSERT:
-            case UPDATE_AFTER:
-                return OP_INSERT;
-            case UPDATE_BEFORE:
-            case DELETE:
-                return OP_DELETE;
-            default:
-                throw new UnsupportedOperationException(
-                        "Unsupported operation '" + rowKind + "' for row kind.");
-        }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        CanalJsonSerializationSchema that = (CanalJsonSerializationSchema) o;
-        return Objects.equals(jsonSerializer, that.jsonSerializer);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(jsonSerializer);
-    }
-
-    private static RowType createJsonRowType(
-            DataType dataSchema,
-            Collection<ReadableMetadata> metadataSet) {
-        DataType root =  DataTypes.ROW(
-                DataTypes.FIELD("data", DataTypes.ARRAY(dataSchema)),
-                DataTypes.FIELD("type", DataTypes.STRING())
-        );
-
-        final List<Field> metadataFields =
-                metadataSet.stream()
-                        .map(m -> m.requiredJsonField)
-                        .distinct()
-                        .collect(Collectors.toList());
-
-        return (RowType) DataTypeUtils.appendRowFields(root, metadataFields).getLogicalType();
-    }
-
-    private MysqlBinLogData getMysqlBinLogData(RowData consumedRowData) {
-        Row consumedRow = consumedRowConverter.toExternal(consumedRowData);
-        int consumedRowArity = consumedRow.getArity();
-        Set<Integer> metadataIndices = fieldIndexToMetadata.keySet();
-
-        Row physicalRow = new Row(consumedRowArity - metadataIndices.size());
-        Map<String, Object> metadataMap = new HashMap<>();
-        int physicalRowDataIndex = 0;
-        for (int i = 0; i < consumedRowArity; i++) {
-            if (!metadataIndices.contains(i)) {
-                physicalRow.setField(physicalRowDataIndex, consumedRow.getField(i));
-                physicalRowDataIndex++;
-            } else {
-                metadataMap.put(
-                        fieldIndexToMetadata.get(i).key,
-                        fieldIndexToMetadata.get(i).converter.convert(consumedRow.getField(i)));
-            }
-        }
-
-        return new MysqlBinLogData(physicalRowConverter.toInternal(physicalRow), metadataMap);
-    }
-}
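
To make the physical/metadata split in getMysqlBinLogData concrete, consider
a hypothetical consumed row whose indices 2 and 3 are mapped to
ReadableMetadata.DATABASE and ReadableMetadata.TABLE:

    consumed row:  [1, "jo", "inventory", "products"]
    physical row:  [1, "jo"]
    metadata map:  {database="inventory", table="products"}

serialize() then emits one Canal record per consumed row, with the row kind
folded into "type" (INSERT/UPDATE_AFTER -> "INSERT", UPDATE_BEFORE/DELETE ->
"DELETE", per rowKind2String):

    {"data":[{"id":1,"name":"jo"}],"type":"INSERT",
     "database":"inventory","table":"products"}
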
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJsonDecodingFormat.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJsonDecodingFormat.java
deleted file mode 100644
index ca51860..0000000
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJsonDecodingFormat.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.json.debezium;
-
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.DecodingFormat;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.data.GenericMapData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.types.RowKind;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJsonDeserializationSchema.MetadataConverter;
-
-/**
- * Copied from the Apache Flink project with a little change.
- *
- * {@link DecodingFormat} for Debezium using JSON encoding.
- **/
-public class DebeziumJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
-
-    // --------------------------------------------------------------------------------------------
-    // Mutable attributes
-    // --------------------------------------------------------------------------------------------
-
-    private List<String> metadataKeys;
-
-    // --------------------------------------------------------------------------------------------
-    // Debezium-specific attributes
-    // --------------------------------------------------------------------------------------------
-
-    private final boolean schemaInclude;
-
-    private final boolean ignoreParseErrors;
-
-    private final TimestampFormat timestampFormat;
-
-    public DebeziumJsonDecodingFormat(
-            boolean schemaInclude, boolean ignoreParseErrors, TimestampFormat timestampFormat) {
-        this.schemaInclude = schemaInclude;
-        this.ignoreParseErrors = ignoreParseErrors;
-        this.timestampFormat = timestampFormat;
-        this.metadataKeys = Collections.emptyList();
-    }
-
-    @Override
-    public DeserializationSchema<RowData> createRuntimeDecoder(
-            DynamicTableSource.Context context, DataType physicalDataType) {
-
-        final List<ReadableMetadata> readableMetadata =
-                metadataKeys.stream()
-                        .map(
-                                k ->
-                                        Stream.of(ReadableMetadata.values())
-                                                .filter(rm -> rm.key.equals(k))
-                                                .findFirst()
-                                                .orElseThrow(IllegalStateException::new))
-                        .collect(Collectors.toList());
-
-        final List<DataTypes.Field> metadataFields =
-                readableMetadata.stream()
-                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
-                        .collect(Collectors.toList());
-
-        final DataType producedDataType =
-                DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
-
-        final TypeInformation<RowData> producedTypeInfo =
-                context.createTypeInformation(producedDataType);
-
-        return new DebeziumJsonDeserializationSchema(
-                physicalDataType,
-                readableMetadata,
-                producedTypeInfo,
-                schemaInclude,
-                ignoreParseErrors,
-                timestampFormat);
-    }
-
-    @Override
-    public Map<String, DataType> listReadableMetadata() {
-        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
-        Stream.of(ReadableMetadata.values())
-                .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
-        return metadataMap;
-    }
-
-    @Override
-    public void applyReadableMetadata(List<String> metadataKeys) {
-        this.metadataKeys = metadataKeys;
-    }
-
-    @Override
-    public ChangelogMode getChangelogMode() {
-        return ChangelogMode.newBuilder()
-                .addContainedKind(RowKind.INSERT)
-                .addContainedKind(RowKind.UPDATE_BEFORE)
-                .addContainedKind(RowKind.UPDATE_AFTER)
-                .addContainedKind(RowKind.DELETE)
-                .build();
-    }
-
-    // --------------------------------------------------------------------------------------------
-    // Metadata handling
-    // --------------------------------------------------------------------------------------------
-
-    /** List of metadata that can be read with this format. */
-    public enum ReadableMetadata {
-        SCHEMA(
-                "schema",
-                DataTypes.STRING().nullable(),
-                false,
-                DataTypes.FIELD("schema", DataTypes.STRING()),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        return row.getString(pos);
-                    }
-                }),
-
-        INGESTION_TIMESTAMP(
-                "ingestion-timestamp",
-                DataTypes.BIGINT().nullable(),
-                true,
-                DataTypes.FIELD("ts_ms", DataTypes.BIGINT()),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        return row.getLong(pos);
-                    }
-                }),
-
-        SOURCE_TIMESTAMP(
-                "source.timestamp",
-                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
-                true,
-                DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        final StringData timestamp =
-                                (StringData) readProperty(row, pos, KEY_SOURCE_TIMESTAMP);
-                        if (timestamp == null) {
-                            return null;
-                        }
-                        return TimestampData.fromEpochMillis(Long.parseLong(timestamp.toString()));
-                    }
-                }),
-
-        SOURCE_DATABASE(
-                "source.database",
-                DataTypes.STRING().nullable(),
-                true,
-                DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        return readProperty(row, pos, KEY_SOURCE_DATABASE);
-                    }
-                }),
-
-        SOURCE_SCHEMA(
-                "source.schema",
-                DataTypes.STRING().nullable(),
-                true,
-                DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        return readProperty(row, pos, KEY_SOURCE_SCHEMA);
-                    }
-                }),
-
-        SOURCE_TABLE(
-                "source.table",
-                DataTypes.STRING().nullable(),
-                true,
-                DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        return readProperty(row, pos, KEY_SOURCE_TABLE);
-                    }
-                }),
-
-        SOURCE_PROPERTIES(
-                "source.properties",
-                // key and value of the map are nullable to make handling easier in queries
-                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable())
-                        .nullable(),
-                true,
-                DataTypes.FIELD("source", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
-                new MetadataConverter() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public Object convert(GenericRowData row, int pos) {
-                        return row.getMap(pos);
-                    }
-                });
-
-        final String key;
-
-        final DataType dataType;
-
-        final boolean isJsonPayload;
-
-        final DataTypes.Field requiredJsonField;
-
-        final MetadataConverter converter;
-
-        ReadableMetadata(
-                String key,
-                DataType dataType,
-                boolean isJsonPayload,
-                DataTypes.Field requiredJsonField,
-                MetadataConverter converter) {
-            this.key = key;
-            this.dataType = dataType;
-            this.isJsonPayload = isJsonPayload;
-            this.requiredJsonField = requiredJsonField;
-            this.converter = converter;
-        }
-
-        public String getKey() {
-            return key;
-        }
-    }
-
-    private static final StringData KEY_SOURCE_TIMESTAMP = StringData.fromString("ts_ms");
-
-    private static final StringData KEY_SOURCE_DATABASE = StringData.fromString("db");
-
-    private static final StringData KEY_SOURCE_SCHEMA = StringData.fromString("schema");
-
-    private static final StringData KEY_SOURCE_TABLE = StringData.fromString("table");
-
-    private static Object readProperty(GenericRowData row, int pos, StringData key) {
-        final GenericMapData map = (GenericMapData) row.getMap(pos);
-        if (map == null) {
-            return null;
-        }
-        return map.get(key);
-    }
-}
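
For orientation, the enum above is how consumers of the (now removed) decoding format
requested metadata by string key before the runtime decoder was created. A minimal
usage sketch, assuming only the constructor and applyReadableMetadata signatures that
appear elsewhere in this patch:

    // Illustrative sketch, not part of the patch.
    DebeziumJsonDecodingFormat format =
            new DebeziumJsonDecodingFormat(false, true, TimestampFormat.ISO_8601);
    // (schemaInclude = false, ignoreParseErrors = true)
    format.applyReadableMetadata(
            Arrays.asList("source.database", "source.table", "ingestion-timestamp"));
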
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJsonDeserializationSchema.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJsonDeserializationSchema.java
deleted file mode 100644
index 8fea979..0000000
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJsonDeserializationSchema.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.json.debezium;
-
-import static java.lang.String.format;
-import static org.apache.inlong.sort.formats.json.debezium.DebeziumUtils.getMysqlMetadataKey;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.GenericMapData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.Collector;
-import org.apache.inlong.sort.formats.json.MysqlBinLogData;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata;
-
-/**
- * Copied from the Apache Flink project with minor changes.
- *
- * Deserialization schema from Debezium JSON to Flink Table/SQL internal data structure {@link
- * RowData}. The deserialization schema knows Debezium's schema definition and can extract the
- * database data and convert it into {@link RowData} with {@link RowKind}.
- *
- * <p>Deserializes a <code>byte[]</code> message as a JSON object and reads the specified fields.
- *
- * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
- *
- * @see <a href="https://debezium.io/">Debezium</a>
- */
-@Internal
-public final class DebeziumJsonDeserializationSchema implements DeserializationSchema<RowData> {
-    private static final long serialVersionUID = 1L;
-
-    private static final String OP_READ = "r"; // snapshot read
-    private static final String OP_CREATE = "c"; // insert
-    private static final String OP_UPDATE = "u"; // update
-    private static final String OP_DELETE = "d"; // delete
-
-    private static final String REPLICA_IDENTITY_EXCEPTION =
-            "The \"before\" field of %s message is null, "
-                    + "if you are using Debezium Postgres Connector, "
-                    + "please check the Postgres table has been set REPLICA IDENTITY to FULL level.";
-
-    /** The deserializer to deserialize Debezium JSON data. */
-    private final JsonRowDataDeserializationSchema jsonDeserializer;
-
-    /** Flag that indicates that an additional projection is required for metadata. */
-    private final boolean hasMetadata;
-
-    /** Metadata to be extracted for every record. */
-    private final MetadataConverter[] metadataConverters;
-
-    private final List<ReadableMetadata> requestedMetadata;
-
-    /** {@link TypeInformation} of the produced {@link RowData} (physical + meta data). */
-    private final TypeInformation<RowData> producedTypeInfo;
-
-    /**
-     * Flag indicating whether the Debezium JSON data contains schema part or not. When Debezium
-     * Kafka Connect enables "value.converter.schemas.enable", the JSON will contain "schema"
-     * information, but we just ignore "schema" and extract data from "payload".
-     */
-    private final boolean schemaInclude;
-
-    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
-    private final boolean ignoreParseErrors;
-
-    public DebeziumJsonDeserializationSchema(
-            DataType physicalDataType,
-            List<ReadableMetadata> requestedMetadata,
-            TypeInformation<RowData> producedTypeInfo,
-            boolean schemaInclude,
-            boolean ignoreParseErrors,
-            TimestampFormat timestampFormat) {
-        final RowType jsonRowType =
-                createJsonRowType(physicalDataType, requestedMetadata, schemaInclude);
-        this.jsonDeserializer =
-                new JsonRowDataDeserializationSchema(
-                        jsonRowType,
-                        // the result type is never used, so it's fine to pass in the produced type info
-                        producedTypeInfo,
-                        false, // ignoreParseErrors already covers failOnMissingField
-                        ignoreParseErrors,
-                        timestampFormat);
-        this.hasMetadata = requestedMetadata.size() > 0;
-        this.metadataConverters =
-                createMetadataConverters(jsonRowType, requestedMetadata, schemaInclude);
-        this.requestedMetadata = requestedMetadata;
-        this.producedTypeInfo = producedTypeInfo;
-        this.schemaInclude = schemaInclude;
-        this.ignoreParseErrors = ignoreParseErrors;
-    }
-
-    @Override
-    public RowData deserialize(byte[] message) {
-        throw new RuntimeException(
-                "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
-    }
-
-    @Override
-    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
-        if (message == null || message.length == 0) {
-            // skip tombstone messages
-            return;
-        }
-        try {
-            GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
-            GenericRowData payload;
-            if (schemaInclude) {
-                payload = (GenericRowData) row.getField(0);
-            } else {
-                payload = row;
-            }
-
-            GenericRowData before = (GenericRowData) payload.getField(0);
-            GenericRowData after = (GenericRowData) payload.getField(1);
-            String op = payload.getField(2).toString();
-            if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
-                after.setRowKind(RowKind.INSERT);
-                emitRow(row, after, out);
-            } else if (OP_UPDATE.equals(op)) {
-                if (before == null) {
-                    throw new IllegalStateException(
-                            String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
-                }
-                before.setRowKind(RowKind.UPDATE_BEFORE);
-                after.setRowKind(RowKind.UPDATE_AFTER);
-                emitRow(row, before, out);
-                emitRow(row, after, out);
-            } else if (OP_DELETE.equals(op)) {
-                if (before == null) {
-                    throw new IllegalStateException(
-                            String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
-                }
-                before.setRowKind(RowKind.DELETE);
-                emitRow(row, before, out);
-            } else {
-                if (!ignoreParseErrors) {
-                    throw new IOException(
-                            format(
-                                    "Unknown \"op\" value \"%s\". The Debezium JSON message is '%s'",
-                                    op, new String(message)));
-                }
-            }
-        } catch (Throwable t) {
-            // a broad try-catch to protect the whole processing path.
-            if (!ignoreParseErrors) {
-                throw new IOException(
-                        format("Corrupt Debezium JSON message '%s'.", new String(message)), t);
-            }
-        }
-    }
-
-    private void emitRow(GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
-        final int physicalArity = physicalRow.getArity();
-        final int metadataArity = metadataConverters.length;
-
-        final GenericRowData producedRow = new GenericRowData(physicalRow.getRowKind(), physicalArity + 1);
-
-        for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
-            producedRow.setField(physicalPos + 1, physicalRow.getField(physicalPos));
-        }
-
-        // Put metadata in the first field of the emitted RowData
-        Map<StringData, StringData> metadataMap = new HashMap<>();
-        metadataMap.put(
-                StringData.fromString(MysqlBinLogData.MYSQL_METADATA_IS_DDL),
-                StringData.fromString("false"));
-
-        switch (physicalRow.getRowKind()) {
-            case INSERT:
-                // fall through
-            case UPDATE_AFTER:
-                metadataMap.put(
-                        StringData.fromString(MysqlBinLogData.MYSQL_METADATA_EVENT_TYPE),
-                        StringData.fromString(OP_CREATE));
-                break;
-            case UPDATE_BEFORE:
-                // fall through
-            case DELETE:
-                metadataMap.put(
-                        StringData.fromString(MysqlBinLogData.MYSQL_METADATA_EVENT_TYPE),
-                        StringData.fromString(OP_DELETE));
-                break;
-        }
-        for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
-            metadataMap.put(
-                    StringData.fromString(getMysqlMetadataKey(requestedMetadata.get(metadataPos))),
-                    StringData.fromString(metadataConverters[metadataPos].convert(rootRow).toString())
-            );
-        }
-        producedRow.setField(0, new GenericMapData(metadataMap));
-
-        out.collect(producedRow);
-    }
-
-    @Override
-    public boolean isEndOfStream(RowData nextElement) {
-        return false;
-    }
-
-    @Override
-    public TypeInformation<RowData> getProducedType() {
-        return producedTypeInfo;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        DebeziumJsonDeserializationSchema that = (DebeziumJsonDeserializationSchema) o;
-        return Objects.equals(jsonDeserializer, that.jsonDeserializer)
-                && hasMetadata == that.hasMetadata
-                && Objects.equals(producedTypeInfo, that.producedTypeInfo)
-                && schemaInclude == that.schemaInclude
-                && ignoreParseErrors == that.ignoreParseErrors;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(
-                jsonDeserializer, hasMetadata, producedTypeInfo, schemaInclude, ignoreParseErrors);
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    private static RowType createJsonRowType(
-            DataType physicalDataType,
-            List<ReadableMetadata> readableMetadata,
-            boolean schemaInclude) {
-        DataType payload =
-                DataTypes.ROW(
-                        DataTypes.FIELD("before", physicalDataType),
-                        DataTypes.FIELD("after", physicalDataType),
-                        DataTypes.FIELD("op", DataTypes.STRING()));
-
-        // append fields that are required for reading metadata in the payload
-        final List<DataTypes.Field> payloadMetadataFields =
-                readableMetadata.stream()
-                        .filter(m -> m.isJsonPayload)
-                        .map(m -> m.requiredJsonField)
-                        .distinct()
-                        .collect(Collectors.toList());
-        payload = DataTypeUtils.appendRowFields(payload, payloadMetadataFields);
-
-        DataType root = payload;
-        if (schemaInclude) {
-            // when Debezium Kafka connect enables "value.converter.schemas.enable",
-            // the JSON will contain "schema" information and we need to extract data from
-            // "payload".
-            root = DataTypes.ROW(DataTypes.FIELD("payload", payload));
-        }
-
-        // append fields that are required for reading metadata in the root
-        final List<DataTypes.Field> rootMetadataFields =
-                readableMetadata.stream()
-                        .filter(m -> !m.isJsonPayload)
-                        .map(m -> m.requiredJsonField)
-                        .distinct()
-                        .collect(Collectors.toList());
-        root = DataTypeUtils.appendRowFields(root, rootMetadataFields);
-
-        return (RowType) root.getLogicalType();
-    }
-
-    private static MetadataConverter[] createMetadataConverters(
-            RowType jsonRowType, List<ReadableMetadata> requestedMetadata, boolean schemaInclude) {
-        return requestedMetadata.stream()
-                .map(
-                        m -> {
-                            if (m.isJsonPayload) {
-                                return convertInPayload(jsonRowType, m, schemaInclude);
-                            } else {
-                                return convertInRoot(jsonRowType, m);
-                            }
-                        })
-                .toArray(MetadataConverter[]::new);
-    }
-
-    private static MetadataConverter convertInRoot(RowType jsonRowType, ReadableMetadata metadata) {
-        final int pos = findFieldPos(metadata, jsonRowType);
-        return new MetadataConverter() {
-            private static final long serialVersionUID = 1L;
-
-            @Override
-            public Object convert(GenericRowData root, int unused) {
-                return metadata.converter.convert(root, pos);
-            }
-        };
-    }
-
-    private static MetadataConverter convertInPayload(
-            RowType jsonRowType, ReadableMetadata metadata, boolean schemaInclude) {
-        if (schemaInclude) {
-            final int pos = findFieldPos(metadata, (RowType) jsonRowType.getChildren().get(0));
-            return new MetadataConverter() {
-                private static final long serialVersionUID = 1L;
-
-                @Override
-                public Object convert(GenericRowData root, int unused) {
-                    final GenericRowData payload = (GenericRowData) root.getField(0);
-                    return metadata.converter.convert(payload, pos);
-                }
-            };
-        }
-        return convertInRoot(jsonRowType, metadata);
-    }
-
-    private static int findFieldPos(ReadableMetadata metadata, RowType jsonRowType) {
-        return jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
-    }
-
-    // --------------------------------------------------------------------------------------------
-
-    /**
-     * Converter that extracts a metadata field from the row (root or payload) that comes out of the
-     * JSON schema and converts it to the desired data type.
-     */
-    interface MetadataConverter extends Serializable {
-
-        // Method for top-level access.
-        default Object convert(GenericRowData row) {
-            return convert(row, -1);
-        }
-
-        Object convert(GenericRowData row, int pos);
-    }
-}
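
To make the op-code handling above concrete: one Debezium "u" envelope yields two rows,
and every emitted row carries the metadata map in field 0 ahead of the physical columns.
An illustrative input and outcome (values invented):

    // Input payload:  {"before":{"name":"a"}, "after":{"name":"b"}, "op":"u", ...}
    // deserialize(byte[], Collector) then emits:
    //   row 1: RowKind.UPDATE_BEFORE, field 0 = metadata map, fields 1..n = "before"
    //   row 2: RowKind.UPDATE_AFTER,  field 0 = metadata map, fields 1..n = "after"
    // The map always holds MYSQL_METADATA_IS_DDL -> "false", an event type of
    // OP_CREATE (INSERT/UPDATE_AFTER) or OP_DELETE (UPDATE_BEFORE/DELETE), and one
    // entry per requested ReadableMetadata.
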
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumUtils.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumUtils.java
deleted file mode 100644
index f95a3be..0000000
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumUtils.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.json.debezium;
-
-import org.apache.inlong.sort.formats.json.MysqlBinLogData;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata;
-
-public class DebeziumUtils {
-
-    public static String getMysqlMetadataKey(ReadableMetadata readableMetadata) {
-        switch (readableMetadata) {
-            case SOURCE_DATABASE:
-                return MysqlBinLogData.MYSQL_METADATA_DATABASE;
-            case SOURCE_TABLE:
-                return MysqlBinLogData.MYSQL_METADATA_TABLE;
-            case INGESTION_TIMESTAMP:
-                return MysqlBinLogData.MYSQL_METADATA_EVENT_TIME;
-            default:
-                throw new IllegalArgumentException("Unsupported metadata key: " + readableMetadata);
-        }
-    }
-}
diff --git a/inlong-sort/sort-formats/pom.xml b/inlong-sort/sort-formats/pom.xml
index 4240bd8..2c2f88a 100644
--- a/inlong-sort/sort-formats/pom.xml
+++ b/inlong-sort/sort-formats/pom.xml
@@ -56,7 +56,6 @@
         <module>format-kv</module>
         <module>format-inlongmsg-base</module>
         <module>format-inlongmsg-csv</module>
-        <module>format-json</module>
     </modules>
 
     <properties>
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index 5bcae07..373674b 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -60,12 +60,6 @@
         </dependency>
 
         <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>sort-format-json</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-core</artifactId>
             <scope>provided</scope>
@@ -112,6 +106,11 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
             <artifactId>flink-avro</artifactId>
         </dependency>
 
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
index 75bdf71..a557b2d 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
@@ -24,6 +24,7 @@ import static org.apache.inlong.sort.singletenant.flink.pulsar.PulsarSourceBuild
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Map;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -38,9 +39,9 @@ import org.apache.inlong.sort.configuration.Configuration;
 import org.apache.inlong.sort.configuration.Constants;
 import org.apache.inlong.sort.flink.hive.HiveCommitter;
 import org.apache.inlong.sort.flink.hive.HiveWriter;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
 import org.apache.inlong.sort.protocol.DataFlowInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.deserialization.DebeziumDeserializationInfo;
 import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
 import org.apache.inlong.sort.protocol.sink.IcebergSinkInfo;
@@ -126,19 +127,20 @@ public class Entrance {
     ) throws IOException, ClassNotFoundException {
         FieldInfo[] sourceFields = sourceInfo.getFields();
         DeserializationSchema<Row> schema = DeserializationSchemaFactory.build(
-                sourceFields, sourceInfo.getDeserializationInfo());
+                extractNonBuiltInFieldInfo(sourceFields), sourceInfo.getDeserializationInfo());
         FieldMappingTransformer fieldMappingTransformer = new FieldMappingTransformer(config, sourceFields);
-
-        DeserializationFunction function = new DeserializationFunction(
-                schema,
-                fieldMappingTransformer,
-                !(sourceInfo.getDeserializationInfo() instanceof DebeziumDeserializationInfo));
+        DeserializationFunction function = new DeserializationFunction(schema, fieldMappingTransformer);
         return sourceStream.process(function)
                 .uid(Constants.DESERIALIZATION_SCHEMA_UID)
                 .name("Deserialization")
                 .setParallelism(config.getInteger(Constants.DESERIALIZATION_PARALLELISM));
     }
 
+    private static FieldInfo[] extractNonBuiltInFieldInfo(FieldInfo[] fieldInfos) {
+        return Arrays.stream(fieldInfos).filter(fieldInfo -> !(fieldInfo instanceof BuiltInFieldInfo)).toArray(
+                FieldInfo[]::new);
+    }
+
     private static void buildSinkStream(
             DataStream<Row> sourceStream,
             Configuration config,
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationSchemaBuilder.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationSchemaBuilder.java
index 8f5d177..b2fc1bb 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationSchemaBuilder.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationSchemaBuilder.java
@@ -18,35 +18,25 @@
 
 package org.apache.inlong.sort.singletenant.flink.deserialization;
 
-import static org.apache.flink.table.types.utils.DataTypeUtils.validateInputDataType;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToDataType;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getTimestampFormatStandard;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.debezium.DebeziumJsonDecodingFormat;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
-import org.apache.inlong.sort.formats.common.LongFormatInfo;
-import org.apache.inlong.sort.formats.common.MapFormatInfo;
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJsonDecodingFormat;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.deserialization.DebeziumDeserializationInfo;
-import org.apache.inlong.sort.singletenant.flink.utils.CommonUtils;
+
+import java.io.IOException;
+
+import static org.apache.flink.table.types.utils.DataTypeUtils.validateInputDataType;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToDataType;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getTimestampFormatStandard;
 
 public class DebeziumDeserializationSchemaBuilder {
 
@@ -55,18 +45,10 @@ public class DebeziumDeserializationSchemaBuilder {
             DebeziumDeserializationInfo deserializationInfo
     ) throws IOException, ClassNotFoundException {
         TimestampFormat timestampFormat = getTimestampFormatStandard(deserializationInfo.getTimestampFormatStandard());
-        DebeziumJsonDecodingFormat debeziumJsonDecodingFormat = new DebeziumJsonDecodingFormat(
-                false, deserializationInfo.isIgnoreParseErrors(), timestampFormat);
-
-        // Extract required metadata
-        FieldInfo[] metadataFieldInfos = getMetadataFieldInfos(fieldInfos);
-        List<String> requiredMetadataKeys = Arrays.stream(metadataFieldInfos)
-                .map(FieldInfo::getName)
-                .collect(Collectors.toList());
-        debeziumJsonDecodingFormat.applyReadableMetadata(requiredMetadataKeys);
+        DebeziumJsonDecodingFormat debeziumJsonDecodingFormat =
+                new DebeziumJsonDecodingFormat(false, deserializationInfo.isIgnoreParseErrors(), timestampFormat);
 
-        FieldInfo[] originPhysicalFieldInfos = CommonUtils.extractNonBuiltInFieldInfos(fieldInfos);
-        FieldInfo[] convertedPhysicalFieldInfos = convertDateToStringFormatInfo(originPhysicalFieldInfos);
+        FieldInfo[] convertedInputFields = convertDateToStringFormatInfo(fieldInfos);
         DeserializationSchema<RowData> debeziumSchema = debeziumJsonDecodingFormat.createRuntimeDecoder(
                 new DynamicTableSource.Context() {
                     @Override
@@ -80,55 +62,12 @@ public class DebeziumDeserializationSchemaBuilder {
                         return null;
                     }
                 },
-                convertFieldInfosToDataType(convertedPhysicalFieldInfos)
+                convertFieldInfosToDataType(convertedInputFields)
         );
 
-        RowDataToRowDeserializationSchemaWrapper rowDataToRowSchema = new RowDataToRowDeserializationSchemaWrapper(
-                debeziumSchema,
-                getProducedFieldInfos(convertedPhysicalFieldInfos));
-        return new CustomDateFormatDeserializationSchemaWrapper(
-                rowDataToRowSchema,
-                extractFormatInfos(getProducedFieldInfos(originPhysicalFieldInfos)));
+        RowDataToRowDeserializationSchemaWrapper rowDataToRowSchema =
+                new RowDataToRowDeserializationSchemaWrapper(debeziumSchema, convertedInputFields);
+        return new CustomDateFormatDeserializationSchemaWrapper(rowDataToRowSchema, extractFormatInfos(fieldInfos));
     }
 
-    public static FieldInfo[] getMetadataFieldInfos(FieldInfo[] fieldInfos) {
-        List<FieldInfo> metadataFieldInfos = new ArrayList<>();
-        Arrays.stream(fieldInfos)
-                .filter(fieldInfo -> fieldInfo instanceof BuiltInFieldInfo)
-                .forEach(fieldInfo -> {
-                    BuiltInFieldInfo builtInFieldInfo = (BuiltInFieldInfo) fieldInfo;
-                    BuiltInField builtInField = builtInFieldInfo.getBuiltInField();
-                    switch (builtInField) {
-                        case MYSQL_METADATA_DATABASE:
-                            metadataFieldInfos.add(new FieldInfo(
-                                    ReadableMetadata.SOURCE_DATABASE.getKey(), StringFormatInfo.INSTANCE));
-                            break;
-                        case MYSQL_METADATA_TABLE:
-                            metadataFieldInfos.add(new FieldInfo(
-                                    ReadableMetadata.SOURCE_TABLE.getKey(), StringFormatInfo.INSTANCE));
-                            break;
-                        case MYSQL_METADATA_EVENT_TIME:
-                            metadataFieldInfos.add(new FieldInfo(
-                                    ReadableMetadata.INGESTION_TIMESTAMP.getKey(), LongFormatInfo.INSTANCE));
-                            break;
-                        case MYSQL_METADATA_IS_DDL:
-                        case MYSQL_METADATA_EVENT_TYPE:
-                            break;
-                        default:
-                            throw new IllegalArgumentException(
-                                    "Unsupported builtin field '" + builtInField + "' in debezium deserialization");
-                    }
-                });
-
-        return metadataFieldInfos.toArray(new FieldInfo[0]);
-    }
-
-    public static FieldInfo[] getProducedFieldInfos(FieldInfo[] physicalFieldInfos) {
-        List<FieldInfo> results = new ArrayList<>();
-        results.add(new FieldInfo(
-                "metadata",
-                new MapFormatInfo(StringFormatInfo.INSTANCE, StringFormatInfo.INSTANCE)));
-        results.addAll(Arrays.asList(physicalFieldInfos));
-        return results.toArray(new FieldInfo[0]);
-    }
 }
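
After the revert the builder takes the field list as-is again. A minimal usage sketch,
built only from the constructor and build(...) signatures visible in this patch:

    // Illustrative sketch, not part of the patch.
    FieldInfo[] fieldInfos = new FieldInfo[] {
            new FieldInfo("name", StringFormatInfo.INSTANCE),
            new FieldInfo("age", IntFormatInfo.INSTANCE)
    };
    DeserializationSchema<Row> schema = DebeziumDeserializationSchemaBuilder.build(
            fieldInfos, new DebeziumDeserializationInfo(false, "ISO_8601"));
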
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunction.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunction.java
index e9bec47..7c1414a 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunction.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationFunction.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.sort.singletenant.flink.deserialization;
 
-import java.util.HashMap;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.types.Row;
@@ -35,23 +34,12 @@ public class DeserializationFunction extends ProcessFunction<SerializedRecord, R
 
     private final FieldMappingTransformer fieldMappingTransformer;
 
-    private final boolean appendAttributes;
-
     public DeserializationFunction(
             DeserializationSchema<Row> deserializationSchema,
-            FieldMappingTransformer fieldMappingTransformer,
-            boolean appendAttributes
+            FieldMappingTransformer fieldMappingTransformer
     ) {
         this.deserializationSchema = deserializationSchema;
         this.fieldMappingTransformer = fieldMappingTransformer;
-        this.appendAttributes = appendAttributes;
-    }
-
-    public DeserializationFunction(
-            DeserializationSchema<Row> deserializationSchema,
-            FieldMappingTransformer fieldMappingTransformer
-    ) {
-        this(deserializationSchema, fieldMappingTransformer, true);
     }
 
     @Override
@@ -76,9 +64,6 @@ public class DeserializationFunction extends ProcessFunction<SerializedRecord, R
                 }
 
                 deserializationSchema.deserialize(bodyBytes, new CallbackCollector<>(inputRow -> {
-                    if (appendAttributes) {
-                        inputRow = Row.join(Row.of(new HashMap<>()), inputRow);
-                    }
                     out.collect(fieldMappingTransformer.transform(inputRow, value.getTimestampMillis()));
                 }));
             }
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java
index 9a3cd74..a085a1b 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/DeserializationSchemaFactory.java
@@ -33,7 +33,6 @@ import org.apache.inlong.sort.protocol.deserialization.JsonDeserializationInfo;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import org.apache.inlong.sort.singletenant.flink.utils.CommonUtils;
 
 import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.buildAvroRecordSchemaInJson;
 import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
@@ -46,19 +45,16 @@ public class DeserializationSchemaFactory {
             FieldInfo[] fieldInfos,
             DeserializationInfo deserializationInfo) throws IOException, ClassNotFoundException {
         if (deserializationInfo instanceof JsonDeserializationInfo) {
-            return buildJsonDeserializationSchema(CommonUtils.extractNonBuiltInFieldInfos(fieldInfos));
+            return buildJsonDeserializationSchema(fieldInfos);
         } else if (deserializationInfo instanceof AvroDeserializationInfo) {
-            return buildAvroDeserializationSchema(CommonUtils.extractNonBuiltInFieldInfos(fieldInfos));
+            return buildAvroDeserializationSchema(fieldInfos);
         } else if (deserializationInfo instanceof CanalDeserializationInfo) {
-            return CanalDeserializationSchemaBuilder.build(
-                    CommonUtils.extractNonBuiltInFieldInfos(fieldInfos),
-                    (CanalDeserializationInfo) deserializationInfo);
+            return CanalDeserializationSchemaBuilder.build(fieldInfos, (CanalDeserializationInfo) deserializationInfo);
         } else if (deserializationInfo instanceof DebeziumDeserializationInfo) {
-            return DebeziumDeserializationSchemaBuilder.build(
-                    fieldInfos,
+            return DebeziumDeserializationSchemaBuilder.build(fieldInfos,
                     (DebeziumDeserializationInfo) deserializationInfo);
         } else {
-            return buildStringDeserializationSchema(CommonUtils.extractNonBuiltInFieldInfos(fieldInfos));
+            return buildStringDeserializationSchema(fieldInfos);
         }
     }
 
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformer.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformer.java
index 8fe46f9..1d20a20 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformer.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformer.java
@@ -20,12 +20,9 @@ package org.apache.inlong.sort.singletenant.flink.deserialization;
 
 import org.apache.flink.types.Row;
 import org.apache.inlong.sort.configuration.Configuration;
-import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
 import org.apache.inlong.sort.formats.common.FormatInfo;
-import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.formats.common.TimeFormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
-import org.apache.inlong.sort.formats.json.MysqlBinLogData;
 import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.util.DefaultValueStrategy;
@@ -34,7 +31,6 @@ import java.io.Serializable;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.Date;
-import java.util.Map;
 
 import static org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
 
@@ -45,7 +41,7 @@ public class FieldMappingTransformer implements Serializable {
     /**
      * Skips time and attribute fields of source record.
      */
-    public static final int SOURCE_FIELD_SKIP_STEP = 1;
+    public static final int SOURCE_FIELD_SKIP_STEP = 0;
 
     private final FieldInfo[] outputFieldInfos;
 
@@ -62,12 +58,11 @@ public class FieldMappingTransformer implements Serializable {
     public Row transform(Row sourceRow, long dt) {
         final Row outputRow = new Row(outputFieldInfos.length);
         int sourceRowIndex = SOURCE_FIELD_SKIP_STEP;
-        Map<String, String> attributes = (Map<String, String>) sourceRow.getField(0);
         for (int i = 0; i < outputFieldInfos.length; i++) {
             Object fieldValue = null;
             if (outputFieldInfos[i] instanceof BuiltInFieldInfo) {
                 BuiltInFieldInfo builtInFieldInfo = (BuiltInFieldInfo) outputFieldInfos[i];
-                fieldValue = transformBuiltInField(builtInFieldInfo, attributes, dt);
+                fieldValue = transformBuiltInField(builtInFieldInfo, dt);
             } else if (sourceRowIndex < sourceRow.getArity()) {
                 fieldValue = sourceRow.getField(sourceRowIndex);
                 sourceRowIndex++;
@@ -81,25 +76,10 @@ public class FieldMappingTransformer implements Serializable {
         return outputRow;
     }
 
-    private static Object transformBuiltInField(
-            BuiltInFieldInfo builtInFieldInfo,
-            Map<String, String> attributes,
-            long dataTimestamp) {
-        switch (builtInFieldInfo.getBuiltInField()) {
-            case DATA_TIME:
-                return inferDataTimeValue(builtInFieldInfo.getFormatInfo(), dataTimestamp);
-            case MYSQL_METADATA_DATABASE:
-                return attributes.get(MysqlBinLogData.MYSQL_METADATA_DATABASE);
-            case MYSQL_METADATA_TABLE:
-                return attributes.get(MysqlBinLogData.MYSQL_METADATA_TABLE);
-            case MYSQL_METADATA_IS_DDL:
-                return BooleanFormatInfo.INSTANCE.deserialize(attributes.get(MysqlBinLogData.MYSQL_METADATA_IS_DDL));
-            case MYSQL_METADATA_EVENT_TIME:
-                return LongFormatInfo.INSTANCE.deserialize(attributes.get(MysqlBinLogData.MYSQL_METADATA_EVENT_TIME));
-            case MYSQL_METADATA_EVENT_TYPE:
-                return attributes.get(MysqlBinLogData.MYSQL_METADATA_EVENT_TYPE);
+    private static Object transformBuiltInField(BuiltInFieldInfo builtInFieldInfo, long dataTimestamp) {
+        if (builtInFieldInfo.getBuiltInField() == BuiltInFieldInfo.BuiltInField.DATA_TIME) {
+            return inferDataTimeValue(builtInFieldInfo.getFormatInfo(), dataTimestamp);
         }
-
         return null;
     }
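
With SOURCE_FIELD_SKIP_STEP back at 0, physical source fields map one-to-one onto the
non-built-in output fields, and only DATA_TIME is synthesized from the record timestamp.
A minimal sketch with invented field names (the LongFormatInfo choice for the data-time
column is an assumption, not taken from the patch):

    // Illustrative sketch, not part of the patch.
    FieldInfo[] outputFields = new FieldInfo[] {
            new FieldInfo("name", StringFormatInfo.INSTANCE),
            new BuiltInFieldInfo("dt", LongFormatInfo.INSTANCE, BuiltInField.DATA_TIME)
    };
    FieldMappingTransformer transformer =
            new FieldMappingTransformer(new Configuration(), outputFields);
    // "testName" fills the first output field; "dt" is derived from the given millis.
    Row output = transformer.transform(Row.of("testName"), System.currentTimeMillis());
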
 
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationSchemaBuilder.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationSchemaBuilder.java
index ff9941a..e77c5b1 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationSchemaBuilder.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationSchemaBuilder.java
@@ -17,28 +17,22 @@
 
 package org.apache.inlong.sort.singletenant.flink.serialization;
 
-import static org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory.MAP_NULL_KEY_LITERAL_DEFAULT;
-import static org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory.getMapNullKeyMode;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowType;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.createRowConverter;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
-import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getTimestampFormatStandard;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.json.canal.CanalJsonSerializationSchema;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
-import org.apache.inlong.sort.formats.json.canal.CanalJsonDecodingFormat.ReadableMetadata;
-import org.apache.inlong.sort.formats.json.canal.CanalJsonSerializationSchema;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
-import org.apache.inlong.sort.singletenant.flink.utils.CommonUtils;
+
+import java.io.IOException;
+
+import static org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory.MAP_NULL_KEY_LITERAL_DEFAULT;
+import static org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory.getMapNullKeyMode;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertDateToStringFormatInfo;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.convertFieldInfosToRowType;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.extractFormatInfos;
+import static org.apache.inlong.sort.singletenant.flink.utils.CommonUtils.getTimestampFormatStandard;
 
 public class CanalSerializationSchemaBuilder {
 
@@ -51,16 +45,10 @@ public class CanalSerializationSchemaBuilder {
             mapNullKeyLiteral = MAP_NULL_KEY_LITERAL_DEFAULT;
         }
 
-        FieldInfo[] originPhysicalFieldInfos = CommonUtils.extractNonBuiltInFieldInfos(fieldInfos);
-        FieldInfo[] convertedPhysicalFieldInfos = convertDateToStringFormatInfo(originPhysicalFieldInfos);
-        RowType convertedPhysicalRowType = convertFieldInfosToRowType(convertedPhysicalFieldInfos);
         FieldInfo[] convertedFieldInfos = convertDateToStringFormatInfo(fieldInfos);
-
+        RowType convertedRowType = convertFieldInfosToRowType(convertedFieldInfos);
         CanalJsonSerializationSchema canalSchema = new CanalJsonSerializationSchema(
-                convertedPhysicalRowType,
-                getFieldIndexToMetadata(fieldInfos),
-                createRowConverter(convertedFieldInfos),
-                createRowConverter(convertedPhysicalFieldInfos),
+                convertedRowType,
                 getTimestampFormatStandard(canalSerializationInfo.getTimestampFormatStandard()),
                 getMapNullKeyMode(canalSerializationInfo.getMapNullKeyMod()),
                 mapNullKeyLiteral,
@@ -73,38 +61,4 @@ public class CanalSerializationSchemaBuilder {
         return new CustomDateFormatSerializationSchemaWrapper(rowToRowDataSchema, extractFormatInfos(fieldInfos));
     }
 
-    private static Map<Integer, ReadableMetadata> getFieldIndexToMetadata(FieldInfo[] fieldInfos) {
-        Map<Integer, ReadableMetadata> fieldIndexToMetadata = new HashMap<>();
-
-        for (int i = 0; i < fieldInfos.length; i++) {
-            FieldInfo fieldInfo = fieldInfos[i];
-            if (fieldInfo instanceof BuiltInFieldInfo) {
-                BuiltInFieldInfo builtInFieldInfo = (BuiltInFieldInfo) fieldInfo;
-                BuiltInField builtInField = builtInFieldInfo.getBuiltInField();
-                switch (builtInField) {
-                    case MYSQL_METADATA_DATABASE:
-                        fieldIndexToMetadata.put(i, ReadableMetadata.DATABASE);
-                        break;
-                    case MYSQL_METADATA_TABLE:
-                        fieldIndexToMetadata.put(i, ReadableMetadata.TABLE);
-                        break;
-                    case MYSQL_METADATA_EVENT_TIME:
-                        fieldIndexToMetadata.put(i, ReadableMetadata.EVENT_TIMESTAMP);
-                        break;
-                    case MYSQL_METADATA_IS_DDL:
-                        fieldIndexToMetadata.put(i, ReadableMetadata.IS_DDL);
-                        break;
-                    case MYSQL_METADATA_EVENT_TYPE:
-                        // We will always append `type` to the result
-                        break;
-                    default:
-                        throw new IllegalArgumentException(
-                                "Unsupported builtin field '" + builtInField + "' in debezium deserialization");
-                }
-            }
-        }
-
-        return fieldIndexToMetadata;
-    }
-
 }
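
For reference, rows now go through Flink's stock canal-json serializer; per row it emits
an envelope of roughly this shape (a sketch of the canal-json format, not from this patch):

    // INSERT / UPDATE_AFTER rows:
    //   {"data":[{"name":"testName","age":29}],"type":"INSERT"}
    // UPDATE_BEFORE / DELETE rows:
    //   {"data":[{"name":"testName","age":29}],"type":"DELETE"}
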
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
index 92c0c13..a5636ed 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/utils/CommonUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.inlong.sort.singletenant.flink.utils;
 
-import java.util.Arrays;
 import org.apache.avro.Schema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.formats.common.TimestampFormat;
@@ -36,7 +35,6 @@ import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.formats.common.TimeFormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.formats.common.TypeInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 
 import java.io.ByteArrayInputStream;
@@ -195,10 +193,4 @@ public class CommonUtils {
         return output;
     }
 
-    public static FieldInfo[] extractNonBuiltInFieldInfos(FieldInfo[] fieldInfos) {
-        return Arrays.stream(fieldInfos)
-                .filter(fieldInfo -> !(fieldInfo instanceof BuiltInFieldInfo))
-                .toArray(FieldInfo[]::new);
-    }
-
 }
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/DebeziumToCanalITCase.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/DebeziumToCanalITCase.java
deleted file mode 100644
index b1e994c..0000000
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/DebeziumToCanalITCase.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.types.Row;
-import org.apache.inlong.common.msg.InLongMsg;
-import org.apache.inlong.sort.configuration.Configuration;
-import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
-import org.apache.inlong.sort.formats.common.IntFormatInfo;
-import org.apache.inlong.sort.formats.common.LongFormatInfo;
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.deserialization.DebeziumDeserializationInfo;
-import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
-import org.apache.inlong.sort.singletenant.flink.deserialization.DeserializationFunction;
-import org.apache.inlong.sort.singletenant.flink.deserialization.DeserializationSchemaFactory;
-import org.apache.inlong.sort.singletenant.flink.deserialization.FieldMappingTransformer;
-import org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DebeziumToCanalITCase {
-
-    private static final Logger logger = LoggerFactory.getLogger(DebeziumToCanalITCase.class);
-
-    private static final CountDownLatch verificationFinishedLatch = new CountDownLatch(1);
-
-    private static final CountDownLatch jobFinishedLatch = new CountDownLatch(1);
-
-    private final FieldInfo[] fieldInfos = new FieldInfo[]{
-            new FieldInfo("name", StringFormatInfo.INSTANCE),
-            new FieldInfo("age", IntFormatInfo.INSTANCE),
-            new BuiltInFieldInfo("db", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_DATABASE),
-            new BuiltInFieldInfo("table", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_TABLE),
-            new BuiltInFieldInfo("es", LongFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TIME),
-            new BuiltInFieldInfo("isDdl", BooleanFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_IS_DDL),
-            new BuiltInFieldInfo("type", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TYPE)
-    };
-
-    private static final String expectedResult =
-            "{\"data\":[{\"name\":\"testName\",\"age\":29}],"
-                    + "\"type\":\"INSERT\",\"database\":\"test\",\"table\":\"test\","
-                    + "\"es\":1644896917208,\"isDdl\":false}";
-
-    @Test(timeout = 60 * 1000)
-    public void test() throws Exception {
-
-        final ExecutorService executorService = Executors.newSingleThreadExecutor();
-
-        executorService.execute(() -> {
-            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-            try {
-                DataStream<SerializedRecord> sourceStream = env.addSource(new TestSource());
-
-                // Deserialize
-                DeserializationSchema<Row> deserializationSchema = DeserializationSchemaFactory.build(
-                        fieldInfos,
-                        new DebeziumDeserializationInfo(false, "ISO_8601"));
-                FieldMappingTransformer fieldMappingTransformer = new FieldMappingTransformer(
-                        new Configuration(), fieldInfos);
-                DeserializationFunction function = new DeserializationFunction(
-                        deserializationSchema, fieldMappingTransformer, false);
-                DataStream<Row> deserializedStream = sourceStream.process(function);
-
-                // Serialize and output
-                SerializationSchema<Row> serializationSchema = SerializationSchemaFactory.build(
-                        fieldInfos,
-                        new CanalSerializationInfo("Sql", "Literal", null, false)
-                );
-                deserializedStream.addSink(new TestSink(serializationSchema));
-
-                env.execute();
-
-            } catch (Exception e) {
-                logger.error("Error occurred when executing flink test job: ", e);
-            } finally {
-                jobFinishedLatch.countDown();
-            }
-        });
-
-        try {
-            while (!verify()) {
-                Thread.sleep(500);
-            }
-            verificationFinishedLatch.countDown();
-            jobFinishedLatch.await();
-        } finally {
-            executorService.shutdown();
-        }
-
-        Thread.sleep(10000);
-    }
-
-    private boolean verify() {
-        if (TestSink.results.size() == 0) {
-            return false;
-        } else {
-            assertEquals(expectedResult, TestSink.results.get(0));
-            return true;
-        }
-    }
-
-    private static class TestSource extends RichSourceFunction<SerializedRecord> {
-
-        String testString = "{\n"
-                + "    \"before\":null,\n"
-                + "    \"after\":{\n"
-                + "        \"name\":\"testName\",\n"
-                + "        \"age\":29\n"
-                + "    },\n"
-                + "    \"source\":{\n"
-                + "        \"version\":\"1.4.2.Final\",\n"
-                + "        \"connector\":\"mysql\",\n"
-                + "        \"name\":\"my_server_01\",\n"
-                + "        \"ts_ms\":1644896917000,\n"
-                + "        \"snapshot\":\"false\",\n"
-                + "        \"db\":\"test\",\n"
-                + "        \"table\":\"test\",\n"
-                + "        \"server_id\":1,\n"
-                + "        \"gtid\":null,\n"
-                + "        \"file\":\"mysql-bin.000067\",\n"
-                + "        \"pos\":944,\n"
-                + "        \"row\":0,\n"
-                + "        \"thread\":13,\n"
-                + "        \"query\":null\n"
-                + "    },\n"
-                + "    \"op\":\"c\",\n"
-                + "    \"ts_ms\":1644896917208,\n"
-                + "    \"transaction\":null\n"
-                + "}";
-
-        @Override
-        public void open(org.apache.flink.configuration.Configuration configuration) {
-        }
-
-        @Override
-        public void run(SourceContext<SerializedRecord> sourceContext) throws Exception {
-            InLongMsg inLongMsg = InLongMsg.newInLongMsg(true);
-            String attrs = "m=0"
-                    + "&dt=" + System.currentTimeMillis()
-                    + "&iname=" + "tid";
-            inLongMsg.addMsg(attrs, testString.getBytes());
-            byte[] bytes = inLongMsg.buildArray();
-
-            sourceContext.collect(new SerializedRecord(System.currentTimeMillis(), bytes));
-
-            verificationFinishedLatch.await();
-        }
-
-        @Override
-        public void cancel() {
-
-        }
-    }
-
-    private static class TestSink extends RichSinkFunction<Row> {
-
-        private static final List<String> results = new ArrayList<>();
-
-        private final SerializationSchema<Row> serializationSchema;
-
-        public TestSink(SerializationSchema<Row> serializationSchema) {
-            this.serializationSchema = serializationSchema;
-        }
-
-        @Override
-        public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
-            serializationSchema.open(null);
-        }
-
-        @Override
-        public void invoke(Row record, Context context) {
-            results.add(new String(serializationSchema.serialize(record)));
-        }
-    }
-}
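
For anyone tracing what this revert takes out: the deleted DebeziumToCanalITCase above exercised the whole Debezium-to-canal path. Below is a minimal sketch of that pipeline, condensed from the deleted test; it assumes the pre-revert classes (DeserializationSchemaFactory, DeserializationFunction, FieldMappingTransformer, SerializationSchemaFactory, CanalSerializationInfo) and compiles only against the tree before this revert. The SerializedRecord import path is an assumption.

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.inlong.sort.configuration.Configuration;
import org.apache.inlong.sort.flink.SerializedRecord; // import path assumed
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.deserialization.DebeziumDeserializationInfo;
import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
import org.apache.inlong.sort.singletenant.flink.deserialization.DeserializationFunction;
import org.apache.inlong.sort.singletenant.flink.deserialization.DeserializationSchemaFactory;
import org.apache.inlong.sort.singletenant.flink.deserialization.FieldMappingTransformer;
import org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory;

public class DebeziumToCanalSketch {

    public static void run(StreamExecutionEnvironment env,
            DataStream<SerializedRecord> source,
            FieldInfo[] fieldInfos) throws Exception {
        // Debezium JSON -> Row; pre-revert, field 0 of each emitted Row is a
        // Map<String, String> of MySQL binlog metadata.
        DeserializationSchema<Row> deser = DeserializationSchemaFactory.build(
                fieldInfos, new DebeziumDeserializationInfo(false, "ISO_8601"));
        DeserializationFunction function = new DeserializationFunction(
                deser, new FieldMappingTransformer(new Configuration(), fieldInfos), false);
        DataStream<Row> rows = source.process(function);

        // Row -> canal JSON; the MYSQL_METADATA_* built-ins land in the canal
        // envelope (database, table, es, isDdl, type).
        SerializationSchema<Row> ser = SerializationSchemaFactory.build(
                fieldInfos, new CanalSerializationInfo("Sql", "Literal", null, false));
        rows.addSink(new TestSink(ser)); // TestSink: the collecting sink from the deleted test
        env.execute();
    }
}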
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationTest.java
index 1e90ccb..04de89b 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/DebeziumDeserializationTest.java
@@ -28,9 +28,6 @@ import org.apache.inlong.sort.formats.common.MapFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.formats.common.TimeFormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
-import org.apache.inlong.sort.formats.json.MysqlBinLogData;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.deserialization.DebeziumDeserializationInfo;
 import org.junit.Test;
@@ -57,42 +54,30 @@ public class DebeziumDeserializationTest {
             new FieldInfo("map",
                     new MapFormatInfo(StringFormatInfo.INSTANCE, LongFormatInfo.INSTANCE)),
             new FieldInfo("map2map", new MapFormatInfo(StringFormatInfo.INSTANCE,
-                    new MapFormatInfo(StringFormatInfo.INSTANCE, IntFormatInfo.INSTANCE))),
-            new BuiltInFieldInfo("db", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_DATABASE),
-            new BuiltInFieldInfo("table", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_TABLE),
-            new BuiltInFieldInfo("es", LongFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TIME)
+                    new MapFormatInfo(StringFormatInfo.INSTANCE, IntFormatInfo.INSTANCE)))
     };
 
     private Row generateTestRow() {
-        Row testRow = new Row(9);
-        testRow.setField(0, new HashMap<String, String>() {
-            {
-                put(MysqlBinLogData.MYSQL_METADATA_IS_DDL, "false");
-                put(MysqlBinLogData.MYSQL_METADATA_EVENT_TYPE, "c");
-                put(MysqlBinLogData.MYSQL_METADATA_DATABASE, "test");
-                put(MysqlBinLogData.MYSQL_METADATA_TABLE, "test");
-                put(MysqlBinLogData.MYSQL_METADATA_EVENT_TIME, "1644896917208");
-            }
-        });
-        testRow.setField(1, 1238123899121L);
-        testRow.setField(2, "testName");
+        Row testRow = new Row(8);
+        testRow.setField(0, 1238123899121L);
+        testRow.setField(1, "testName");
 
         byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6};
-        testRow.setField(3, bytes);
+        testRow.setField(2, bytes);
 
-        testRow.setField(4, Date.valueOf("1990-10-14"));
-        testRow.setField(5, Time.valueOf("12:12:43"));
-        testRow.setField(6, Timestamp.valueOf("1990-10-14 12:12:43"));
+        testRow.setField(3, Date.valueOf("1990-10-14"));
+        testRow.setField(4, Time.valueOf("12:12:43"));
+        testRow.setField(5, Timestamp.valueOf("1990-10-14 12:12:43"));
 
         Map<String, Long> map = new HashMap<>();
         map.put("flink", 123L);
-        testRow.setField(7, map);
+        testRow.setField(6, map);
 
         Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
         Map<String, Integer> innerMap = new HashMap<>();
         innerMap.put("key", 234);
         nestedMap.put("inner_map", innerMap);
-        testRow.setField(8, nestedMap);
+        testRow.setField(7, nestedMap);
 
         return testRow;
     }
@@ -100,48 +85,48 @@ public class DebeziumDeserializationTest {
     @Test
     public void testDebeziumDeserializationSchema() throws IOException, ClassNotFoundException {
         String testString = "{\n"
-                                    + "    \"before\":null,\n"
-                                    + "    \"after\":{\n"
-                                    + "        \"id\":1238123899121,\n"
-                                    + "        \"name\":\"testName\",\n"
-                                    + "        \"bytes\":\"AQIDBAUG\",\n"
-                                    + "        \"date\":\"1990-10-14\",\n"
-                                    + "        \"time\":\"12:12:43\",\n"
-                                    + "        \"timestamp\":\"1990-10-14 12:12:43\",\n"
-                                    + "        \"map\":{\n"
-                                    + "            \"flink\":123\n"
-                                    + "        },\n"
-                                    + "        \"map2map\":{\n"
-                                    + "            \"inner_map\":{\n"
-                                    + "                \"key\":234\n"
-                                    + "            }\n"
-                                    + "        }\n"
-                                    + "    },\n"
-                                    + "    \"source\":{\n"
-                                    + "        \"version\":\"1.4.2.Final\",\n"
-                                    + "        \"connector\":\"mysql\",\n"
-                                    + "        \"name\":\"my_server_01\",\n"
-                                    + "        \"ts_ms\":1644896917000,\n"
-                                    + "        \"snapshot\":\"false\",\n"
-                                    + "        \"db\":\"test\",\n"
-                                    + "        \"table\":\"test\",\n"
-                                    + "        \"server_id\":1,\n"
-                                    + "        \"gtid\":null,\n"
-                                    + "        \"file\":\"mysql-bin.000067\",\n"
-                                    + "        \"pos\":944,\n"
-                                    + "        \"row\":0,\n"
-                                    + "        \"thread\":13,\n"
-                                    + "        \"query\":null\n"
-                                    + "    },\n"
-                                    + "    \"op\":\"c\",\n"
-                                    + "    \"ts_ms\":1644896917208,\n"
-                                    + "    \"transaction\":null\n"
-                                    + "}";
+                + "    \"before\":null,\n"
+                + "    \"after\":{\n"
+                + "        \"id\":1238123899121,\n"
+                + "        \"name\":\"testName\",\n"
+                + "        \"bytes\":\"AQIDBAUG\",\n"
+                + "        \"date\":\"1990-10-14\",\n"
+                + "        \"time\":\"12:12:43\",\n"
+                + "        \"timestamp\":\"1990-10-14 12:12:43\",\n"
+                + "        \"map\":{\n"
+                + "            \"flink\":123\n"
+                + "        },\n"
+                + "        \"map2map\":{\n"
+                + "            \"inner_map\":{\n"
+                + "                \"key\":234\n"
+                + "            }\n"
+                + "        }\n"
+                + "    },\n"
+                + "    \"source\":{\n"
+                + "        \"version\":\"1.4.2.Final\",\n"
+                + "        \"connector\":\"mysql\",\n"
+                + "        \"name\":\"my_server_01\",\n"
+                + "        \"ts_ms\":1644896917000,\n"
+                + "        \"snapshot\":\"false\",\n"
+                + "        \"db\":\"test\",\n"
+                + "        \"table\":\"test\",\n"
+                + "        \"server_id\":1,\n"
+                + "        \"gtid\":null,\n"
+                + "        \"file\":\"mysql-bin.000067\",\n"
+                + "        \"pos\":944,\n"
+                + "        \"row\":0,\n"
+                + "        \"thread\":13,\n"
+                + "        \"query\":null\n"
+                + "    },\n"
+                + "    \"op\":\"c\",\n"
+                + "    \"ts_ms\":1644896917208,\n"
+                + "    \"transaction\":null\n"
+                + "}";
 
         byte[] testBytes = testString.getBytes(StandardCharsets.UTF_8);
         DeserializationSchema<Row> schemaWithoutFilter = DeserializationSchemaFactory.build(
                 fieldInfos,
-                new DebeziumDeserializationInfo(false, "ISO_8601")
+                new DebeziumDeserializationInfo(true, "ISO_8601")
         );
         ListCollector<Row> collector = new ListCollector<>();
         schemaWithoutFilter.deserialize(testBytes, collector);
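
The hunk above is the deserialization half of the revert in miniature: before the revert, rows came out with arity 9, field 0 holding a Map<String, String> of binlog metadata keyed by the MysqlBinLogData constants (also deleted by this commit); after the revert the map is gone, the data fields shift down to start at field 0, and the first argument of DebeziumDeserializationInfo goes back to true. A sketch of the two row layouts, assuming the pre-revert MysqlBinLogData constants shown in the deleted lines:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.types.Row;
import org.apache.inlong.sort.formats.json.MysqlBinLogData; // deleted by this revert

public class RowLayouts {

    // Pre-revert: arity 9, binlog metadata rides in field 0.
    static Row preRevertRow() {
        Map<String, String> metadata = new HashMap<>();
        metadata.put(MysqlBinLogData.MYSQL_METADATA_IS_DDL, "false");
        metadata.put(MysqlBinLogData.MYSQL_METADATA_EVENT_TYPE, "c");
        metadata.put(MysqlBinLogData.MYSQL_METADATA_DATABASE, "test");
        metadata.put(MysqlBinLogData.MYSQL_METADATA_TABLE, "test");
        metadata.put(MysqlBinLogData.MYSQL_METADATA_EVENT_TIME, "1644896917208");

        Row row = new Row(9);
        row.setField(0, metadata);
        row.setField(1, 1238123899121L);
        row.setField(2, "testName");
        // ...fields 3..8 as in the test above
        return row;
    }

    // Post-revert: arity 8, data fields only.
    static Row postRevertRow() {
        Row row = new Row(8);
        row.setField(0, 1238123899121L);
        row.setField(1, "testName");
        // ...fields 2..7 as in the test above
        return row;
    }
}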
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformerTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformerTest.java
index ec8ac03..d51421d 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformerTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/deserialization/FieldMappingTransformerTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.sort.singletenant.flink.deserialization;
 
-import java.util.HashMap;
 import org.apache.flink.types.Row;
 import org.apache.inlong.sort.configuration.Configuration;
 import org.apache.inlong.sort.formats.common.IntFormatInfo;
@@ -54,7 +53,7 @@ public class FieldMappingTransformerTest {
         configuration.setBoolean(SINK_FIELD_TYPE_SHORT_NULLABLE, false);
         configuration.setBoolean(SINK_FIELD_TYPE_LONG_NULLABLE, false);
         FieldMappingTransformer transformer = new FieldMappingTransformer(configuration, fieldInfos);
-        Row resultRow = transformer.transform(Row.of(new HashMap<>(), 1), ms);
+        Row resultRow = transformer.transform(Row.of(1), ms);
 
         assertEquals(6, resultRow.getArity());
         assertEquals(new Timestamp(ms), resultRow.getField(0));
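
The same field-0 shift is what this FieldMappingTransformer hunk undoes: pre-revert, transform() received the metadata map as the leading field of the source row; post-revert it takes the data fields alone. A minimal sketch, assuming the transform(Row, long) call used in the test; each call matches the row layout expected on its side of the revert:

import java.util.HashMap;
import org.apache.flink.types.Row;
import org.apache.inlong.sort.configuration.Configuration;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.singletenant.flink.deserialization.FieldMappingTransformer;

public class TransformSketch {

    static void demo(FieldInfo[] fieldInfos) {
        FieldMappingTransformer transformer =
                new FieldMappingTransformer(new Configuration(), fieldInfos);
        long ms = System.currentTimeMillis();

        // Pre-revert: metadata map first, then the data fields.
        Row before = transformer.transform(Row.of(new HashMap<String, String>(), 1), ms);

        // Post-revert: data fields only.
        Row after = transformer.transform(Row.of(1), ms);
    }
}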
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationTest.java
deleted file mode 100644
index 82ba75d..0000000
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/serialization/CanalSerializationTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.singletenant.flink.serialization;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.UserCodeClassLoader;
-import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
-import org.apache.inlong.sort.formats.common.IntFormatInfo;
-import org.apache.inlong.sort.formats.common.LongFormatInfo;
-import org.apache.inlong.sort.formats.common.StringFormatInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
-import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
-import org.junit.Test;
-
-public class CanalSerializationTest {
-
-    private final FieldInfo[] fieldInfos = new FieldInfo[]{
-            new FieldInfo("name", StringFormatInfo.INSTANCE),
-            new FieldInfo("age", IntFormatInfo.INSTANCE),
-            new BuiltInFieldInfo("db", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_DATABASE),
-            new BuiltInFieldInfo("table", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_TABLE),
-            new BuiltInFieldInfo("es", LongFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TIME),
-            new BuiltInFieldInfo("isDdl", BooleanFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_IS_DDL),
-            new BuiltInFieldInfo("type", StringFormatInfo.INSTANCE, BuiltInField.MYSQL_METADATA_EVENT_TYPE)
-    };
-
-    @Test
-    public void test() throws Exception {
-        SerializationSchema<Row> canalJsonSerializationSchema = SerializationSchemaFactory.build(
-                fieldInfos,
-                new CanalSerializationInfo("Sql", "Literal", null, false)
-        );
-        canalJsonSerializationSchema.open(new InitializationContext() {
-            @Override
-            public MetricGroup getMetricGroup() {
-                return null;
-            }
-
-            @Override
-            public UserCodeClassLoader getUserCodeClassLoader() {
-                return null;
-            }
-        });
-
-        Row row = Row.of(
-                "name",
-                29,
-                "database",
-                "table",
-                123L,
-                false,
-                "INSERT");
-        String result = new String(canalJsonSerializationSchema.serialize(row));
-
-        String expectedResult =
-                "{\"data\":[{\"name\":\"name\",\"age\":29}],"
-                        + "\"type\":\"INSERT\",\"database\":\"database\","
-                        + "\"table\":\"table\",\"es\":123,\"isDdl\":false}";
-        assertEquals(expectedResult, result);
-    }
-}
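
The deleted CanalSerializationTest pinned down the canal envelope the reverted serializer produced: plain data fields go into the "data" array, while the MYSQL_METADATA_* built-ins map to the type/database/table/es/isDdl keys. A condensed sketch reusing the fieldInfos from the deleted test; CanalSerializationInfo and the metadata built-ins exist only before this revert:

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.types.Row;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.serialization.CanalSerializationInfo;
import org.apache.inlong.sort.singletenant.flink.serialization.SerializationSchemaFactory;

public class CanalEnvelopeSketch {

    static String demo(FieldInfo[] fieldInfos) throws Exception {
        SerializationSchema<Row> canal = SerializationSchemaFactory.build(
                fieldInfos, new CanalSerializationInfo("Sql", "Literal", null, false));
        canal.open(null); // the deleted TestSink also passed null here

        byte[] json = canal.serialize(
                Row.of("name", 29, "database", "table", 123L, false, "INSERT"));
        // -> {"data":[{"name":"name","age":29}],"type":"INSERT","database":"database",
        //    "table":"table","es":123,"isDdl":false}
        return new String(json);
    }
}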