You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/07/11 13:23:39 UTC

[incubator-paimon] branch master updated: [flink] Kafka canal CDC should recognize schema change when ddl record is missing (#1526)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ac5d8d143 [flink] Kafka canal CDC should recognize schema change when ddl record is missing (#1526)
ac5d8d143 is described below

commit ac5d8d1438a447a07a64f8f4644c5cd504e421cd
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Tue Jul 11 21:23:35 2023 +0800

    [flink] Kafka canal CDC should recognize schema change when ddl record is missing (#1526)
---
 .../java/org/apache/paimon/utils/StringUtils.java  |   4 +
 .../flink/action/cdc/kafka/KafkaActionUtils.java   |   2 +-
 .../action/cdc/kafka/KafkaSyncDatabaseAction.java  |  20 +-
 .../action/cdc/kafka/KafkaSyncTableAction.java     |  19 +-
 .../cdc/kafka/canal/CanalJsonEventParser.java      | 332 -------------------
 .../action/cdc/kafka/canal/CanalRecordParser.java  | 352 +++++++++++++++++++++
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    |   6 -
 .../apache/paimon/flink/sink/cdc/CdcRecord.java    |   5 +
 .../apache/paimon/flink/sink/cdc/EventParser.java  |   4 -
 .../flink/sink/cdc/RichCdcMultiplexRecord.java     |  89 ++++++
 .../cdc/RichCdcMultiplexRecordEventParser.java}    |  38 +--
 .../flink/sink/cdc/RichCdcParserFactory.java       |  70 ----
 .../paimon/flink/sink/cdc/RichCdcRecord.java       |  45 +--
 .../paimon/flink/sink/cdc/RichCdcSinkBuilder.java  |   2 +-
 .../paimon/flink/sink/cdc/RichEventParser.java     |  65 ++++
 .../cdc/kafka/KafkaCanalSyncTableActionITCase.java |  29 +-
 .../action/cdc/kafka/KafkaEventParserTest.java     | 279 ----------------
 .../paimon/flink/sink/cdc/TestCdcEventParser.java  |   5 -
 .../schemaevolutionmissingddl/canal-data-1.txt     |  20 ++
 .../schemaevolutionmissingddl/canal-data-2.txt     |  21 ++
 .../schemaevolutionmissingddl/canal-data-3.txt     |  22 ++
 .../schemaevolutionmissingddl/canal-data-4.txt     |  20 ++
 .../schemaevolutionmissingddl/canal-data-5.txt     |  20 ++
 23 files changed, 702 insertions(+), 767 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
index 745a1c75a..d319ad642 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -535,4 +535,8 @@ public class StringUtils {
         }
         return true;
     }
+
+    public static String caseSensitiveConversion(String str, boolean caseSensitive) {
+        return caseSensitive ? str : str.toLowerCase();
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 43bf5d8c7..148e3ced2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -158,7 +158,7 @@ class KafkaActionUtils {
     }
 
     static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
-        KafkaSourceBuilder kafkaSourceBuilder = KafkaSource.builder();
+        KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.builder();
         String groupId = kafkaConfig.get(KafkaConnectorOptions.PROPS_GROUP_ID);
         kafkaSourceBuilder
                 .setTopics(kafkaConfig.get(KafkaConnectorOptions.TOPIC))
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 3f4a433fa..07b20c523 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -24,9 +24,11 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
-import org.apache.paimon.flink.action.cdc.kafka.canal.CanalJsonEventParser;
+import org.apache.paimon.flink.action.cdc.kafka.canal.CanalRecordParser;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
@@ -212,18 +214,24 @@ public class KafkaSyncDatabaseAction extends ActionBase {
         kafkaConfig.set(KafkaConnectorOptions.TOPIC, monitoredTopics);
         KafkaSource<String> source = KafkaActionUtils.buildKafkaSource(kafkaConfig);
 
-        EventParser.Factory<String> parserFactory;
+        EventParser.Factory<RichCdcMultiplexRecord> parserFactory;
         String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
         if ("canal-json".equals(format)) {
-            parserFactory = () -> new CanalJsonEventParser(caseSensitive, tableNameConverter);
+            parserFactory = RichCdcMultiplexRecordEventParser::new;
         } else {
             throw new UnsupportedOperationException("This format: " + format + " is not support.");
         }
-        FlinkCdcSyncDatabaseSinkBuilder<String> sinkBuilder =
-                new FlinkCdcSyncDatabaseSinkBuilder<String>()
+
+        FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
+                new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
                         .withInput(
                                 env.fromSource(
-                                        source, WatermarkStrategy.noWatermarks(), "Kafka Source"))
+                                                source,
+                                                WatermarkStrategy.noWatermarks(),
+                                                "Kafka Source")
+                                        .flatMap(
+                                                new CanalRecordParser(
+                                                        caseSensitive, tableNameConverter)))
                         .withParserFactory(parserFactory)
                         .withTables(fileStoreTables)
                         .withCatalogLoader(catalogLoader())
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index 936e03fab..d890eeaa7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -24,9 +24,11 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.kafka.canal.CanalJsonEventParser;
+import org.apache.paimon.flink.action.cdc.kafka.canal.CanalRecordParser;
 import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
 import org.apache.paimon.flink.sink.cdc.EventParser;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
 
@@ -166,18 +168,23 @@ public class KafkaSyncTableAction extends ActionBase {
             table = (FileStoreTable) catalog.getTable(identifier);
         }
         String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
-        EventParser.Factory<String> parserFactory;
+        EventParser.Factory<RichCdcMultiplexRecord> parserFactory;
         if ("canal-json".equals(format)) {
-            parserFactory = () -> new CanalJsonEventParser(caseSensitive, computedColumns);
+            parserFactory = RichCdcMultiplexRecordEventParser::new;
         } else {
             throw new UnsupportedOperationException("This format: " + format + " is not support.");
         }
 
-        CdcSinkBuilder<String> sinkBuilder =
-                new CdcSinkBuilder<String>()
+        CdcSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
+                new CdcSinkBuilder<RichCdcMultiplexRecord>()
                         .withInput(
                                 env.fromSource(
-                                        source, WatermarkStrategy.noWatermarks(), "Kafka Source"))
+                                                source,
+                                                WatermarkStrategy.noWatermarks(),
+                                                "Kafka Source")
+                                        .flatMap(
+                                                new CanalRecordParser(
+                                                        caseSensitive, computedColumns)))
                         .withParserFactory(parserFactory)
                         .withTable(table)
                         .withIdentifier(identifier)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalJsonEventParser.java
deleted file mode 100644
index 7d68597b9..000000000
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalJsonEventParser.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.action.cdc.kafka.canal;
-
-import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
-import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
-import org.apache.paimon.flink.sink.cdc.CdcRecord;
-import org.apache.paimon.flink.sink.cdc.EventParser;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.RowKind;
-import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.StringUtils;
-
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.NullNode;
-
-import com.alibaba.druid.DbType;
-import com.alibaba.druid.sql.SQLUtils;
-import com.alibaba.druid.sql.ast.SQLStatement;
-import com.alibaba.druid.sql.ast.statement.SQLAlterTableAddColumn;
-import com.alibaba.druid.sql.ast.statement.SQLAlterTableDropColumnItem;
-import com.alibaba.druid.sql.ast.statement.SQLAlterTableItem;
-import com.alibaba.druid.sql.ast.statement.SQLAlterTableStatement;
-import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
-import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlAlterTableChangeColumn;
-import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlAlterTableModifyColumn;
-
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
-/** {@link EventParser} for Canal-json. */
-public class CanalJsonEventParser implements EventParser<String> {
-
-    private static final String FIELD_DATA = "data";
-    private static final String FIELD_OLD = "old";
-    private static final String TYPE = "type";
-    private static final String MYSQL_TYPE = "mysqlType";
-    private static final String OP_INSERT = "INSERT";
-    private static final String OP_UPDATE = "UPDATE";
-    private static final String OP_DELETE = "DELETE";
-
-    private final ObjectMapper objectMapper = new ObjectMapper();
-
-    private JsonNode root;
-
-    private final boolean caseSensitive;
-    private final TableNameConverter tableNameConverter;
-    private final List<ComputedColumn> computedColumns;
-
-    public CanalJsonEventParser(boolean caseSensitive, List<ComputedColumn> computedColumns) {
-        this(caseSensitive, new TableNameConverter(caseSensitive), computedColumns);
-    }
-
-    public CanalJsonEventParser(boolean caseSensitive, TableNameConverter tableNameConverter) {
-        this(caseSensitive, tableNameConverter, Collections.emptyList());
-    }
-
-    public CanalJsonEventParser(
-            boolean caseSensitive,
-            TableNameConverter tableNameConverter,
-            List<ComputedColumn> computedColumns) {
-        this.caseSensitive = caseSensitive;
-        this.tableNameConverter = tableNameConverter;
-        this.computedColumns = computedColumns;
-    }
-
-    @Override
-    public void setRawEvent(String rawEvent) {
-        try {
-            root = objectMapper.readValue(rawEvent, JsonNode.class);
-
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public String parseTableName() {
-        String tableName = root.get("table").asText();
-        return tableNameConverter.convert(tableName);
-    }
-
-    private boolean isSchemaChange() {
-        if (root.get("isDdl") == null) {
-            return false;
-        } else {
-            return "true".equals(root.get("isDdl").asText());
-        }
-    }
-
-    @Override
-    public List<DataField> parseSchemaChange() {
-        if (!isSchemaChange()) {
-            return Collections.emptyList();
-        }
-
-        String sql = root.get("sql").asText();
-
-        if (StringUtils.isEmpty(sql)) {
-            return Collections.emptyList();
-        }
-        List<DataField> result = new ArrayList<>();
-        int id = 0;
-        SQLStatement sqlStatement = SQLUtils.parseSingleStatement(sql, DbType.mysql);
-        if (sqlStatement instanceof SQLAlterTableStatement) {
-            SQLAlterTableStatement sqlAlterTableStatement = (SQLAlterTableStatement) sqlStatement;
-            for (int i = 0; i < sqlAlterTableStatement.getItems().size(); i++) {
-                SQLAlterTableItem sqlAlterTableItem = sqlAlterTableStatement.getItems().get(i);
-                if (sqlAlterTableItem instanceof SQLAlterTableAddColumn) {
-                    SQLAlterTableAddColumn sqlAlterTableAddColumn =
-                            (SQLAlterTableAddColumn) sqlAlterTableItem;
-                    List<SQLColumnDefinition> columns = sqlAlterTableAddColumn.getColumns();
-                    for (SQLColumnDefinition column : columns) {
-                        String columnName = column.getColumnName().replace("`", "");
-                        String columnType = column.getDataType().toString();
-                        DataType dataType = MySqlTypeUtils.toDataType(columnType);
-                        boolean notNull = column.toString().toUpperCase().contains("NOT NULL");
-                        dataType = notNull ? dataType.notNull() : dataType.nullable();
-                        result =
-                                result.stream()
-                                        .filter(dataField -> !dataField.name().equals(columnName))
-                                        .collect(Collectors.toList());
-                        result.add(
-                                new DataField(
-                                        id++,
-                                        caseSensitive ? columnName : columnName.toLowerCase(),
-                                        dataType));
-                    }
-                } else if (sqlAlterTableItem instanceof SQLAlterTableDropColumnItem) {
-                    // ignore
-                } else if (sqlAlterTableItem instanceof MySqlAlterTableModifyColumn) {
-                    MySqlAlterTableModifyColumn mySqlAlterTableModifyColumn =
-                            (MySqlAlterTableModifyColumn) sqlAlterTableItem;
-                    SQLColumnDefinition newColumnDefinition =
-                            mySqlAlterTableModifyColumn.getNewColumnDefinition();
-                    String columnName = newColumnDefinition.getColumnName().replace("`", "");
-                    String columnType = newColumnDefinition.getDataType().toString();
-                    DataType dataType = MySqlTypeUtils.toDataType(columnType);
-                    boolean notNull =
-                            newColumnDefinition.toString().toUpperCase().contains("NOT NULL");
-                    dataType = notNull ? dataType.notNull() : dataType.nullable();
-                    result.add(
-                            new DataField(
-                                    id++,
-                                    caseSensitive ? columnName : columnName.toLowerCase(),
-                                    dataType));
-
-                } else if (sqlAlterTableItem instanceof MySqlAlterTableChangeColumn) {
-                    MySqlAlterTableChangeColumn mySqlAlterTableChangeColumn =
-                            (MySqlAlterTableChangeColumn) sqlAlterTableItem;
-                    SQLColumnDefinition newColumnDefinition =
-                            mySqlAlterTableChangeColumn.getNewColumnDefinition();
-                    String oldColumnName =
-                            mySqlAlterTableChangeColumn
-                                    .getColumnName()
-                                    .getSimpleName()
-                                    .replace("`", "");
-                    String columnName = newColumnDefinition.getColumnName().replace("`", "");
-                    String columnType = newColumnDefinition.getDataType().toString();
-                    DataType dataType = MySqlTypeUtils.toDataType(columnType);
-                    boolean notNull =
-                            newColumnDefinition.toString().toUpperCase().contains("NOT NULL");
-                    dataType = notNull ? dataType.notNull() : dataType.nullable();
-                    result =
-                            result.stream()
-                                    .filter(dataField -> !dataField.name().equals(oldColumnName))
-                                    .collect(Collectors.toList());
-                    result.add(
-                            new DataField(
-                                    id++,
-                                    caseSensitive ? columnName : columnName.toLowerCase(),
-                                    dataType));
-                }
-            }
-        }
-
-        return result;
-    }
-
-    @Override
-    public List<CdcRecord> parseRecords() {
-        if (isSchemaChange()) {
-            return Collections.emptyList();
-        }
-        Preconditions.checkNotNull(
-                root.get(TYPE),
-                "CanalJsonEventParser only supports canal-json format,"
-                        + "please make sure that your topic's format is accurate.");
-        List<CdcRecord> records = new ArrayList<>();
-        String type = root.get(TYPE).asText();
-        if (OP_UPDATE.equals(type)) {
-            ArrayNode data = (ArrayNode) root.get(FIELD_DATA);
-            ArrayNode old =
-                    root.get(FIELD_OLD) instanceof NullNode
-                            ? null
-                            : (ArrayNode) root.get(FIELD_OLD);
-            for (int i = 0; i < data.size(); i++) {
-                Map<String, String> after = extractRow(data.get(i));
-                if (old != null) {
-                    Map<String, String> before = extractRow(old.get(i));
-                    // fields in "old" (before) means the fields are changed
-                    // fields not in "old" (before) means the fields are not changed
-                    // so we just copy the not changed fields into before
-                    for (Map.Entry<String, String> entry : after.entrySet()) {
-                        if (!before.containsKey(entry.getKey())) {
-                            before.put(entry.getKey(), entry.getValue());
-                        }
-                    }
-                    before = caseSensitive ? before : keyCaseInsensitive(before);
-                    records.add(new CdcRecord(RowKind.DELETE, before));
-                }
-                after = caseSensitive ? after : keyCaseInsensitive(after);
-                records.add(new CdcRecord(RowKind.INSERT, after));
-            }
-        } else if (OP_INSERT.equals(type)) {
-            ArrayNode data = (ArrayNode) root.get(FIELD_DATA);
-            for (int i = 0; i < data.size(); i++) {
-                Map<String, String> after = extractRow(data.get(i));
-                after = caseSensitive ? after : keyCaseInsensitive(after);
-                records.add(new CdcRecord(RowKind.INSERT, after));
-            }
-        } else if (OP_DELETE.equals(type)) {
-            ArrayNode data = (ArrayNode) root.get(FIELD_DATA);
-            for (int i = 0; i < data.size(); i++) {
-                Map<String, String> after = extractRow(data.get(i));
-                after = caseSensitive ? after : keyCaseInsensitive(after);
-                records.add(new CdcRecord(RowKind.DELETE, after));
-            }
-        }
-
-        return records;
-    }
-
-    private Map<String, String> extractRow(JsonNode recordRow) {
-        Map<String, String> mySqlFieldTypes = new HashMap<>();
-        Preconditions.checkNotNull(
-                root.get(MYSQL_TYPE),
-                "CanalJsonEventParser only supports canal-json format,"
-                        + "please make sure that your topic's format is accurate.");
-        JsonNode schema = root.get(MYSQL_TYPE);
-        Iterator<String> iterator = schema.fieldNames();
-        while (iterator.hasNext()) {
-            String fieldName = iterator.next();
-            String fieldType = schema.get(fieldName).asText();
-            mySqlFieldTypes.put(fieldName, fieldType);
-        }
-
-        Map<String, Object> jsonMap =
-                objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {});
-        if (jsonMap == null) {
-            return new HashMap<>();
-        }
-        Map<String, String> resultMap = new HashMap<>();
-        for (Map.Entry<String, String> field : mySqlFieldTypes.entrySet()) {
-            String fieldName = field.getKey();
-            String mySqlType = field.getValue();
-            Object objectValue = jsonMap.get(fieldName);
-            if (objectValue == null) {
-                continue;
-            }
-
-            String oldValue = objectValue.toString();
-            String newValue = oldValue;
-            if (MySqlTypeUtils.isSetType(MySqlTypeUtils.getShortType(mySqlType))) {
-                newValue = CanalFieldParser.convertSet(newValue, mySqlType);
-            } else if (MySqlTypeUtils.isEnumType(MySqlTypeUtils.getShortType(mySqlType))) {
-                newValue = CanalFieldParser.convertEnum(newValue, mySqlType);
-            } else if (MySqlTypeUtils.isGeoType(MySqlTypeUtils.getShortType(mySqlType))) {
-                try {
-                    byte[] wkb =
-                            CanalFieldParser.convertGeoType2WkbArray(
-                                    oldValue.getBytes(StandardCharsets.ISO_8859_1));
-                    newValue = MySqlTypeUtils.convertWkbArray(wkb);
-                } catch (Exception e) {
-                    throw new IllegalArgumentException(
-                            String.format("Failed to convert %s to geometry JSON.", oldValue), e);
-                }
-            }
-            resultMap.put(fieldName, newValue);
-        }
-        // generate values of computed columns
-        for (ComputedColumn computedColumn : computedColumns) {
-            resultMap.put(
-                    computedColumn.columnName(),
-                    computedColumn.eval(resultMap.get(computedColumn.fieldReference())));
-        }
-
-        return resultMap;
-    }
-
-    private Map<String, String> keyCaseInsensitive(Map<String, String> origin) {
-        Map<String, String> keyCaseInsensitive = new HashMap<>();
-        for (Map.Entry<String, String> entry : origin.entrySet()) {
-            String fieldName = entry.getKey().toLowerCase();
-            checkArgument(
-                    !keyCaseInsensitive.containsKey(fieldName),
-                    "Duplicate key appears when converting map keys to case-insensitive form. Original map is:\n%s",
-                    origin);
-            keyCaseInsensitive.put(fieldName, entry.getValue());
-        }
-        return keyCaseInsensitive;
-    }
-}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalRecordParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalRecordParser.java
new file mode 100644
index 000000000..6812aaa8a
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalRecordParser.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka.canal;
+
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
+import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.NullNode;
+
+import com.alibaba.druid.DbType;
+import com.alibaba.druid.sql.SQLUtils;
+import com.alibaba.druid.sql.ast.SQLStatement;
+import com.alibaba.druid.sql.ast.statement.SQLAlterTableAddColumn;
+import com.alibaba.druid.sql.ast.statement.SQLAlterTableDropColumnItem;
+import com.alibaba.druid.sql.ast.statement.SQLAlterTableItem;
+import com.alibaba.druid.sql.ast.statement.SQLAlterTableStatement;
+import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
+import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlAlterTableChangeColumn;
+import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlAlterTableModifyColumn;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** Convert canal-json format string to list of {@link RichCdcMultiplexRecord}s. */
+public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultiplexRecord> {
+
+    private static final String FIELD_DATABASE = "database";
+    private static final String FIELD_TABLE = "table";
+    private static final String FIELD_SQL = "sql";
+    private static final String FIELD_MYSQL_TYPE = "mysqlType";
+    private static final String FIELD_TYPE = "type";
+    private static final String FIELD_DATA = "data";
+    private static final String FIELD_OLD = "old";
+    private static final String OP_UPDATE = "UPDATE";
+    private static final String OP_INSERT = "INSERT";
+    private static final String OP_DELETE = "DELETE";
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final boolean caseSensitive;
+    private final TableNameConverter tableNameConverter;
+    private final List<ComputedColumn> computedColumns;
+
+    private JsonNode root;
+    private String databaseName;
+    private String tableName;
+
+    public CanalRecordParser(boolean caseSensitive, List<ComputedColumn> computedColumns) {
+        this(caseSensitive, new TableNameConverter(caseSensitive), computedColumns);
+    }
+
+    public CanalRecordParser(boolean caseSensitive, TableNameConverter tableNameConverter) {
+        this(caseSensitive, tableNameConverter, Collections.emptyList());
+    }
+
+    public CanalRecordParser(
+            boolean caseSensitive,
+            TableNameConverter tableNameConverter,
+            List<ComputedColumn> computedColumns) {
+        this.caseSensitive = caseSensitive;
+        this.tableNameConverter = tableNameConverter;
+        this.computedColumns = computedColumns;
+    }
+
+    @Override
+    public void flatMap(String value, Collector<RichCdcMultiplexRecord> out) throws Exception {
+        root = objectMapper.readValue(value, JsonNode.class);
+        validateFormat();
+
+        databaseName = root.get(FIELD_DATABASE).asText();
+        tableName = tableNameConverter.convert(root.get(FIELD_TABLE).asText());
+
+        extractRecords().forEach(out::collect);
+    }
+
+    private void validateFormat() {
+        String errorMessageTemplate =
+                "Didn't find '%s' node in json. Only supports canal-json format,"
+                        + "please make sure your topic's format is correct.";
+
+        checkNotNull(root.get(FIELD_DATABASE), errorMessageTemplate, FIELD_DATABASE);
+        checkNotNull(root.get(FIELD_TABLE), errorMessageTemplate, FIELD_TABLE);
+        checkNotNull(root.get(FIELD_TYPE), errorMessageTemplate, FIELD_TYPE);
+        checkNotNull(root.get(FIELD_DATA), errorMessageTemplate, FIELD_DATA);
+
+        if (isDdl()) {
+            checkNotNull(root.get(FIELD_SQL), errorMessageTemplate, FIELD_SQL);
+        } else {
+            checkNotNull(root.get(FIELD_MYSQL_TYPE), errorMessageTemplate, FIELD_MYSQL_TYPE);
+        }
+    }
+
+    private boolean isDdl() {
+        return root.get("isDdl") != null && root.get("isDdl").asBoolean();
+    }
+
+    private List<RichCdcMultiplexRecord> extractRecords() {
+        if (isDdl()) {
+            return extractRecordsFromDdl();
+        }
+
+        // extract field types
+        LinkedHashMap<String, String> mySqlFieldTypes = extractFieldTypesFromMySqlType();
+        LinkedHashMap<String, DataType> paimonFieldTypes = new LinkedHashMap<>();
+        mySqlFieldTypes.forEach(
+                (name, type) -> paimonFieldTypes.put(name, toPaimonDataType(type, true)));
+
+        // extract row kind and field values
+        List<RichCdcMultiplexRecord> records = new ArrayList<>();
+        String type = root.get(FIELD_TYPE).asText();
+        ArrayNode data = (ArrayNode) root.get(FIELD_DATA);
+        switch (type) {
+            case OP_UPDATE:
+                ArrayNode old =
+                        root.get(FIELD_OLD) instanceof NullNode
+                                ? null
+                                : (ArrayNode) root.get(FIELD_OLD);
+                for (int i = 0; i < data.size(); i++) {
+                    Map<String, String> after = extractRow(data.get(i), mySqlFieldTypes);
+                    if (old != null) {
+                        Map<String, String> before = extractRow(old.get(i), mySqlFieldTypes);
+                        // fields in "old" (before) means the fields are changed
+                        // fields not in "old" (before) means the fields are not changed,
+                        // so we just copy the not changed fields into before
+                        for (Map.Entry<String, String> entry : after.entrySet()) {
+                            if (!before.containsKey(entry.getKey())) {
+                                before.put(entry.getKey(), entry.getValue());
+                            }
+                        }
+                        before = caseSensitive ? before : keyCaseInsensitive(before);
+                        records.add(
+                                new RichCdcMultiplexRecord(
+                                        new CdcRecord(RowKind.DELETE, before),
+                                        paimonFieldTypes,
+                                        databaseName,
+                                        tableName));
+                    }
+                    after = caseSensitive ? after : keyCaseInsensitive(after);
+                    records.add(
+                            new RichCdcMultiplexRecord(
+                                    new CdcRecord(RowKind.INSERT, after),
+                                    paimonFieldTypes,
+                                    databaseName,
+                                    tableName));
+                }
+                break;
+            case OP_INSERT:
+                // fall through
+            case OP_DELETE:
+                for (JsonNode datum : data) {
+                    Map<String, String> after = extractRow(datum, mySqlFieldTypes);
+                    after = caseSensitive ? after : keyCaseInsensitive(after);
+                    RowKind kind = type.equals(OP_INSERT) ? RowKind.INSERT : RowKind.DELETE;
+                    records.add(
+                            new RichCdcMultiplexRecord(
+                                    new CdcRecord(kind, after),
+                                    paimonFieldTypes,
+                                    databaseName,
+                                    tableName));
+                }
+                break;
+            default:
+                throw new UnsupportedOperationException("Unknown record type: " + type);
+        }
+
+        return records;
+    }
+
+    private List<RichCdcMultiplexRecord> extractRecordsFromDdl() {
+        String sql = root.get(FIELD_SQL).asText();
+        if (StringUtils.isEmpty(sql)) {
+            return Collections.emptyList();
+        }
+
+        LinkedHashMap<String, DataType> fieldTypes = new LinkedHashMap<>();
+        SQLStatement sqlStatement = SQLUtils.parseSingleStatement(sql, DbType.mysql);
+
+        if (sqlStatement instanceof SQLAlterTableStatement) {
+            SQLAlterTableStatement sqlAlterTableStatement = (SQLAlterTableStatement) sqlStatement;
+            for (SQLAlterTableItem sqlAlterTableItem : sqlAlterTableStatement.getItems()) {
+                extractFieldTypesFromAlterTableItem(sqlAlterTableItem, fieldTypes);
+            }
+        }
+
+        return Collections.singletonList(
+                new RichCdcMultiplexRecord(
+                        CdcRecord.emptyRecord(), fieldTypes, databaseName, tableName));
+    }
+
+    private void extractFieldTypesFromAlterTableItem(
+            SQLAlterTableItem sqlAlterTableItem, LinkedHashMap<String, DataType> fieldTypes) {
+        if (sqlAlterTableItem instanceof SQLAlterTableAddColumn) {
+            SQLAlterTableAddColumn sqlAlterTableAddColumn =
+                    (SQLAlterTableAddColumn) sqlAlterTableItem;
+            List<SQLColumnDefinition> columns = sqlAlterTableAddColumn.getColumns();
+
+            for (SQLColumnDefinition column : columns) {
+                fieldTypes.put(getColumnName(column), getPaimonDataType(column));
+            }
+        } else if (sqlAlterTableItem instanceof SQLAlterTableDropColumnItem) {
+            // ignore
+        } else if (sqlAlterTableItem instanceof MySqlAlterTableModifyColumn) {
+            MySqlAlterTableModifyColumn mySqlAlterTableModifyColumn =
+                    (MySqlAlterTableModifyColumn) sqlAlterTableItem;
+            SQLColumnDefinition newColumnDefinition =
+                    mySqlAlterTableModifyColumn.getNewColumnDefinition();
+
+            fieldTypes.put(
+                    getColumnName(newColumnDefinition), getPaimonDataType(newColumnDefinition));
+        } else if (sqlAlterTableItem instanceof MySqlAlterTableChangeColumn) {
+            MySqlAlterTableChangeColumn mySqlAlterTableChangeColumn =
+                    (MySqlAlterTableChangeColumn) sqlAlterTableItem;
+            SQLColumnDefinition newColumnDefinition =
+                    mySqlAlterTableChangeColumn.getNewColumnDefinition();
+
+            fieldTypes.put(
+                    getColumnName(newColumnDefinition), getPaimonDataType(newColumnDefinition));
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported ALTER TABLE type: "
+                            + sqlAlterTableItem.getClass().getSimpleName());
+        }
+    }
+
+    private String getColumnName(SQLColumnDefinition column) {
+        return toFieldName(StringUtils.replace(column.getColumnName(), "`", ""));
+    }
+
+    private DataType getPaimonDataType(SQLColumnDefinition column) {
+        return toPaimonDataType(
+                column.getDataType().toString(), !column.containsNotNullConstaint());
+    }
+
+    private String toFieldName(String rawName) {
+        return StringUtils.caseSensitiveConversion(rawName, caseSensitive);
+    }
+
+    private DataType toPaimonDataType(String mySqlType, boolean isNullable) {
+        return MySqlTypeUtils.toDataType(mySqlType).copy(isNullable);
+    }
+
+    private LinkedHashMap<String, String> extractFieldTypesFromMySqlType() {
+        LinkedHashMap<String, String> fieldTypes = new LinkedHashMap<>();
+
+        JsonNode schema = root.get(FIELD_MYSQL_TYPE);
+        Iterator<String> iterator = schema.fieldNames();
+        while (iterator.hasNext()) {
+            String fieldName = iterator.next();
+            String fieldType = schema.get(fieldName).asText();
+            fieldTypes.put(toFieldName(fieldName), fieldType);
+        }
+
+        return fieldTypes;
+    }
+
+    private Map<String, String> extractRow(JsonNode record, Map<String, String> mySqlFieldTypes) {
+        Map<String, Object> jsonMap =
+                objectMapper.convertValue(record, new TypeReference<Map<String, Object>>() {});
+        if (jsonMap == null) {
+            return new HashMap<>();
+        }
+
+        Map<String, String> resultMap = new HashMap<>();
+        for (Map.Entry<String, String> field : mySqlFieldTypes.entrySet()) {
+            String fieldName = field.getKey();
+            String mySqlType = field.getValue();
+            Object objectValue = jsonMap.get(fieldName);
+            if (objectValue == null) {
+                continue;
+            }
+
+            String oldValue = objectValue.toString();
+            String newValue = oldValue;
+
+            if (MySqlTypeUtils.isSetType(MySqlTypeUtils.getShortType(mySqlType))) {
+                newValue = CanalFieldParser.convertSet(newValue, mySqlType);
+            } else if (MySqlTypeUtils.isEnumType(MySqlTypeUtils.getShortType(mySqlType))) {
+                newValue = CanalFieldParser.convertEnum(newValue, mySqlType);
+            } else if (MySqlTypeUtils.isGeoType(MySqlTypeUtils.getShortType(mySqlType))) {
+                try {
+                    byte[] wkb =
+                            CanalFieldParser.convertGeoType2WkbArray(
+                                    oldValue.getBytes(StandardCharsets.ISO_8859_1));
+                    newValue = MySqlTypeUtils.convertWkbArray(wkb);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException(
+                            String.format("Failed to convert %s to geometry JSON.", oldValue), e);
+                }
+            }
+            resultMap.put(fieldName, newValue);
+        }
+
+        // generate values for computed columns
+        for (ComputedColumn computedColumn : computedColumns) {
+            resultMap.put(
+                    computedColumn.columnName(),
+                    computedColumn.eval(resultMap.get(computedColumn.fieldReference())));
+        }
+
+        return resultMap;
+    }
+
+    private Map<String, String> keyCaseInsensitive(Map<String, String> origin) {
+        Map<String, String> keyCaseInsensitive = new HashMap<>();
+        for (Map.Entry<String, String> entry : origin.entrySet()) {
+            String fieldName = entry.getKey().toLowerCase();
+            checkArgument(
+                    !keyCaseInsensitive.containsKey(fieldName),
+                    "Duplicate key appears when converting map keys to case-insensitive form. Original map is:\n%s",
+                    origin);
+            keyCaseInsensitive.put(fieldName, entry.getValue());
+        }
+        return keyCaseInsensitive;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index bba93eaa2..f8ec80618 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -122,12 +122,6 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
         return tableNameConverter.convert(tableName);
     }
 
-    @Override
-    public String parseDatabaseName() {
-        String databaseName = payload.get("source").get("db").asText();
-        return tableNameConverter.convert(databaseName);
-    }
-
     private boolean isSchemaChange() {
         return payload.get("op") == null;
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
index c4b7ea7a9..363c5490e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.Experimental;
 import org.apache.paimon.types.RowKind;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 
@@ -39,6 +40,10 @@ public class CdcRecord implements Serializable {
         this.fields = fields;
     }
 
+    public static CdcRecord emptyRecord() {
+        return new CdcRecord(RowKind.INSERT, Collections.emptyMap());
+    }
+
     public RowKind kind() {
         return kind;
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
index 136ff32b7..e62864ee9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
@@ -42,10 +42,6 @@ public interface EventParser<T> {
         throw new UnsupportedOperationException("Table name is not supported in this parser.");
     }
 
-    default String parseDatabaseName() {
-        throw new UnsupportedOperationException("Database name is not supported in this parser.");
-    }
-
     /**
      * Parse new schema if this event contains schema change.
      *
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
new file mode 100644
index 000000000..c8d09f4d0
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.types.DataType;
+
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Objects;
+
+/** Compared to {@link CdcMultiplexRecord}, this contains schema information. */
+public class RichCdcMultiplexRecord implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final CdcRecord cdcRecord;
+    private final LinkedHashMap<String, DataType> fieldTypes;
+    private final String databaseName;
+    private final String tableName;
+
+    public RichCdcMultiplexRecord(
+            CdcRecord cdcRecord,
+            LinkedHashMap<String, DataType> fieldTypes,
+            String databaseName,
+            String tableName) {
+        this.cdcRecord = cdcRecord;
+        this.fieldTypes = fieldTypes;
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+    }
+
+    public String tableName() {
+        return tableName;
+    }
+
+    public RichCdcRecord toRichCdcRecord() {
+        return new RichCdcRecord(cdcRecord, fieldTypes);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(cdcRecord, fieldTypes, databaseName, tableName);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RichCdcMultiplexRecord that = (RichCdcMultiplexRecord) o;
+        return Objects.equals(cdcRecord, that.cdcRecord)
+                && Objects.equals(fieldTypes, that.fieldTypes)
+                && databaseName.equals(that.databaseName)
+                && tableName.equals(that.tableName);
+    }
+
+    @Override
+    public String toString() {
+        return "{"
+                + "cdcRecord="
+                + cdcRecord
+                + ", fieldTypes="
+                + fieldTypes
+                + ", databaseName="
+                + databaseName
+                + ", tableName="
+                + tableName
+                + '}';
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
similarity index 58%
copy from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 7288138bc..105f45be4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -18,46 +18,40 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataField;
-import org.apache.paimon.utils.ObjectUtils;
 
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Optional;
+import java.util.Map;
 
-/** Testing {@link EventParser} for {@link TestCdcEvent}. */
-public class TestCdcEventParser implements EventParser<TestCdcEvent> {
+/** {@link EventParser} for {@link RichCdcMultiplexRecord}. */
+public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMultiplexRecord> {
 
-    private TestCdcEvent raw;
+    // TODO: currently we don't consider database
+    private String currentTable;
+    private RichEventParser currentParser;
+
+    private final Map<String, RichEventParser> parsers = new HashMap<>();
 
     @Override
-    public void setRawEvent(TestCdcEvent raw) {
-        this.raw = raw;
+    public void setRawEvent(RichCdcMultiplexRecord record) {
+        this.currentTable = record.tableName();
+        this.currentParser = parsers.computeIfAbsent(currentTable, t -> new RichEventParser());
+        currentParser.setRawEvent(record.toRichCdcRecord());
     }
 
     @Override
     public String parseTableName() {
-        return raw.tableName();
+        return currentTable;
     }
 
     @Override
     public List<DataField> parseSchemaChange() {
-        return ObjectUtils.coalesce(raw.updatedDataFields(), Collections.emptyList());
-    }
-
-    @Override
-    public String parseDatabaseName() {
-        return null;
+        return currentParser.parseSchemaChange();
     }
 
     @Override
     public List<CdcRecord> parseRecords() {
-        return ObjectUtils.coalesce(raw.records(), Collections.emptyList());
-    }
-
-    @Override
-    public Optional<Schema> parseNewTable(String databaseName) {
-        return Optional.empty();
+        return currentParser.parseRecords();
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcParserFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcParserFactory.java
deleted file mode 100644
index 3cc9cae08..000000000
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcParserFactory.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink.cdc;
-
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataType;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/** A {@link EventParser.Factory} for {@link RichCdcRecord}. */
-public class RichCdcParserFactory implements EventParser.Factory<RichCdcRecord> {
-
-    @Override
-    public RichEventParser create() {
-        return new RichEventParser();
-    }
-
-    private static class RichEventParser implements EventParser<RichCdcRecord> {
-
-        private RichCdcRecord record;
-
-        private final Map<String, DataType> previousDataFields = new HashMap<>();
-
-        @Override
-        public void setRawEvent(RichCdcRecord rawEvent) {
-            this.record = rawEvent;
-        }
-
-        @Override
-        public List<DataField> parseSchemaChange() {
-            List<DataField> change = new ArrayList<>();
-            record.fieldTypes()
-                    .forEach(
-                            (field, type) -> {
-                                DataType previous = previousDataFields.get(field);
-                                if (!Objects.equals(previous, type)) {
-                                    previousDataFields.put(field, type);
-                                    change.add(new DataField(0, field, type));
-                                }
-                            });
-            return change;
-        }
-
-        @Override
-        public List<CdcRecord> parseRecords() {
-            return Collections.singletonList(record.toCdcRecord());
-        }
-    }
-}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
index 7d02c8a60..37e9271d0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
@@ -24,6 +24,7 @@ import org.apache.paimon.types.RowKind;
 
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
 
@@ -33,31 +34,28 @@ public class RichCdcRecord implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    private final RowKind kind;
-    private final Map<String, DataType> fieldTypes;
-    private final Map<String, String> fieldValues;
+    private final CdcRecord cdcRecord;
+    private final LinkedHashMap<String, DataType> fieldTypes;
 
-    public RichCdcRecord(
-            RowKind kind, Map<String, DataType> fieldTypes, Map<String, String> fieldValues) {
-        this.kind = kind;
+    public RichCdcRecord(CdcRecord cdcRecord, LinkedHashMap<String, DataType> fieldTypes) {
+        this.cdcRecord = cdcRecord;
         this.fieldTypes = fieldTypes;
-        this.fieldValues = fieldValues;
     }
 
-    public RowKind kind() {
-        return kind;
+    public boolean hasPayload() {
+        return !cdcRecord.fields().isEmpty();
     }
 
-    public Map<String, DataType> fieldTypes() {
-        return fieldTypes;
+    public RowKind kind() {
+        return cdcRecord.kind();
     }
 
-    public Map<String, String> fieldValues() {
-        return fieldValues;
+    public LinkedHashMap<String, DataType> fieldTypes() {
+        return fieldTypes;
     }
 
     public CdcRecord toCdcRecord() {
-        return new CdcRecord(kind, fieldValues);
+        return cdcRecord;
     }
 
     @Override
@@ -69,26 +67,17 @@ public class RichCdcRecord implements Serializable {
             return false;
         }
         RichCdcRecord that = (RichCdcRecord) o;
-        return kind == that.kind
-                && Objects.equals(fieldTypes, that.fieldTypes)
-                && Objects.equals(fieldValues, that.fieldValues);
+        return cdcRecord == that.cdcRecord && Objects.equals(fieldTypes, that.fieldTypes);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(kind, fieldTypes, fieldValues);
+        return Objects.hash(cdcRecord, fieldTypes);
     }
 
     @Override
     public String toString() {
-        return "{"
-                + "kind="
-                + kind
-                + ", fieldTypes="
-                + fieldTypes
-                + ", fieldValues="
-                + fieldValues
-                + '}';
+        return "{" + "cdcRecord=" + cdcRecord + ", fieldTypes=" + fieldTypes + '}';
     }
 
     public static Builder builder(RowKind kind) {
@@ -99,7 +88,7 @@ public class RichCdcRecord implements Serializable {
     public static class Builder {
 
         private final RowKind kind;
-        private final Map<String, DataType> fieldTypes = new HashMap<>();
+        private final LinkedHashMap<String, DataType> fieldTypes = new LinkedHashMap<>();
         private final Map<String, String> fieldValues = new HashMap<>();
 
         public Builder(RowKind kind) {
@@ -113,7 +102,7 @@ public class RichCdcRecord implements Serializable {
         }
 
         public RichCdcRecord build() {
-            return new RichCdcRecord(kind, fieldTypes, fieldValues);
+            return new RichCdcRecord(new CdcRecord(kind, fieldValues), fieldTypes);
         }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
index 5648b695a..774895757 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
@@ -68,7 +68,7 @@ public class RichCdcSinkBuilder {
         CdcSinkBuilder<RichCdcRecord> builder = new CdcSinkBuilder<>();
         return builder.withTable(table)
                 .withInput(input)
-                .withParserFactory(new RichCdcParserFactory())
+                .withParserFactory(RichEventParser::new)
                 .withParallelism(parallelism)
                 .withIdentifier(identifier)
                 .withCatalogLoader(catalogLoader)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
new file mode 100644
index 000000000..c98ead49f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Objects;
+
+/** A {@link EventParser} for {@link RichCdcRecord}. */
+public class RichEventParser implements EventParser<RichCdcRecord> {
+
+    private RichCdcRecord record;
+
+    private final LinkedHashMap<String, DataType> previousDataFields = new LinkedHashMap<>();
+
+    @Override
+    public void setRawEvent(RichCdcRecord rawEvent) {
+        this.record = rawEvent;
+    }
+
+    @Override
+    public List<DataField> parseSchemaChange() {
+        List<DataField> change = new ArrayList<>();
+        record.fieldTypes()
+                .forEach(
+                        (field, type) -> {
+                            DataType previous = previousDataFields.get(field);
+                            if (!Objects.equals(previous, type)) {
+                                previousDataFields.put(field, type);
+                                change.add(new DataField(0, field, type));
+                            }
+                        });
+        return change;
+    }
+
+    @Override
+    public List<CdcRecord> parseRecords() {
+        if (record.hasPayload()) {
+            return Collections.singletonList(record.toCdcRecord());
+        } else {
+            return Collections.emptyList();
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index fd7569acf..1472021b0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -57,10 +57,21 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
     @Test
     @Timeout(60)
     public void testSchemaEvolution() throws Exception {
+        runSingleTableSchemaEvolution("schemaevolution");
+    }
+
+    @Test
+    @Timeout(60)
+    public void testSchemaEvolutionWithMissingDdl() throws Exception {
+        runSingleTableSchemaEvolution("schemaevolutionmissingddl");
+    }
+
+    private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
         final String topic = "schema_evolution";
         createTestTopic(topic, 1, 1);
         // ---------- Write the Canal json into Kafka -------------------
-        List<String> lines = readLines("kafka.canal/table/schemaevolution/canal-data-1.txt");
+        List<String> lines =
+                readLines(String.format("kafka.canal/table/%s/canal-data-1.txt", sourceDir));
         try {
             writeRecordsToKafka(topic, lines);
         } catch (Exception e) {
@@ -95,10 +106,10 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
 
         waitJobRunning(client);
 
-        testSchemaEvolutionImpl(topic);
+        testSchemaEvolutionImpl(topic, sourceDir);
     }
 
-    private void testSchemaEvolutionImpl(String topic) throws Exception {
+    private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exception {
         FileStoreTable table = getFileStoreTable();
 
         RowType rowType =
@@ -115,7 +126,8 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
 
         try {
             writeRecordsToKafka(
-                    topic, readLines("kafka.canal/table/schemaevolution/canal-data-2.txt"));
+                    topic,
+                    readLines(String.format("kafka.canal/table/%s/canal-data-2.txt", sourceDir)));
         } catch (Exception e) {
             throw new Exception("Failed to write canal data to Kafka.", e);
         }
@@ -140,7 +152,8 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
 
         try {
             writeRecordsToKafka(
-                    topic, readLines("kafka.canal/table/schemaevolution/canal-data-3.txt"));
+                    topic,
+                    readLines(String.format("kafka.canal/table/%s/canal-data-3.txt", sourceDir)));
         } catch (Exception e) {
             throw new Exception("Failed to write canal data to Kafka.", e);
         }
@@ -166,7 +179,8 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
 
         try {
             writeRecordsToKafka(
-                    topic, readLines("kafka.canal/table/schemaevolution/canal-data-4.txt"));
+                    topic,
+                    readLines(String.format("kafka.canal/table/%s/canal-data-4.txt", sourceDir)));
         } catch (Exception e) {
             throw new Exception("Failed to write canal data to Kafka.", e);
         }
@@ -196,7 +210,8 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
 
         try {
             writeRecordsToKafka(
-                    topic, readLines("kafka.canal/table/schemaevolution/canal-data-5.txt"));
+                    topic,
+                    readLines(String.format("kafka.canal/table/%s/canal-data-5.txt", sourceDir)));
         } catch (Exception e) {
             throw new Exception("Failed to write canal data to Kafka.", e);
         }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaEventParserTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaEventParserTest.java
deleted file mode 100644
index 168f92641..000000000
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaEventParserTest.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.action.cdc.kafka;
-
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericArray;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
-import org.apache.paimon.flink.action.cdc.kafka.canal.CanalJsonEventParser;
-import org.apache.paimon.flink.action.cdc.mysql.Expression;
-import org.apache.paimon.flink.sink.cdc.EventParser;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-/** Tests for {@link CanalJsonEventParser}. */
-public class KafkaEventParserTest {
-    private static final String CANAL_JSON_EVENT =
-            "{\"data\":[{\"pt\":\"1\",\"_ID\":\"1\",\"v1\":\"one\","
-                    + "\"_geometrycollection\":\"\\u0000\\u0000\\u0000\\u0000\\u0001\\u0007\\u0000\\u0000\\u0000\\u0003"
-                    + "\\u0000\\u0000\\u0000\\u0001\\u0001\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000"
-                    + "\\u0000$@\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000$@\\u0001\\u0001\\u0000\\u0000\\u0000\\u0000"
-                    + "\\u0000\\u0000\\u0000\\u0000\\u0000>@\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000>@\\u0001\\u0002"
-                    + "\\u0000\\u0000\\u0000\\u0002\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000"
-                    + ".@\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000"
-                    + ".@\\u0000\\u0000\\u0000\\u0000\\u0000\\u00004@\\u0000\\u0000\\u0000\\u0000\\u0000\\u00004@\","
-                    + "\"_set\":\"3\",\"_enum\":\"1\"}],\"database\":\"paimon_sync_table\",\"es\":1683006706000,"
-                    + "\"id\":92,\"isDdl\":false,\"mysqlType\":{\"pt\":\"INT\",\"_ID\":\"INT\",\"v1\":\"VARCHAR(10)\","
-                    + "\"_geometrycollection\":\"GEOMETRYCOLLECTION\",\"_set\":\"SET('a','b','c','d')\",\"_enum\":\"ENUM"
-                    + "('value1','value2','value3')\"},\"old\":null,\"pkNames\":[\"_id\"],\"sql\":\"\","
-                    + "\"sqlType\":{\"pt\":4,\"_id\":4,\"v1\":12},\"table\":\"schema_evolution_1\",\"ts\":1683006706728,"
-                    + "\"type\":\"INSERT\"}\n"
-                    + ":null,\"database\":\"paimon_sync_table\",\"es\":1683168078000,\"id\":40,\"isDdl\":true,"
-                    + "\"mysqlType\":null,\"old\":null,\"pkNames\":null,\"sql\":\"/* Query from "
-                    + "DMS-WEBSQL-0-Qid_18293315143325386Y by user 1486767996652600 */ ALTER TABLE schema_evolution_1 "
-                    + "MODIFY COLUMN v2 BIGINT\",\"sqlType\":null,\"table\":\"schema_evolution_1\",\"ts\":1683168078956,"
-                    + "\"type\":\"ALTER\"}";
-    private static final String CANAL_JSON_DDL_ADD_EVENT =
-            "{\"data\":null,\"database\":\"paimon_sync_table\","
-                    + "\"es\":1683008289000,\"id\":13,\"isDdl\":true,\"mysqlType\":null,\"old\":null,\"pkNames\":null,"
-                    + "\"sql\":\" ALTER TABLE "
-                    + "schema_evolution_1 ADD COLUMN v2 INT\",\"sqlType\":null,\"table\":\"schema_evolution_1\","
-                    + "\"ts\":1683008289401,\"type\":\"ALTER\"}";
-
-    private static final String CANAL_JSON_DDL_MODIFY_EVENT =
-            "{\"data\":null,\"database\":\"paimon_sync_table\",\"es\":1683168155000,\"id\":54,\"isDdl\":true,"
-                    + "\"mysqlType\":null,\"old\":null,\"pkNames\":null,\"sql\":\" ALTER TABLE schema_evolution_1 MODIFY "
-                    + "COLUMN v1 VARCHAR(20)\",\"sqlType\":null,\"table\":\"schema_evolution_1\",\"ts\":1683168154943,"
-                    + "\"type\":\"ALTER\"}";
-
-    private static final String CANAL_JSON_DDL_DROP_EVENT =
-            "{\"data\":null,\"database\":\"paimon_sync_table\",\"es\":1683168155000,\"id\":54,\"isDdl\":true,"
-                    + "\"mysqlType\":null,\"old\":null,\"pkNames\":null,\"sql\":\" ALTER TABLE schema_evolution_1 DROP "
-                    + "COLUMN v1 \",\"sqlType\":null,\"table\":\"schema_evolution_1\",\"ts\":1683168154943,"
-                    + "\"type\":\"ALTER\"}";
-
-    private static final String CANAL_JSON_DDL_CHANGE_EVENT =
-            "{\"data\":null,\"database\":\"paimon_sync_table\",\"es\":1683168155000,\"id\":54,\"isDdl\":true,"
-                    + "\"mysqlType\":null,\"old\":null,\"pkNames\":null,\"sql\":\" ALTER TABLE schema_evolution_1 CHANGE "
-                    + "COLUMN `$% ^,& *(` cg VARCHAR(20) \",\"sqlType\":null,\"table\":\"schema_evolution_1\",\"ts\":1683168154943,"
-                    + "\"type\":\"ALTER\"}";
-
-    private static final String CANAL_JSON_DDL_MULTI_ADD_EVENT =
-            "{\"data\":null,\"database\":\"paimon_sync_table\","
-                    + "\"es\":1683614798000,\"id\":2431,\"isDdl\":true,\"mysqlType\":null,\"old\":null,\"pkNames\":null,"
-                    + "\"sql\":\" ALTER TABLE "
-                    + "schema_evolution_multiple \\nADD v4 INT,\\nMODIFY COLUMN v1 VARCHAR(30),\\nADD COLUMN (v5 DOUBLE, v6 "
-                    + "DECIMAL(5, 3), `$% ^,& *(` VARCHAR(10) COMMENT 'test'),\\n            MODIFY v2 BIGINT\",\"sqlType\":null,"
-                    + "\"table\":\"schema_evolution_multiple\",\"ts\":1683614799031,\"type\":\"ALTER\"}\n";
-
-    private static final String DEBEZIUM_JSON_EVENT =
-            "{\"before\":null,\"after\":{\"id\":101,\"name\":\"scooter\","
-                    + "\"description\":\"Small 2-wheel scooter\",\"weight\":3.140000104904175},\"source\":{\"version\":\"1.1.1"
-                    + ".Final\",\"connector\":\"mysql\",\"name\":\"dbserver1\",\"ts_ms\":0,\"snapshot\":\"true\","
-                    + "\"db\":\"inventory\",\"table\":\"products\",\"server_id\":0,\"gtid\":null,\"file\":\"mysql-bin.000003\","
-                    + "\"pos\":154,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"c\",\"ts_ms\":1589355606100,"
-                    + "\"transaction\":null}\n";
-
-    @Test
-    public void testCanalJsonEventParser() {
-        boolean caseSensitive = false;
-        EventParser<String> parser =
-                new CanalJsonEventParser(caseSensitive, new TableNameConverter(caseSensitive));
-        parser.setRawEvent(CANAL_JSON_EVENT);
-        List<DataField> dataFields = new ArrayList<>();
-        dataFields.add(new DataField(0, "pt", DataTypes.INT()));
-        dataFields.add(new DataField(1, "_id", DataTypes.INT()));
-        dataFields.add(new DataField(2, "v1", DataTypes.VARCHAR(10)));
-        dataFields.add(new DataField(3, "_geometrycollection", DataTypes.STRING()));
-        dataFields.add(new DataField(4, "_set", DataTypes.ARRAY(DataTypes.STRING())));
-        dataFields.add(new DataField(5, "_enum", DataTypes.STRING()));
-        assertThat(parser.parseSchemaChange()).isEmpty();
-        List<GenericRow> result =
-                parser.parseRecords().stream()
-                        .map(record -> toGenericRow(record, dataFields).get())
-                        .collect(Collectors.toList());
-        BinaryString[] binaryStrings =
-                new BinaryString[] {BinaryString.fromString("a"), BinaryString.fromString("b")};
-        GenericArray genericArray = new GenericArray(binaryStrings);
-
-        List<GenericRow> expect =
-                Collections.singletonList(
-                        GenericRow.of(
-                                1,
-                                1,
-                                BinaryString.fromString("one"),
-                                BinaryString.fromString(
-                                        "{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}"),
-                                genericArray,
-                                BinaryString.fromString("value1")));
-        assertThat(result).isEqualTo(expect);
-    }
-
-    @Test
-    public void testCanalJsonEventParserDdl() {
-        boolean caseSensitive = false;
-        EventParser<String> parser =
-                new CanalJsonEventParser(caseSensitive, new TableNameConverter(caseSensitive));
-        parser.setRawEvent(CANAL_JSON_EVENT);
-        List<DataField> expectDataFields = new ArrayList<>();
-        expectDataFields.add(new DataField(0, "v2", DataTypes.INT()));
-        assertThat(parser.parseSchemaChange()).isEmpty();
-        parser.setRawEvent(CANAL_JSON_DDL_ADD_EVENT);
-        List<DataField> updatedDataFieldsAdd = parser.parseSchemaChange();
-        assertThat(updatedDataFieldsAdd).isEqualTo(expectDataFields);
-
-        expectDataFields.clear();
-        expectDataFields.add(new DataField(0, "v1", DataTypes.VARCHAR(20)));
-        parser.setRawEvent(CANAL_JSON_DDL_MODIFY_EVENT);
-        List<DataField> updatedDataFieldsModify = parser.parseSchemaChange();
-        assertThat(updatedDataFieldsModify).isEqualTo(expectDataFields);
-        expectDataFields.clear();
-
-        expectDataFields.add(new DataField(0, "v4", DataTypes.INT()));
-        expectDataFields.add(new DataField(1, "v1", DataTypes.VARCHAR(30)));
-
-        expectDataFields.add(new DataField(2, "v5", DataTypes.DOUBLE()));
-        expectDataFields.add(new DataField(3, "v6", DataTypes.DECIMAL(5, 3)));
-        expectDataFields.add(new DataField(4, "$% ^,& *(", DataTypes.VARCHAR(10)));
-        expectDataFields.add(new DataField(5, "v2", DataTypes.BIGINT()));
-
-        parser.setRawEvent(CANAL_JSON_DDL_MULTI_ADD_EVENT);
-        List<DataField> updatedDataFieldsMulti = parser.parseSchemaChange();
-        assertThat(updatedDataFieldsMulti).isEqualTo(expectDataFields);
-        expectDataFields.clear();
-        parser.setRawEvent(CANAL_JSON_DDL_CHANGE_EVENT);
-        List<DataField> updatedDataFieldsChange = parser.parseSchemaChange();
-        expectDataFields.add(new DataField(0, "cg", DataTypes.VARCHAR(20)));
-        assertThat(updatedDataFieldsChange).isEqualTo(expectDataFields);
-    }
-
-    @Test
-    public void testCanalJsonEventParserParseDebeziumJson() {
-        boolean caseSensitive = true;
-        EventParser<String> parser =
-                new CanalJsonEventParser(caseSensitive, new TableNameConverter(caseSensitive));
-        parser.setRawEvent(DEBEZIUM_JSON_EVENT);
-        RuntimeException e =
-                assertThrows(
-                        RuntimeException.class, parser::parseRecords, "Expecting RuntimeException");
-        assertThat(e)
-                .hasMessage(
-                        "CanalJsonEventParser only supports canal-json format,please make sure that your topic's format is accurate.");
-    }
-
-    @Test
-    public void testCaseSensitive() {
-        EventParser<String> parserCaseInsensitive =
-                new CanalJsonEventParser(false, new TableNameConverter(false));
-        EventParser<String> parserCaseSensitive =
-                new CanalJsonEventParser(true, new TableNameConverter(true));
-        parserCaseInsensitive.setRawEvent(CANAL_JSON_EVENT);
-        List<DataField> dataFields = new ArrayList<>();
-        dataFields.add(new DataField(0, "pt", DataTypes.INT()));
-        dataFields.add(new DataField(1, "_id", DataTypes.INT()));
-        dataFields.add(new DataField(2, "v1", DataTypes.VARCHAR(10)));
-        dataFields.add(new DataField(3, "_geometrycollection", DataTypes.STRING()));
-        dataFields.add(new DataField(4, "_set", DataTypes.ARRAY(DataTypes.STRING())));
-        dataFields.add(new DataField(5, "_enum", DataTypes.STRING()));
-        assertThat(parserCaseInsensitive.parseSchemaChange()).isEmpty();
-        List<GenericRow> result =
-                parserCaseInsensitive.parseRecords().stream()
-                        .map(record -> toGenericRow(record, dataFields).get())
-                        .collect(Collectors.toList());
-        BinaryString[] binaryStrings =
-                new BinaryString[] {BinaryString.fromString("a"), BinaryString.fromString("b")};
-        GenericArray genericArray = new GenericArray(binaryStrings);
-
-        List<GenericRow> expect =
-                Collections.singletonList(
-                        GenericRow.of(
-                                1,
-                                1,
-                                BinaryString.fromString("one"),
-                                BinaryString.fromString(
-                                        "{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}"),
-                                genericArray,
-                                BinaryString.fromString("value1")));
-        assertThat(result).isEqualTo(expect);
-        parserCaseSensitive.setRawEvent(CANAL_JSON_EVENT);
-        Optional<GenericRow> genericRow =
-                toGenericRow(parserCaseSensitive.parseRecords().get(0), dataFields);
-        assert !genericRow.isPresent();
-        dataFields.remove(1);
-        dataFields.add(1, new DataField(1, "_ID", DataTypes.INT()));
-        List<GenericRow> resultCaseSensitive =
-                parserCaseSensitive.parseRecords().stream()
-                        .map(record -> toGenericRow(record, dataFields).get())
-                        .collect(Collectors.toList());
-        assertThat(resultCaseSensitive).isEqualTo(expect);
-    }
-
-    @Test
-    public void testCanalJsonEventParserAndComputeColumn() {
-        boolean caseSensitive = false;
-        EventParser<String> parser =
-                new CanalJsonEventParser(
-                        caseSensitive,
-                        new TableNameConverter(caseSensitive),
-                        Collections.singletonList(
-                                new ComputedColumn("v1", Expression.substring("v1", "1"))));
-        parser.setRawEvent(CANAL_JSON_EVENT);
-        List<DataField> dataFields = new ArrayList<>();
-        dataFields.add(new DataField(0, "pt", DataTypes.INT()));
-        dataFields.add(new DataField(1, "_id", DataTypes.INT()));
-        dataFields.add(new DataField(2, "v1", DataTypes.VARCHAR(10)));
-        dataFields.add(new DataField(3, "_geometrycollection", DataTypes.STRING()));
-        dataFields.add(new DataField(4, "_set", DataTypes.ARRAY(DataTypes.STRING())));
-        dataFields.add(new DataField(5, "_enum", DataTypes.STRING()));
-        assertThat(parser.parseSchemaChange()).isEmpty();
-        List<GenericRow> result =
-                parser.parseRecords().stream()
-                        .map(record -> toGenericRow(record, dataFields).get())
-                        .collect(Collectors.toList());
-        BinaryString[] binaryStrings =
-                new BinaryString[] {BinaryString.fromString("a"), BinaryString.fromString("b")};
-        GenericArray genericArray = new GenericArray(binaryStrings);
-
-        List<GenericRow> expect =
-                Collections.singletonList(
-                        GenericRow.of(
-                                1,
-                                1,
-                                BinaryString.fromString("ne"),
-                                BinaryString.fromString(
-                                        "{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}"),
-                                genericArray,
-                                BinaryString.fromString("value1")));
-        assertThat(result).isEqualTo(expect);
-    }
-}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
index 7288138bc..e39a74d5c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
@@ -46,11 +46,6 @@ public class TestCdcEventParser implements EventParser<TestCdcEvent> {
         return ObjectUtils.coalesce(raw.updatedDataFields(), Collections.emptyList());
     }
 
-    @Override
-    public String parseDatabaseName() {
-        return null;
-    }
-
     @Override
     public List<CdcRecord> parseRecords() {
         return ObjectUtils.coalesce(raw.records(), Collections.emptyList());
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-1.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-1.txt
new file mode 100644
index 000000000..8b6d5d6c4
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"pt":"1","_id":"1","v1":"one"}],"database":"paimon_sync_table","es":1683006706000,"id":92,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12},"table":"schema_evolution_1","ts":1683006706728,"type":"INSERT"}
+{"data":[{"pt":"1","_id":"2","v1":"two"},{"pt":"2","_id":"4","v1":"four"}],"database":"paimon_sync_table","es":1683006724000,"id":94,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12},"table":"schema_evolution_2","ts":1683006724404,"type":"INSERT"}
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-2.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-2.txt
new file mode 100644
index 000000000..842d5a37d
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-2.txt
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"pt":"2","_id":"3","v1":"three","v2":"30"},{"pt":"1","_id":"5","v1":"five","v2":"50"}],"database":"paimon_sync_table","es":1683008289000,"id":14,"isDdl":false,"mysqlType":{"pt":"int","_id":"int","v1":"varchar(10)","v2":"INT"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":4},"table":"schema_evolution_1","ts":1683008289719,"type":"INSERT"}
+{"data":[{"pt":"1","_id":"6","v1":"six","v2":"60"}],"database":"paimon_sync_table","es":1683008290000,"id":16,"isDdl":false,"mysqlType":{"pt":"int","_id":"int","v1":"varchar(10)","v2":"INT"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":4},"table":"schema_evolution_2","ts":1683008290391,"type":"INSERT"}
+{"data":[{"pt":"1","_id":"2","v1":"second","v2":null}],"database":"paimon_sync_table","es":1683008290000,"id":18,"isDdl":false,"mysqlType":{"pt":"int","_id":"int","v1":"varchar(10)","v2":"INT"},"old":[{"v1":"two"}],"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":4},"table":"schema_evolution_2","ts":1683008290626,"type":"UPDATE"}
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-3.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-3.txt
new file mode 100644
index 000000000..fc9547843
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-3.txt
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"pt":"2","_id":"7","v1":"seven","v2":"70000000000"}],"database":"paimon_sync_table","es":1683168079000,"id":41,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)","v2":"BIGINT"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5},"table":"schema_evolution_1","ts":1683168079071,"type":"INSERT"}
+{"data":[{"pt":"1","_id":"5","v1":"five","v2":"50"}],"database":"paimon_sync_table","es":1683168079000,"id":42,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)","v2":"BIGINT"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5},"table":"schema_evolution_1","ts":1683168079391,"type":"DELETE"}
+{"data":[{"pt":"2","_id":"3","v1":"three","v2":"30000000000"}],"database":"paimon_sync_table","es":1683168079000,"id":43,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)","v2":"BIGINT"},"old":[{"v2":"30"}],"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5},"table":"schema_evolution_1","ts":1683168079507,"type":"UPDATE"}
+{"data":[{"pt":"2","_id":"8","v1":"eight","v2":"80000000000"}],"database":"paimon_sync_table","es":1683168080000,"id":45,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)","v2":"BIGINT"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5},"table":"schema_evolution_2","ts":1683168079943,"type":"INSERT"}
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-4.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-4.txt
new file mode 100644
index 000000000..45e4137e5
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-4.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"pt":"1","_id":"9","v1":"nine","v2":"90000000000","v3":"99999.999","v4":"nine.bin","v5":"9.9"}],"database":"paimon_sync_table","es":1683168155000,"id":55,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(20)","v2":"BIGINT","v3":"NUMERIC(8,3)","v4":"VARBINARY(10)","v5":"FLOAT"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5,"v3":3,"v4":2004,"v5":7},"table":"schema_evolution_1","ts":1683168155270,"type":"INSERT"}
+{"data":[{"pt":"2","_id":"8","v1":"very long string","v2":"80000000000","v3":null,"v4":null,"v5":null}],"database":"paimon_sync_table","es":1683168156000,"id":60,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(20)","v2":"BIGINT","v3":"NUMERIC(8,3)","v4":"VARBINARY(10)","v5":"FLOAT"},"old":[{"v1":"eight"}],"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5,"v3":3,"v4":-3,"v5":7},"table":"schema_evolution_2","ts":1683168156456,"type":"UPDATE"}
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-5.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-5.txt
new file mode 100644
index 000000000..bb262c876
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-5.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"pt":"1","_id":"9","v1":"nine","v2":"90000000000","v3":"99999.999","v4":"nine.bin.long","v5":"9.00000000009"}],"database":"paimon_sync_table","es":1683357620000,"id":4710,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(20)","v2":"BIGINT","v3":"NUMERIC(8,3)","v4":"VARBINARY(20)","v5":"DOUBLE"},"old":[{"v4":"nine.bin","v5":"9.899999618530273"}],"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5,"v3":3,"v4":2004,"v5":8},"table":"schema_evolution_1 [...]
+{"data":[{"pt":"2","_id":"4","v1":"four","v2":null,"v3":null,"v4":"four.bin.long","v5":"4.00000000004"}],"database":"paimon_sync_table","es":1683357620000,"id":4710,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(20)","v2":"BIGINT","v3":"NUMERIC(8,3)","v4":"VARBINARY(20)","v5":"DOUBLE"},"old":[{"v4":null,"v5":null}],"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5,"v3":3,"v4":2004,"v5":8},"table":"schema_evolution_2","ts":1683357621014,"type":"UPDATE"}