Posted to commits@paimon.apache.org by lz...@apache.org on 2023/07/17 09:56:06 UTC

[incubator-paimon] branch master updated: [flink][kafka-cdc] Kafka cdc should dynamically create table (#1564)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3750a4214 [flink][kafka-cdc] Kafka cdc should dynamically create table (#1564)
3750a4214 is described below

commit 3750a42143c2a5eb61c75738272dac17a74d319f
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Mon Jul 17 17:56:01 2023 +0800

    [flink][kafka-cdc] Kafka cdc should dynamically create table (#1564)
---
 docs/content/how-to/cdc-ingestion.md               |  10 +-
 .../shortcodes/generated/mysql_sync_database.html  |   4 +
 ...DatabaseSyncMode.java => DatabaseSyncMode.java} |  13 +-
 .../paimon/flink/action/cdc/kafka/KafkaSchema.java | 115 ++-----------
 .../action/cdc/kafka/KafkaSyncDatabaseAction.java  | 179 +++------------------
 .../cdc/kafka/KafkaSyncDatabaseActionFactory.java  |  20 ---
 .../action/cdc/kafka/KafkaSyncTableAction.java     |  10 +-
 .../action/cdc/kafka/canal/CanalRecordParser.java  |  79 +++++++--
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    | 100 +++---------
 .../paimon/flink/action/cdc/mysql/MySqlSchema.java |  11 --
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  |  27 ++--
 .../cdc/mysql/MySqlSyncDatabaseActionFactory.java  |  33 +++-
 .../action/cdc/mysql/MySqlTableSchemaBuilder.java  | 139 ++++++++++++++++
 .../cdc/CdcDynamicTableParsingProcessFunction.java |   2 +-
 .../apache/paimon/flink/sink/cdc/EventParser.java  |   3 +-
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  |  18 +--
 .../cdc/NewTableSchemaBuilder.java}                |  21 ++-
 .../flink/sink/cdc/RichCdcMultiplexRecord.java     |  47 ++++--
 .../cdc/RichCdcMultiplexRecordEventParser.java     | 104 +++++++++++-
 .../cdc/RichCdcMultiplexRecordSchemaBuilder.java}  |  40 ++---
 .../action/cdc/kafka/KafkaActionITCaseBase.java    |  14 ++
 .../kafka/KafkaCanalSyncDatabaseActionITCase.java  |  60 +++----
 .../cdc/kafka/KafkaCanalSyncTableActionITCase.java |  44 +++--
 .../cdc/mysql/MySqlSyncDatabaseActionITCase.java   |  10 +-
 .../paimon/flink/sink/cdc/TestCdcEventParser.java  |   7 -
 25 files changed, 545 insertions(+), 565 deletions(-)

diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md
index 3533d9906..9c49ea285 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -113,6 +113,7 @@ To use this feature through `flink run`, run the following shell command.
     [--table-suffix <paimon-table-suffix>] \
     [--including-tables <mysql-table-name|name-regular-expr>] \
     [--excluding-tables <mysql-table-name|name-regular-expr>] \
+    [--mode <sync-mode>] \
     [--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] \
     [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
     [--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]
@@ -289,8 +290,6 @@ To use this feature through `flink run`, run the following shell command.
     kafka-sync-database
     --warehouse <warehouse-path> \
     --database <database-name> \
-    [--schema-init-max-read <int>] \
-    [--ignore-incompatible <true/false>] \
     [--table-prefix <paimon-table-prefix>] \
     [--table-suffix <paimon-table-suffix>] \
     [--including-tables <table-name|name-regular-expr>] \
@@ -304,8 +303,10 @@ To use this feature through `flink run`, run the following shell command.
 
 Only tables with primary keys will be synchronized.
 
-For each Kafka topic's table to be synchronized, if the corresponding Paimon table does not exist, this action will automatically create the table.
-Its schema will be derived from all specified Kafka topic's tables,it gets the earliest non-DDL data parsing schema from topic. If the Paimon table already exists, its schema will be compared against the schema of all specified Kafka topic's tables.
+This action will build a single combined sink for all tables. For each Kafka topic's table to be synchronized, if the 
+corresponding Paimon table does not exist, this action will automatically create the table, and its schema will be derived 
+from all specified Kafka topic's tables. If the Paimon table already exists and its schema is different from that parsed 
+from Kafka records, this action will try to perform schema evolution.
 
 Example
 
@@ -317,7 +318,6 @@ Synchronization from one Kafka topic to Paimon database.
     kafka-sync-database \
     --warehouse hdfs:///path/to/warehouse \
     --database test_db \
-    --schema-init-max-read 500 \
     --kafka-conf properties.bootstrap.servers=127.0.0.1:9020 \
     --kafka-conf topic=order \
     --kafka-conf properties.group.id=123456 \
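
The new paragraph above describes one combined sink for the whole database with schema evolution at runtime. Based on the builder calls visible in the KafkaSyncDatabaseAction change later in this message, a minimal sketch of the wiring; the placeholder names, the Catalog.Loader parameter type and the final build() call are assumptions, not part of this diff.

    import org.apache.flink.streaming.api.datastream.DataStream;

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
    import org.apache.paimon.flink.sink.cdc.EventParser;
    import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
    import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;

    class CombinedSinkSketch {
        // Builds the single combined sink described above; the stream, parser factory and
        // catalog loader are placeholders supplied by the surrounding action.
        static void wire(
                DataStream<RichCdcMultiplexRecord> input,
                EventParser.Factory<RichCdcMultiplexRecord> parserFactory,
                Catalog.Loader catalogLoader) {
            new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
                    .withInput(input)
                    .withParserFactory(parserFactory)
                    .withCatalogLoader(catalogLoader)
                    .withDatabase("test_db")
                    .withMode(DatabaseSyncMode.COMBINED) // new tables are synchronized automatically
                    .build(); // terminal call assumed from the builder pattern; not shown in this diff
        }
    }
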
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html b/docs/layouts/shortcodes/generated/mysql_sync_database.html
index 1c1dc789c..cf6ededc0 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html
@@ -53,6 +53,10 @@ under the License.
         <td><h5>--excluding-tables</h5></td>
         <td>It is used to specify which source tables are not to be synchronized. The usage is same as "--including-tables". "--excluding-tables" has higher priority than "--including-tables" if you specified both.</td>
     </tr>
+    <tr>
+        <td><h5>--mode</h5></td>
+        <td>It is used to specify synchronization mode.<br />Possible values:<ul><li>"divided" (the default mode if you haven't specified one): start a sink for each table, the synchronization of the new table requires restarting the job.</li><li>"combined": start a single combined sink for all tables, the new table will be automatically synchronized.</li></ul></td>
+    </tr>
     <tr>
         <td><h5>--mysql-conf</h5></td>
         <td>The configuration for Flink CDC MySQL table sources. Each configuration should be specified in the format "key=value". hostname, username, password, database-name and table-name are required configurations, others are optional. See its <a href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a> for a complete list of configurations.</td>
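
The --mode row above documents two values, "divided" (the default) and "combined". The MySqlSyncDatabaseActionFactory hunk at the end of this message is truncated before the flag is read, so the following is only a hypothetical sketch of mapping the flag onto the new DatabaseSyncMode enum; the method and its exact behavior are assumptions.

    import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;

    class SyncModeSketch {
        // Hypothetical: map the --mode string onto DatabaseSyncMode, defaulting to DIVIDED.
        static DatabaseSyncMode parseMode(String mode) {
            if (mode == null || "divided".equalsIgnoreCase(mode)) {
                return DatabaseSyncMode.DIVIDED;   // one sink per table; new tables need a restart
            }
            if ("combined".equalsIgnoreCase(mode)) {
                return DatabaseSyncMode.COMBINED;  // single sink; new tables picked up automatically
            }
            throw new IllegalArgumentException("Unsupported synchronization mode: " + mode);
        }
    }
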
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
similarity index 72%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
index a7eccba66..8e4641ccd 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
@@ -16,19 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.action.cdc.mysql;
+package org.apache.paimon.flink.action.cdc;
 
 import java.io.Serializable;
 
 /**
  * There are two modes for database sync.
  *
- * <p>1) SEPARATE mode, start a sink for each table, the synchronization of the new table requires
+ * <p>1) DIVIDED mode, start a sink for each table, the synchronization of the new table requires
  * restarting the job.
  *
- * <p>2) UNIFIED mode, start a unified sink, the new table will be automatically synchronized.
+ * <p>2) COMBINED mode, start a single combined sink for all tables, the new table will be
+ * automatically synchronized.
  */
-public enum MySqlDatabaseSyncMode implements Serializable {
-    SEPARATE,
-    UNIFIED
+public enum DatabaseSyncMode implements Serializable {
+    DIVIDED,
+    COMBINED
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
index 3fc4015ba..deda133de 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
@@ -18,15 +18,9 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
-import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
+import org.apache.paimon.flink.action.cdc.kafka.canal.CanalRecordParser;
 import org.apache.paimon.types.DataType;
 
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-
-import org.apache.commons.compress.utils.Lists;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -36,10 +30,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -50,7 +41,6 @@ import java.util.UUID;
 public class KafkaSchema {
 
     private static final int MAX_RETRY = 100;
-    private static final int MAX_READ = 1000;
 
     private final String databaseName;
     private final String tableName;
@@ -96,45 +86,7 @@ public class KafkaSchema {
                 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
-        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-        return consumer;
-    }
-
-    private static Boolean extractIsDDL(JsonNode record) {
-        return Boolean.valueOf(extractJsonNode(record, "isDdl"));
-    }
-
-    private static String extractJsonNode(JsonNode record, String key) {
-        return record != null && record.get(key) != null ? record.get(key).asText() : null;
-    }
-
-    private static KafkaSchema parseCanalJson(String record, ObjectMapper objectMapper)
-            throws JsonProcessingException {
-        String databaseName;
-        String tableName;
-        final Map<String, DataType> fields = new LinkedHashMap<>();
-        final List<String> primaryKeys = new ArrayList<>();
-        JsonNode root = objectMapper.readValue(record, JsonNode.class);
-        if (!extractIsDDL(root)) {
-            JsonNode mysqlType = root.get("mysqlType");
-            Iterator<String> iterator = mysqlType.fieldNames();
-            while (iterator.hasNext()) {
-                String fieldName = iterator.next();
-                String fieldType = mysqlType.get(fieldName).asText();
-                String type = MySqlTypeUtils.getShortType(fieldType);
-                int precision = MySqlTypeUtils.getPrecision(fieldType);
-                int scale = MySqlTypeUtils.getScale(fieldType);
-                fields.put(fieldName, MySqlTypeUtils.toDataType(type, precision, scale));
-            }
-            ArrayNode pkNames = (ArrayNode) root.get("pkNames");
-            for (int i = 0; i < pkNames.size(); i++) {
-                primaryKeys.add(pkNames.get(i).asText());
-            }
-            databaseName = extractJsonNode(root, "database");
-            tableName = extractJsonNode(root, "table");
-            return new KafkaSchema(databaseName, tableName, fields, primaryKeys);
-        }
-        return null;
+        return new KafkaConsumer<>(props);
     }
 
     public static KafkaSchema getKafkaSchema(Configuration kafkaConfig, String topic)
@@ -142,25 +94,20 @@ public class KafkaSchema {
         KafkaConsumer<String, String> consumer = getKafkaEarliestConsumer(kafkaConfig);
 
         consumer.subscribe(Collections.singletonList(topic));
-        KafkaSchema kafkaSchema;
         int retry = 0;
-        ObjectMapper objectMapper = new ObjectMapper();
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
             for (ConsumerRecord<String, String> record : records) {
-                try {
-                    String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
-                    if ("canal-json".equals(format)) {
-                        kafkaSchema = parseCanalJson(record.value(), objectMapper);
-                        if (kafkaSchema != null) {
-                            return kafkaSchema;
-                        }
-                    } else {
-                        throw new UnsupportedOperationException(
-                                "This format: " + format + " is not support.");
+                String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
+                if ("canal-json".equals(format)) {
+                    CanalRecordParser parser = new CanalRecordParser(true, Collections.emptyList());
+                    KafkaSchema kafkaSchema = parser.getKafkaSchema(record.value());
+                    if (kafkaSchema != null) {
+                        return kafkaSchema;
                     }
-                } catch (JsonProcessingException e) {
-                    throw new RuntimeException(e);
+                } else {
+                    throw new UnsupportedOperationException(
+                            "This format: " + format + " is not support.");
                 }
             }
             if (retry == MAX_RETRY) {
@@ -171,46 +118,6 @@ public class KafkaSchema {
         }
     }
 
-    public static List<KafkaSchema> getListKafkaSchema(
-            Configuration kafkaConfig, String topic, int maxRead) throws Exception {
-        KafkaConsumer<String, String> consumer = getKafkaEarliestConsumer(kafkaConfig);
-
-        consumer.subscribe(Collections.singletonList(topic));
-        List<KafkaSchema> kafkaSchemaList = Lists.newArrayList();
-        int retry = 0;
-        int read = 0;
-        maxRead = maxRead == 0 ? MAX_READ : maxRead;
-        ObjectMapper objectMapper = new ObjectMapper();
-        while (maxRead > read) {
-            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
-            for (ConsumerRecord<String, String> record : records) {
-                read++;
-                try {
-                    String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
-                    if ("canal-json".equals(format)) {
-                        KafkaSchema kafkaSchema = parseCanalJson(record.value(), objectMapper);
-                        if (kafkaSchema != null && !kafkaSchemaList.contains(kafkaSchema)) {
-                            kafkaSchemaList.add(kafkaSchema);
-                        }
-                    } else {
-                        throw new UnsupportedOperationException(
-                                "This format: " + format + " is not support.");
-                    }
-                } catch (JsonProcessingException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-            if (kafkaSchemaList.isEmpty() && retry == MAX_RETRY) {
-                throw new Exception("Could not get metadata from server,topic :" + topic);
-            } else if (!kafkaSchemaList.isEmpty() && retry == MAX_RETRY) {
-                break;
-            }
-            Thread.sleep(100);
-            retry++;
-        }
-        return kafkaSchemaList;
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
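
After this change, KafkaSchema.getKafkaSchema polls the topic from the earliest offset and returns the first schema that CanalRecordParser can derive from a non-DDL record. A minimal usage sketch, reusing the kafka-conf keys from the documentation above; the throws clause is an assumption since the method's signature is cut off in the hunk.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.configuration.Configuration;

    import org.apache.paimon.flink.action.cdc.kafka.KafkaSchema;

    class KafkaSchemaSketch {
        // Derives database name, table name, field types and primary keys for one topic
        // from its earliest canal-json record.
        static KafkaSchema inferSchema() throws Exception {
            Map<String, String> kafkaConf = new HashMap<>();
            kafkaConf.put("properties.bootstrap.servers", "127.0.0.1:9020"); // values from the doc example
            kafkaConf.put("topic", "order");
            kafkaConf.put("value.format", "canal-json"); // only canal-json is supported
            return KafkaSchema.getKafkaSchema(Configuration.fromMap(kafkaConf), "order");
        }
    }
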
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 07b20c523..b4894a31d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -18,21 +18,17 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.action.cdc.kafka.canal.CanalRecordParser;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordSchemaBuilder;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.configuration.Configuration;
@@ -40,19 +36,11 @@ import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
 import org.apache.flink.util.CollectionUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.function.Supplier;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -63,10 +51,10 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
  * href="https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/connectors/table/kafka/">document
  * of flink-connectors</a> for detailed keys and values.
  *
- * <p>For each topic's table to be synchronized, if the corresponding Paimon table does not exist,
- * this action will automatically create the table. Its schema will be derived from all specified
- * tables. If the Paimon table already exists, its schema will be compared against the schema of all
- * specified tables.
+ * <p>For each Kafka topic's table to be synchronized, if the corresponding Paimon table does not
+ * exist, this action will automatically create the table, and its schema will be derived from all
+ * specified Kafka topic's tables. If the Paimon table already exists and its schema is different
+ * from that parsed from Kafka records, this action will try to perform schema evolution.
  *
  * <p>This action supports a limited number of schema changes. Currently, the framework can not drop
  * columns, so the behaviors of `DROP` will be ignored, `RENAME` will add a new column. Currently
@@ -88,18 +76,13 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
  *       are supported.
  * </ul>
  *
- * <p>This action creates a Paimon table sink for each Paimon table to be written, so this action is
- * not very efficient in resource saving. We may optimize this action by merging all sinks into one
- * instance in the future.
+ * <p>To automatically synchronize new tables, this action creates a single sink for all Paimon
+ * tables to be written. See {@link DatabaseSyncMode#COMBINED}.
  */
 public class KafkaSyncDatabaseAction extends ActionBase {
 
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaSyncDatabaseAction.class);
-
     private final Configuration kafkaConfig;
     private final String database;
-    private final int schemaInitMaxRead;
-    private final boolean ignoreIncompatible;
     private final String tablePrefix;
     private final String tableSuffix;
     @Nullable private final Pattern includingPattern;
@@ -110,29 +93,15 @@ public class KafkaSyncDatabaseAction extends ActionBase {
             Map<String, String> kafkaConfig,
             String warehouse,
             String database,
-            boolean ignoreIncompatible,
             Map<String, String> catalogConfig,
             Map<String, String> tableConfig) {
-        this(
-                kafkaConfig,
-                warehouse,
-                database,
-                0,
-                ignoreIncompatible,
-                null,
-                null,
-                null,
-                null,
-                catalogConfig,
-                tableConfig);
+        this(kafkaConfig, warehouse, database, null, null, null, null, catalogConfig, tableConfig);
     }
 
     KafkaSyncDatabaseAction(
             Map<String, String> kafkaConfig,
             String warehouse,
             String database,
-            int schemaInitMaxRead,
-            boolean ignoreIncompatible,
             @Nullable String tablePrefix,
             @Nullable String tableSuffix,
             @Nullable String includingTables,
@@ -142,8 +111,6 @@ public class KafkaSyncDatabaseAction extends ActionBase {
         super(warehouse, catalogConfig);
         this.kafkaConfig = Configuration.fromMap(kafkaConfig);
         this.database = database;
-        this.schemaInitMaxRead = schemaInitMaxRead;
-        this.ignoreIncompatible = ignoreIncompatible;
         this.tablePrefix = tablePrefix == null ? "" : tablePrefix;
         this.tableSuffix = tableSuffix == null ? "" : tableSuffix;
         this.includingPattern = includingTables == null ? null : Pattern.compile(includingTables);
@@ -165,59 +132,23 @@ public class KafkaSyncDatabaseAction extends ActionBase {
             validateCaseInsensitive();
         }
 
-        Map<String, List<KafkaSchema>> kafkaCanalSchemaMap = getKafkaCanalSchemaMap();
-
         catalog.createDatabase(database, true);
         TableNameConverter tableNameConverter =
                 new TableNameConverter(caseSensitive, tablePrefix, tableSuffix);
 
-        List<FileStoreTable> fileStoreTables = new ArrayList<>();
-        List<String> monitoredTopics = new ArrayList<>();
-        for (Map.Entry<String, List<KafkaSchema>> kafkaCanalSchemaEntry :
-                kafkaCanalSchemaMap.entrySet()) {
-            List<KafkaSchema> kafkaSchemaList = kafkaCanalSchemaEntry.getValue();
-            String topic = kafkaCanalSchemaEntry.getKey();
-            for (KafkaSchema kafkaSchema : kafkaSchemaList) {
-                String paimonTableName = tableNameConverter.convert(kafkaSchema.tableName());
-                Identifier identifier = new Identifier(database, paimonTableName);
-                FileStoreTable table;
-                Schema fromCanal =
-                        KafkaActionUtils.buildPaimonSchema(
-                                kafkaSchema,
-                                Collections.emptyList(),
-                                Collections.emptyList(),
-                                Collections.emptyList(),
-                                tableConfig,
-                                caseSensitive);
-                try {
-                    table = (FileStoreTable) catalog.getTable(identifier);
-                    Supplier<String> errMsg =
-                            incompatibleMessage(table.schema(), kafkaSchema, identifier);
-                    if (shouldMonitorTable(table.schema(), fromCanal, errMsg)) {
-                        monitoredTopics.add(topic);
-                        fileStoreTables.add(table);
-                    }
-                } catch (Catalog.TableNotExistException e) {
-                    catalog.createTable(identifier, fromCanal, false);
-                    table = (FileStoreTable) catalog.getTable(identifier);
-                    monitoredTopics.add(topic);
-                    fileStoreTables.add(table);
-                }
-            }
-        }
-        monitoredTopics = monitoredTopics.stream().distinct().collect(Collectors.toList());
-        Preconditions.checkState(
-                !fileStoreTables.isEmpty(),
-                "No tables to be synchronized. Possible cause is the schemas of all tables in specified "
-                        + "Kafka topic's table are not compatible with those of existed Paimon tables. Please check the log.");
-
-        kafkaConfig.set(KafkaConnectorOptions.TOPIC, monitoredTopics);
         KafkaSource<String> source = KafkaActionUtils.buildKafkaSource(kafkaConfig);
 
         EventParser.Factory<RichCdcMultiplexRecord> parserFactory;
         String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
         if ("canal-json".equals(format)) {
-            parserFactory = RichCdcMultiplexRecordEventParser::new;
+            RichCdcMultiplexRecordSchemaBuilder schemaBuilder =
+                    new RichCdcMultiplexRecordSchemaBuilder(tableConfig);
+            Pattern includingPattern = this.includingPattern;
+            Pattern excludingPattern = this.excludingPattern;
+            parserFactory =
+                    () ->
+                            new RichCdcMultiplexRecordEventParser(
+                                    schemaBuilder, includingPattern, excludingPattern);
         } else {
             throw new UnsupportedOperationException("This format: " + format + " is not support.");
         }
@@ -233,9 +164,9 @@ public class KafkaSyncDatabaseAction extends ActionBase {
                                                 new CanalRecordParser(
                                                         caseSensitive, tableNameConverter)))
                         .withParserFactory(parserFactory)
-                        .withTables(fileStoreTables)
                         .withCatalogLoader(catalogLoader())
-                        .withDatabase(database);
+                        .withDatabase(database)
+                        .withMode(DatabaseSyncMode.COMBINED);
         String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
         if (sinkParallelism != null) {
             sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
@@ -261,78 +192,6 @@ public class KafkaSyncDatabaseAction extends ActionBase {
                         tableSuffix));
     }
 
-    private Map<String, List<KafkaSchema>> getKafkaCanalSchemaMap() throws Exception {
-        Map<String, List<KafkaSchema>> kafkaCanalSchemaMap = new HashMap<>();
-        List<String> topicList = kafkaConfig.get(KafkaConnectorOptions.TOPIC);
-        if (topicList.size() > 1) {
-            topicList.forEach(
-                    topic -> {
-                        try {
-                            KafkaSchema kafkaSchema =
-                                    KafkaSchema.getKafkaSchema(kafkaConfig, topic);
-                            if (shouldMonitorTable(kafkaSchema.tableName())) {
-                                kafkaCanalSchemaMap.put(
-                                        topic, Collections.singletonList(kafkaSchema));
-                            }
-
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                    });
-        } else {
-            List<KafkaSchema> kafkaSchemaList =
-                    KafkaSchema.getListKafkaSchema(
-                            kafkaConfig, topicList.get(0), schemaInitMaxRead);
-            kafkaSchemaList =
-                    kafkaSchemaList.stream()
-                            .filter(kafkaSchema -> shouldMonitorTable(kafkaSchema.tableName()))
-                            .collect(Collectors.toList());
-            kafkaCanalSchemaMap.put(topicList.get(0), kafkaSchemaList);
-        }
-
-        return kafkaCanalSchemaMap;
-    }
-
-    private boolean shouldMonitorTable(String mySqlTableName) {
-        boolean shouldMonitor = true;
-        if (includingPattern != null) {
-            shouldMonitor = includingPattern.matcher(mySqlTableName).matches();
-        }
-        if (excludingPattern != null) {
-            shouldMonitor = shouldMonitor && !excludingPattern.matcher(mySqlTableName).matches();
-        }
-        LOG.debug("Source table {} is monitored? {}", mySqlTableName, shouldMonitor);
-        return shouldMonitor;
-    }
-
-    private boolean shouldMonitorTable(
-            TableSchema tableSchema, Schema schema, Supplier<String> errMsg) {
-        if (KafkaActionUtils.schemaCompatible(tableSchema, schema)) {
-            return true;
-        } else if (ignoreIncompatible) {
-            LOG.warn(errMsg.get() + "This table will be ignored.");
-            return false;
-        } else {
-            throw new IllegalArgumentException(
-                    errMsg.get()
-                            + "If you want to ignore the incompatible tables, please specify --ignore-incompatible to true.");
-        }
-    }
-
-    private Supplier<String> incompatibleMessage(
-            TableSchema paimonSchema, KafkaSchema kafkaSchema, Identifier identifier) {
-        return () ->
-                String.format(
-                        "Incompatible schema found.\n"
-                                + "Paimon table is: %s, fields are: %s.\n"
-                                + "Kafka's table is: %s.%s, fields are: %s.\n",
-                        identifier.getFullName(),
-                        paimonSchema.fields(),
-                        kafkaSchema.databaseName(),
-                        kafkaSchema.tableName(),
-                        kafkaSchema.fields());
-    }
-
     // ------------------------------------------------------------------------
     //  Flink run methods
     // ------------------------------------------------------------------------
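
With the up-front schema probing removed, the action's public constructor shrinks to five arguments. A usage sketch of instantiating it; how the action is then executed (the "Flink run methods" section) is outside this excerpt.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseAction;

    class KafkaSyncDatabaseSketch {
        // The simplified constructor: no schema-init-max-read, no ignore-incompatible.
        // Tables are discovered and evolved at runtime by the combined sink instead.
        static KafkaSyncDatabaseAction newAction() {
            Map<String, String> kafkaConf = new HashMap<>();
            kafkaConf.put("properties.bootstrap.servers", "127.0.0.1:9020");
            kafkaConf.put("topic", "order");
            kafkaConf.put("value.format", "canal-json");
            return new KafkaSyncDatabaseAction(
                    kafkaConf,
                    "hdfs:///path/to/warehouse", // warehouse
                    "test_db",                   // database
                    Collections.emptyMap(),      // catalog conf
                    Collections.emptyMap());     // table conf
        }
    }
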
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
index 3d2d6c96d..3af9599ac 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
@@ -42,14 +42,8 @@ public class KafkaSyncDatabaseActionFactory implements ActionFactory {
         checkRequiredArgument(params, "database");
         checkRequiredArgument(params, "kafka-conf");
 
-        int schemaInitMaxRead = 1000;
-        if (params.has("schema-init-max-read")) {
-            schemaInitMaxRead = Integer.parseInt(params.get("schema-init-max-read"));
-        }
-
         String warehouse = params.get("warehouse");
         String database = params.get("database");
-        boolean ignoreIncompatible = Boolean.parseBoolean(params.get("ignore-incompatible"));
         String tablePrefix = params.get("table-prefix");
         String tableSuffix = params.get("table-suffix");
         String includingTables = params.get("including-tables");
@@ -63,8 +57,6 @@ public class KafkaSyncDatabaseActionFactory implements ActionFactory {
                         kafkaConfigOption,
                         warehouse,
                         database,
-                        schemaInitMaxRead,
-                        ignoreIncompatible,
                         tablePrefix,
                         tableSuffix,
                         includingTables,
@@ -85,8 +77,6 @@ public class KafkaSyncDatabaseActionFactory implements ActionFactory {
         System.out.println("Syntax:");
         System.out.println(
                 "  kafka-sync-database --warehouse <warehouse-path> --database <database-name> "
-                        + "[--schema-init-max-read <schema-init-max-read>] "
-                        + "[--ignore-incompatible <true/false>] "
                         + "[--table-prefix <paimon-table-prefix>] "
                         + "[--table-suffix <paimon-table-suffix>] "
                         + "[--including-tables <table-name|name-regular-expr>] "
@@ -96,16 +86,6 @@ public class KafkaSyncDatabaseActionFactory implements ActionFactory {
                         + "[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]");
         System.out.println();
 
-        System.out.println(
-                "--schema-init-max-read is default 1000, if your tables are all from a topic, you can set this parameter to initialize the number of tables to be synchronized.");
-        System.out.println();
-
-        System.out.println(
-                "--ignore-incompatible is default false, in this case, if Topic's table name exists in Paimon "
-                        + "and their schema is incompatible, an exception will be thrown. "
-                        + "You can specify it to true explicitly to ignore the incompatible tables and exception.");
-        System.out.println();
-
         System.out.println(
                 "--table-prefix is the prefix of all Paimon tables to be synchronized. For example, if you want all "
                         + "synchronized tables to have \"ods_\" as prefix, you can specify `--table-prefix ods_`.");
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index d890eeaa7..7ebd1e03f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -156,15 +156,7 @@ public class KafkaSyncTableAction extends ActionBase {
             table = (FileStoreTable) catalog.getTable(identifier);
             KafkaActionUtils.assertSchemaCompatible(table.schema(), fromCanal);
         } catch (Catalog.TableNotExistException e) {
-            Schema schema =
-                    KafkaActionUtils.buildPaimonSchema(
-                            kafkaSchema,
-                            partitionKeys,
-                            primaryKeys,
-                            computedColumns,
-                            paimonConfig,
-                            caseSensitive);
-            catalog.createTable(identifier, schema, false);
+            catalog.createTable(identifier, fromCanal, false);
             table = (FileStoreTable) catalog.getTable(identifier);
         }
         String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalRecordParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalRecordParser.java
index 6812aaa8a..2b9c68747 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalRecordParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalRecordParser.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.kafka.canal;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.kafka.KafkaSchema;
 import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
@@ -27,6 +28,7 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.StringUtils;
 
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -46,6 +48,8 @@ import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlAlterTableModifyCo
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.util.Collector;
 
+import javax.annotation.Nullable;
+
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -65,6 +69,7 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
     private static final String FIELD_TABLE = "table";
     private static final String FIELD_SQL = "sql";
     private static final String FIELD_MYSQL_TYPE = "mysqlType";
+    private static final String FIELD_PRIMARY_KEYS = "pkNames";
     private static final String FIELD_TYPE = "type";
     private static final String FIELD_DATA = "data";
     private static final String FIELD_OLD = "old";
@@ -104,12 +109,37 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
         root = objectMapper.readValue(value, JsonNode.class);
         validateFormat();
 
-        databaseName = root.get(FIELD_DATABASE).asText();
-        tableName = tableNameConverter.convert(root.get(FIELD_TABLE).asText());
+        databaseName = extractString(FIELD_DATABASE);
+        tableName = tableNameConverter.convert(extractString(FIELD_TABLE));
 
         extractRecords().forEach(out::collect);
     }
 
+    @Nullable
+    public KafkaSchema getKafkaSchema(String record) {
+        try {
+            root = objectMapper.readValue(record, JsonNode.class);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+        validateFormat();
+
+        if (isDdl()) {
+            return null;
+        }
+
+        LinkedHashMap<String, String> mySqlFieldTypes = extractFieldTypesFromMySqlType();
+        LinkedHashMap<String, DataType> paimonFieldTypes = new LinkedHashMap<>();
+        mySqlFieldTypes.forEach(
+                (name, type) -> paimonFieldTypes.put(name, toPaimonDataType(type, true)));
+
+        return new KafkaSchema(
+                extractString(FIELD_DATABASE),
+                extractString(FIELD_TABLE),
+                paimonFieldTypes,
+                extractPrimaryKeys());
+    }
+
     private void validateFormat() {
         String errorMessageTemplate =
                 "Didn't find '%s' node in json. Only supports canal-json format,"
@@ -124,9 +154,14 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
             checkNotNull(root.get(FIELD_SQL), errorMessageTemplate, FIELD_SQL);
         } else {
             checkNotNull(root.get(FIELD_MYSQL_TYPE), errorMessageTemplate, FIELD_MYSQL_TYPE);
+            checkNotNull(root.get(FIELD_PRIMARY_KEYS), errorMessageTemplate, FIELD_PRIMARY_KEYS);
         }
     }
 
+    private String extractString(String key) {
+        return root.get(key).asText();
+    }
+
     private boolean isDdl() {
         return root.get("isDdl") != null && root.get("isDdl").asBoolean();
     }
@@ -136,6 +171,8 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
             return extractRecordsFromDdl();
         }
 
+        List<String> primaryKeys = extractPrimaryKeys();
+
         // extract field types
         LinkedHashMap<String, String> mySqlFieldTypes = extractFieldTypesFromMySqlType();
         LinkedHashMap<String, DataType> paimonFieldTypes = new LinkedHashMap<>();
@@ -144,7 +181,7 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
 
         // extract row kind and field values
         List<RichCdcMultiplexRecord> records = new ArrayList<>();
-        String type = root.get(FIELD_TYPE).asText();
+        String type = extractString(FIELD_TYPE);
         ArrayNode data = (ArrayNode) root.get(FIELD_DATA);
         switch (type) {
             case OP_UPDATE:
@@ -167,18 +204,20 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
                         before = caseSensitive ? before : keyCaseInsensitive(before);
                         records.add(
                                 new RichCdcMultiplexRecord(
-                                        new CdcRecord(RowKind.DELETE, before),
-                                        paimonFieldTypes,
                                         databaseName,
-                                        tableName));
+                                        tableName,
+                                        paimonFieldTypes,
+                                        primaryKeys,
+                                        new CdcRecord(RowKind.DELETE, before)));
                     }
                     after = caseSensitive ? after : keyCaseInsensitive(after);
                     records.add(
                             new RichCdcMultiplexRecord(
-                                    new CdcRecord(RowKind.INSERT, after),
-                                    paimonFieldTypes,
                                     databaseName,
-                                    tableName));
+                                    tableName,
+                                    paimonFieldTypes,
+                                    primaryKeys,
+                                    new CdcRecord(RowKind.INSERT, after)));
                 }
                 break;
             case OP_INSERT:
@@ -190,10 +229,11 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
                     RowKind kind = type.equals(OP_INSERT) ? RowKind.INSERT : RowKind.DELETE;
                     records.add(
                             new RichCdcMultiplexRecord(
-                                    new CdcRecord(kind, after),
-                                    paimonFieldTypes,
                                     databaseName,
-                                    tableName));
+                                    tableName,
+                                    paimonFieldTypes,
+                                    primaryKeys,
+                                    new CdcRecord(kind, after)));
                 }
                 break;
             default:
@@ -204,7 +244,7 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
     }
 
     private List<RichCdcMultiplexRecord> extractRecordsFromDdl() {
-        String sql = root.get(FIELD_SQL).asText();
+        String sql = extractString(FIELD_SQL);
         if (StringUtils.isEmpty(sql)) {
             return Collections.emptyList();
         }
@@ -221,7 +261,11 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
 
         return Collections.singletonList(
                 new RichCdcMultiplexRecord(
-                        CdcRecord.emptyRecord(), fieldTypes, databaseName, tableName));
+                        databaseName,
+                        tableName,
+                        fieldTypes,
+                        Collections.emptyList(),
+                        CdcRecord.emptyRecord()));
     }
 
     private void extractFieldTypesFromAlterTableItem(
@@ -276,6 +320,13 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
         return MySqlTypeUtils.toDataType(mySqlType).copy(isNullable);
     }
 
+    private List<String> extractPrimaryKeys() {
+        List<String> primaryKeys = new ArrayList<>();
+        ArrayNode pkNames = (ArrayNode) root.get(FIELD_PRIMARY_KEYS);
+        pkNames.iterator().forEachRemaining(pk -> primaryKeys.add(toFieldName(pk.asText())));
+        return primaryKeys;
+    }
+
     private LinkedHashMap<String, String> extractFieldTypesFromMySqlType() {
         LinkedHashMap<String, String> fieldTypes = new LinkedHashMap<>();
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index f8ec80618..71de9e336 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -27,6 +27,7 @@ import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
 import org.apache.paimon.flink.sink.cdc.EventParser;
+import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
@@ -39,18 +40,6 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeRefe
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import com.alibaba.druid.sql.SQLUtils;
-import com.alibaba.druid.sql.ast.SQLDataType;
-import com.alibaba.druid.sql.ast.SQLExpr;
-import com.alibaba.druid.sql.ast.SQLName;
-import com.alibaba.druid.sql.ast.SQLStatement;
-import com.alibaba.druid.sql.ast.expr.SQLCharExpr;
-import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr;
-import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
-import com.alibaba.druid.sql.ast.statement.SQLTableElement;
-import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
-import com.alibaba.druid.util.JdbcConstants;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,7 +53,6 @@ import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -81,29 +69,45 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
     private final boolean caseSensitive;
     private final TableNameConverter tableNameConverter;
     private final List<ComputedColumn> computedColumns;
+    private final NewTableSchemaBuilder<String> schemaBuilder;
 
     private JsonNode root;
     private JsonNode payload;
 
     public MySqlDebeziumJsonEventParser(
             ZoneId serverTimeZone, boolean caseSensitive, List<ComputedColumn> computedColumns) {
-        this(serverTimeZone, caseSensitive, computedColumns, new TableNameConverter(caseSensitive));
+        this(
+                serverTimeZone,
+                caseSensitive,
+                computedColumns,
+                new TableNameConverter(caseSensitive),
+                ddl -> Optional.empty());
     }
 
     public MySqlDebeziumJsonEventParser(
-            ZoneId serverTimeZone, boolean caseSensitive, TableNameConverter tableNameConverter) {
-        this(serverTimeZone, caseSensitive, Collections.emptyList(), tableNameConverter);
+            ZoneId serverTimeZone,
+            boolean caseSensitive,
+            TableNameConverter tableNameConverter,
+            NewTableSchemaBuilder<String> schemaBuilder) {
+        this(
+                serverTimeZone,
+                caseSensitive,
+                Collections.emptyList(),
+                tableNameConverter,
+                schemaBuilder);
     }
 
     public MySqlDebeziumJsonEventParser(
             ZoneId serverTimeZone,
             boolean caseSensitive,
             List<ComputedColumn> computedColumns,
-            TableNameConverter tableNameConverter) {
+            TableNameConverter tableNameConverter,
+            NewTableSchemaBuilder<String> schemaBuilder) {
         this.serverTimeZone = serverTimeZone;
         this.caseSensitive = caseSensitive;
         this.computedColumns = computedColumns;
         this.tableNameConverter = tableNameConverter;
+        this.schemaBuilder = schemaBuilder;
     }
 
     @Override
@@ -178,7 +182,7 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
     }
 
     @Override
-    public Optional<Schema> parseNewTable(String databaseName) {
+    public Optional<Schema> parseNewTable() {
         JsonNode historyRecord = payload.get("historyRecord");
         if (historyRecord == null) {
             return Optional.empty();
@@ -190,69 +194,13 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
             if (Strings.isNullOrEmpty(ddl)) {
                 return Optional.empty();
             }
-
-            SQLStatement statement = SQLUtils.parseSingleStatement(ddl, JdbcConstants.MYSQL);
-
-            if (!(statement instanceof MySqlCreateTableStatement)) {
-                return Optional.empty();
-            }
-
-            MySqlCreateTableStatement createTableStatement = (MySqlCreateTableStatement) statement;
-
-            // TODO: add default table config, partitions, and computed column
-            //     for newly added table;
-            MySqlSchema mySqlSchema = buildMySqlSchema(databaseName, createTableStatement);
-            Schema fromMySql =
-                    MySqlActionUtils.buildPaimonSchema(
-                            mySqlSchema,
-                            Collections.emptyList(),
-                            mySqlSchema.getPrimaryKeys(),
-                            computedColumns,
-                            Collections.emptyMap(),
-                            caseSensitive);
-
-            return Optional.of(fromMySql);
-
+            return schemaBuilder.build(ddl);
         } catch (Exception e) {
             LOG.info("Failed to parse history record for schema changes", e);
             return Optional.empty();
         }
     }
 
-    private MySqlSchema buildMySqlSchema(String database, MySqlCreateTableStatement statement) {
-        LinkedHashMap<String, Tuple2<DataType, String>> fields = new LinkedHashMap<>();
-
-        List<SQLTableElement> columns = statement.getTableElementList();
-        for (SQLTableElement element : columns) {
-            if (element instanceof SQLColumnDefinition) {
-                SQLColumnDefinition column = (SQLColumnDefinition) element;
-                SQLName name = column.getName();
-                SQLDataType dataType = column.getDataType();
-                List<SQLExpr> arguments = dataType.getArguments();
-                Integer precision = null;
-                Integer scale = null;
-                if (arguments.size() >= 1) {
-                    precision = (int) (((SQLIntegerExpr) arguments.get(0)).getValue());
-                }
-
-                if (arguments.size() >= 2) {
-                    scale = (int) (((SQLIntegerExpr) arguments.get(1)).getValue());
-                }
-
-                SQLCharExpr comment = (SQLCharExpr) column.getComment();
-                fields.put(
-                        name.getSimpleName(),
-                        Tuple2.of(
-                                MySqlTypeUtils.toDataType(
-                                        column.getDataType().getName(), precision, scale),
-                                comment == null ? null : String.valueOf(comment.getValue())));
-            }
-        }
-
-        return new MySqlSchema(
-                database, statement.getTableName(), fields, statement.getPrimaryKeyNames());
-    }
-
     @Override
     public List<CdcRecord> parseRecords() {
         if (isSchemaChange()) {
@@ -378,7 +326,7 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
             } else if ("io.debezium.time.ZonedTimestamp".equals(className)) {
                 // MySQL timestamp
 
-                // dispaly value of timestamp is affected by timezone, see
+                // display value of timestamp is affected by timezone, see
                 // https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
                 // RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
                 // for implementation
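
parseNewTable() now delegates DDL handling to a pluggable NewTableSchemaBuilder; the no-op default is ddl -> Optional.empty() and the MySQL implementation is the new MySqlTableSchemaBuilder (added in this commit but not shown in this excerpt). From those two usages the interface appears to reduce to a single method; a sketch of the assumed shape, where the Serializable bound and the parameter name are guesses.

    import java.io.Serializable;
    import java.util.Optional;

    import org.apache.paimon.schema.Schema;

    // Assumed shape of NewTableSchemaBuilder, inferred from schemaBuilder.build(ddl) and the
    // ddl -> Optional.empty() default used by MySqlDebeziumJsonEventParser.
    public interface NewTableSchemaBuilder<T> extends Serializable {

        // Returns the schema for a newly created table, or empty if the input cannot be
        // turned into a CREATE TABLE schema.
        Optional<Schema> build(T source);
    }
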
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
index 5501e25bc..55ffeea1b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
@@ -40,17 +40,6 @@ public class MySqlSchema {
     private final LinkedHashMap<String, Tuple2<DataType, String>> fields;
     private final List<String> primaryKeys;
 
-    public MySqlSchema(
-            String databaseName,
-            String tableName,
-            LinkedHashMap<String, Tuple2<DataType, String>> fields,
-            List<String> primaryKeys) {
-        this.databaseName = databaseName;
-        this.tableName = tableName;
-        this.fields = fields;
-        this.primaryKeys = primaryKeys;
-    }
-
     public MySqlSchema(DatabaseMetaData metaData, String databaseName, String tableName)
             throws Exception {
         this.databaseName = databaseName;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 4bf8e799b..74df7cbac 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
@@ -54,8 +55,8 @@ import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.SEPARATE;
-import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.UNIFIED;
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.COMBINED;
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.DIVIDED;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
@@ -107,7 +108,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
     @Nullable private final Pattern excludingPattern;
     private final Map<String, String> tableConfig;
     private final String includingTables;
-    private final MySqlDatabaseSyncMode mode;
+    private final DatabaseSyncMode mode;
 
     public MySqlSyncDatabaseAction(
             Map<String, String> mySqlConfig,
@@ -127,7 +128,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                 null,
                 catalogConfig,
                 tableConfig,
-                SEPARATE);
+                DIVIDED);
     }
 
     public MySqlSyncDatabaseAction(
@@ -141,7 +142,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
             @Nullable String excludingTables,
             Map<String, String> catalogConfig,
             Map<String, String> tableConfig,
-            MySqlDatabaseSyncMode mode) {
+            DatabaseSyncMode mode) {
         super(warehouse, catalogConfig);
         this.mySqlConfig = Configuration.fromMap(mySqlConfig);
         this.database = database;
@@ -217,10 +218,10 @@ public class MySqlSyncDatabaseAction extends ActionBase {
                         + "MySQL database are not compatible with those of existed Paimon tables. Please check the log.");
         String tableList;
 
-        if (mode == UNIFIED) {
-            // First excluding all tables that failed the excludingPattern and those does not
-            //     have a primary key. Then including other table using regex so that newly
-            //     added table DDLs and DMLs during job runtime will be captured
+        if (mode == COMBINED) {
+            // First, exclude all tables that were filtered out by the excludingPattern or that
+            // have no primary key. Then include the remaining tables via regex so that DDLs and
+            // DMLs of tables newly added during job runtime can be captured
             tableList =
                     excludedTables.stream()
                                     .map(t -> String.format("(?!(%s))", t))
@@ -234,11 +235,15 @@ public class MySqlSyncDatabaseAction extends ActionBase {
 
         String serverTimeZone = mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
         ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : ZoneId.of(serverTimeZone);
+        MySqlTableSchemaBuilder schemaBuilder =
+                new MySqlTableSchemaBuilder(tableConfig, caseSensitive);
         EventParser.Factory<String> parserFactory =
-                () -> new MySqlDebeziumJsonEventParser(zoneId, caseSensitive, tableNameConverter);
+                () ->
+                        new MySqlDebeziumJsonEventParser(
+                                zoneId, caseSensitive, tableNameConverter, schemaBuilder);
 
         String database = this.database;
-        MySqlDatabaseSyncMode mode = this.mode;
+        DatabaseSyncMode mode = this.mode;
         FlinkCdcSyncDatabaseSinkBuilder<String> sinkBuilder =
                 new FlinkCdcSyncDatabaseSinkBuilder<String>()
                         .withInput(
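
For context on the COMBINED branch above: each excluded table name is wrapped in a negative lookahead via String.format("(?!(%s))", t), and those prefixes are presumably joined ahead of the including regex (the rest of that expression lies outside this hunk). A small standalone sketch of that pattern behaviour, with invented table names:

    import java.util.regex.Pattern;

    public class TableListRegexSketch {
        public static void main(String[] args) {
            // One excluded table ("t_no_pk") becomes a negative lookahead placed before the
            // including regex ("t.+"); both names are invented for this sketch.
            String tableList = String.format("(?!(%s))", "t_no_pk") + "(t.+)";
            Pattern pattern = Pattern.compile(tableList);
            System.out.println(pattern.matcher("t_orders").matches()); // true  -> captured
            System.out.println(pattern.matcher("t_no_pk").matches());  // false -> excluded
        }
    }

Anything matching the including part while not listed as excluded keeps matching, which is what allows newly added tables to be picked up while the job is running.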
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
index 1ccc0c713..0a6723bf7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
@@ -20,14 +20,15 @@ package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
 
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.SEPARATE;
-import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.UNIFIED;
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.COMBINED;
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.DIVIDED;
 
 /** Factory to create {@link MySqlSyncDatabaseAction}. */
 public class MySqlSyncDatabaseActionFactory implements ActionFactory {
@@ -53,11 +54,21 @@ public class MySqlSyncDatabaseActionFactory implements ActionFactory {
         String includingTables = params.get("including-tables");
         String excludingTables = params.get("excluding-tables");
         String mode = params.get("mode");
-        MySqlDatabaseSyncMode syncMode;
-        if ("unified".equalsIgnoreCase(mode)) {
-            syncMode = UNIFIED;
+        DatabaseSyncMode syncMode;
+        if (mode == null) {
+            syncMode = DIVIDED;
         } else {
-            syncMode = SEPARATE;
+            switch (mode.toLowerCase()) {
+                case "divided":
+                    syncMode = DIVIDED;
+                    break;
+                case "combined":
+                    syncMode = COMBINED;
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported mode '" + mode + "' for database synchronization mode.");
+            }
         }
 
         Map<String, String> mySqlConfig = optionalConfigMap(params, "mysql-conf");
@@ -96,6 +107,7 @@ public class MySqlSyncDatabaseActionFactory implements ActionFactory {
                         + "[--table-suffix <paimon-table-suffix>] "
                         + "[--including-tables <mysql-table-name|name-regular-expr>] "
                         + "[--excluding-tables <mysql-table-name|name-regular-expr>] "
+                        + "[--mode <sync-mode>] "
                         + "[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] "
                         + "[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] "
                         + "[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]");
@@ -123,6 +135,15 @@ public class MySqlSyncDatabaseActionFactory implements ActionFactory {
                 "--excluding-tables has higher priority than --including-tables if you specified both.");
         System.out.println();
 
+        System.out.println(
+                "--mode is used to specify synchronization mode. You can specify two modes:");
+        System.out.println(
+                "  1. 'divided' (the default mode if you haven't specified one): "
+                        + "start a sink for each table, the synchronization of the new table requires restarting the job;");
+        System.out.println(
+                "  2. 'combined': start a single combined sink for all tables, the new table will be automatically synchronized.");
+        System.out.println();
+
         System.out.println("MySQL CDC source conf syntax:");
         System.out.println("  key=value");
         System.out.println(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
new file mode 100644
index 000000000..fb6a47cbc
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataType;
+
+import com.alibaba.druid.sql.SQLUtils;
+import com.alibaba.druid.sql.ast.SQLDataType;
+import com.alibaba.druid.sql.ast.SQLExpr;
+import com.alibaba.druid.sql.ast.SQLName;
+import com.alibaba.druid.sql.ast.SQLStatement;
+import com.alibaba.druid.sql.ast.expr.SQLCharExpr;
+import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr;
+import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
+import com.alibaba.druid.sql.ast.statement.SQLTableElement;
+import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
+import com.alibaba.druid.util.JdbcConstants;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Schema builder for MySQL CDC. */
+public class MySqlTableSchemaBuilder implements NewTableSchemaBuilder<String> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySqlTableSchemaBuilder.class);
+
+    private final Map<String, String> tableConfig;
+    private final boolean caseSensitive;
+
+    public MySqlTableSchemaBuilder(Map<String, String> tableConfig, boolean caseSensitive) {
+        this.tableConfig = tableConfig;
+        this.caseSensitive = caseSensitive;
+    }
+
+    @Override
+    public Optional<Schema> build(String ddl) {
+        SQLStatement statement = SQLUtils.parseSingleStatement(ddl, JdbcConstants.MYSQL);
+
+        if (!(statement instanceof MySqlCreateTableStatement)) {
+            return Optional.empty();
+        }
+
+        MySqlCreateTableStatement createTableStatement = (MySqlCreateTableStatement) statement;
+        String tableName = createTableStatement.getTableName();
+
+        List<String> primaryKeys = createTableStatement.getPrimaryKeyNames();
+        if (primaryKeys.isEmpty()) {
+            LOG.debug(
+                    "Didn't find primary keys from MySQL DDL for table '{}'. "
+                            + "This table won't be synchronized.",
+                    tableName);
+            // TODO: for non-pk tables, we should not handle their records because we haven't
+            // created them here. Fix in 'MySqlDebeziumJsonEventParser'
+            return Optional.empty();
+        }
+
+        List<SQLTableElement> columns = createTableStatement.getTableElementList();
+        LinkedHashMap<String, Tuple2<DataType, String>> fields = new LinkedHashMap<>();
+
+        for (SQLTableElement element : columns) {
+            if (element instanceof SQLColumnDefinition) {
+                SQLColumnDefinition column = (SQLColumnDefinition) element;
+                SQLName name = column.getName();
+                SQLDataType dataType = column.getDataType();
+                List<SQLExpr> arguments = dataType.getArguments();
+                Integer precision = null;
+                Integer scale = null;
+                if (arguments.size() >= 1) {
+                    precision = (int) (((SQLIntegerExpr) arguments.get(0)).getValue());
+                }
+
+                if (arguments.size() >= 2) {
+                    scale = (int) (((SQLIntegerExpr) arguments.get(1)).getValue());
+                }
+
+                SQLCharExpr comment = (SQLCharExpr) column.getComment();
+                fields.put(
+                        name.getSimpleName(),
+                        Tuple2.of(
+                                MySqlTypeUtils.toDataType(
+                                        column.getDataType().getName(), precision, scale),
+                                comment == null ? null : String.valueOf(comment.getValue())));
+            }
+        }
+
+        if (!caseSensitive) {
+            LinkedHashMap<String, Tuple2<DataType, String>> tmp = new LinkedHashMap<>();
+            for (Map.Entry<String, Tuple2<DataType, String>> entry : fields.entrySet()) {
+                String fieldName = entry.getKey();
+                checkArgument(
+                        !tmp.containsKey(fieldName.toLowerCase()),
+                        "Duplicate key '%s' in table '%s' appears when converting fields map keys to case-insensitive form.",
+                        fieldName,
+                        tableName);
+                tmp.put(fieldName.toLowerCase(), entry.getValue());
+            }
+            fields = tmp;
+
+            primaryKeys =
+                    primaryKeys.stream().map(String::toLowerCase).collect(Collectors.toList());
+        }
+
+        Schema.Builder builder = Schema.newBuilder();
+        builder.options(tableConfig);
+        for (Map.Entry<String, Tuple2<DataType, String>> entry : fields.entrySet()) {
+            builder.column(entry.getKey(), entry.getValue().f0, entry.getValue().f1);
+        }
+
+        Schema schema = builder.primaryKey(primaryKeys).build();
+
+        return Optional.of(schema);
+    }
+}
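
A minimal usage sketch of the new builder above, assuming an illustrative DDL string and table option (neither appears in the patch):

    import org.apache.paimon.flink.action.cdc.mysql.MySqlTableSchemaBuilder;
    import org.apache.paimon.schema.Schema;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    public class MySqlTableSchemaBuilderSketch {
        public static void main(String[] args) {
            Map<String, String> tableConfig = new HashMap<>();
            tableConfig.put("bucket", "4"); // sample table option, not taken from the patch

            // caseSensitive = false, so field names and primary keys are lower-cased
            MySqlTableSchemaBuilder builder = new MySqlTableSchemaBuilder(tableConfig, false);

            Optional<Schema> schema =
                    builder.build(
                            "CREATE TABLE Orders ("
                                    + " ID INT NOT NULL,"
                                    + " Price DECIMAL(10, 2) COMMENT 'order amount',"
                                    + " PRIMARY KEY (ID))");

            // Present because the DDL declares a primary key; a DDL without one yields empty
            System.out.println(schema.isPresent());
        }
    }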
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index 556be5762..4f2e4a3b9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -87,7 +87,7 @@ public class CdcDynamicTableParsingProcessFunction<T> extends ProcessFunction<T,
         String tableName = parser.parseTableName();
 
         // check for newly added table
-        parser.parseNewTable(database)
+        parser.parseNewTable()
                 .ifPresent(
                         schema -> {
                             Identifier identifier =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
index e62864ee9..c0e481012 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
@@ -59,10 +59,9 @@ public interface EventParser<T> {
     /**
      * Parse newly added table schema from event.
      *
-     * @param databaseName database of the new table
      * @return empty if there is no newly added table
      */
-    default Optional<Schema> parseNewTable(String databaseName) {
+    default Optional<Schema> parseNewTable() {
         return Optional.empty();
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index 6ba1278a1..0070f3ae9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode;
+import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
 import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
 import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
 import org.apache.paimon.schema.SchemaManager;
@@ -37,7 +37,7 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.UNIFIED;
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.COMBINED;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
 
 /**
@@ -68,7 +68,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
     private Catalog.Loader catalogLoader;
     // database to sync, currently only support single database
     private String database;
-    private MySqlDatabaseSyncMode mode;
+    private DatabaseSyncMode mode;
 
     public FlinkCdcSyncDatabaseSinkBuilder<T> withInput(DataStream<T> input) {
         this.input = input;
@@ -101,7 +101,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
         return this;
     }
 
-    public FlinkCdcSyncDatabaseSinkBuilder<T> withMode(MySqlDatabaseSyncMode mode) {
+    public FlinkCdcSyncDatabaseSinkBuilder<T> withMode(DatabaseSyncMode mode) {
         this.mode = mode;
         return this;
     }
@@ -112,14 +112,14 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
         Preconditions.checkNotNull(database);
         Preconditions.checkNotNull(catalogLoader);
 
-        if (mode == UNIFIED) {
-            buildUnifiedCdcSink();
+        if (mode == COMBINED) {
+            buildCombinedCdcSink();
         } else {
-            buildStaticCdcSink();
+            buildDividedCdcSink();
         }
     }
 
-    private void buildUnifiedCdcSink() {
+    private void buildCombinedCdcSink() {
         SingleOutputStreamOperator<Void> parsed =
                 input.forward()
                         .process(
@@ -162,7 +162,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
         new FlinkCdcSink(table).sinkFrom(partitioned);
     }
 
-    private void buildStaticCdcSink() {
+    private void buildDividedCdcSink() {
         Preconditions.checkNotNull(tables);
 
         SingleOutputStreamOperator<Void> parsed =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
similarity index 67%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
index a7eccba66..0ceb820ff 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
@@ -16,19 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.action.cdc.mysql;
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.schema.Schema;
 
 import java.io.Serializable;
+import java.util.Optional;
 
-/**
- * There are two modes for database sync.
- *
- * <p>1) SEPARATE mode, start a sink for each table, the synchronization of the new table requires
- * restarting the job.
- *
- * <p>2) UNIFIED mode, start a unified sink, the new table will be automatically synchronized.
- */
-public enum MySqlDatabaseSyncMode implements Serializable {
-    SEPARATE,
-    UNIFIED
+/** Build table schema for newly added table in CDC ingestion. */
+@FunctionalInterface
+public interface NewTableSchemaBuilder<T> extends Serializable {
+
+    Optional<Schema> build(T source);
 }
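
Because the relocated interface above is a @FunctionalInterface, a schema builder can be supplied as a lambda. A trivial sketch mirroring the no-op default used by RichCdcMultiplexRecordEventParser later in this patch:

    import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
    import org.apache.paimon.schema.Schema;

    import java.util.Optional;

    public class NoopSchemaBuilderSketch {
        public static void main(String[] args) {
            // A builder that never creates tables: returning empty means "no new table to create"
            NewTableSchemaBuilder<String> noop = ddl -> Optional.empty();
            Optional<Schema> schema = noop.build("CREATE TABLE t (id INT)");
            System.out.println(schema.isPresent()); // false
        }
    }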
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
index c8d09f4d0..0b1fa30aa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
@@ -22,6 +22,7 @@ import org.apache.paimon.types.DataType;
 
 import java.io.Serializable;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Objects;
 
 /** Compared to {@link CdcMultiplexRecord}, this contains schema information. */
@@ -29,33 +30,44 @@ public class RichCdcMultiplexRecord implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    private final CdcRecord cdcRecord;
-    private final LinkedHashMap<String, DataType> fieldTypes;
     private final String databaseName;
     private final String tableName;
+    private final LinkedHashMap<String, DataType> fieldTypes;
+    private final List<String> primaryKeys;
+    private final CdcRecord cdcRecord;
 
     public RichCdcMultiplexRecord(
-            CdcRecord cdcRecord,
-            LinkedHashMap<String, DataType> fieldTypes,
             String databaseName,
-            String tableName) {
-        this.cdcRecord = cdcRecord;
-        this.fieldTypes = fieldTypes;
+            String tableName,
+            LinkedHashMap<String, DataType> fieldTypes,
+            List<String> primaryKeys,
+            CdcRecord cdcRecord) {
         this.databaseName = databaseName;
         this.tableName = tableName;
+        this.fieldTypes = fieldTypes;
+        this.primaryKeys = primaryKeys;
+        this.cdcRecord = cdcRecord;
     }
 
     public String tableName() {
         return tableName;
     }
 
+    public LinkedHashMap<String, DataType> fieldTypes() {
+        return fieldTypes;
+    }
+
+    public List<String> primaryKeys() {
+        return primaryKeys;
+    }
+
     public RichCdcRecord toRichCdcRecord() {
         return new RichCdcRecord(cdcRecord, fieldTypes);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(cdcRecord, fieldTypes, databaseName, tableName);
+        return Objects.hash(databaseName, tableName, fieldTypes, primaryKeys, cdcRecord);
     }
 
     @Override
@@ -67,23 +79,26 @@ public class RichCdcMultiplexRecord implements Serializable {
             return false;
         }
         RichCdcMultiplexRecord that = (RichCdcMultiplexRecord) o;
-        return Objects.equals(cdcRecord, that.cdcRecord)
+        return databaseName.equals(that.databaseName)
+                && tableName.equals(that.tableName)
                 && Objects.equals(fieldTypes, that.fieldTypes)
-                && databaseName.equals(that.databaseName)
-                && tableName.equals(that.tableName);
+                && Objects.equals(primaryKeys, that.primaryKeys)
+                && Objects.equals(cdcRecord, that.cdcRecord);
     }
 
     @Override
     public String toString() {
         return "{"
-                + "cdcRecord="
-                + cdcRecord
-                + ", fieldTypes="
-                + fieldTypes
-                + ", databaseName="
+                + "databaseName="
                 + databaseName
                 + ", tableName="
                 + tableName
+                + ", fieldTypes="
+                + fieldTypes
+                + ", primaryKeys="
+                + primaryKeys
+                + ", cdcRecord="
+                + cdcRecord
                 + '}';
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 105f45be4..326dad0d4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -18,26 +18,64 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataField;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
 
 /** {@link EventParser} for {@link RichCdcMultiplexRecord}. */
 public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMultiplexRecord> {
 
-    // TODO: currently we don't consider database
+    private static final Logger LOG =
+            LoggerFactory.getLogger(RichCdcMultiplexRecordEventParser.class);
+
+    private final NewTableSchemaBuilder<RichCdcMultiplexRecord> schemaBuilder;
+    @Nullable private final Pattern includingPattern;
+    @Nullable private final Pattern excludingPattern;
+    private final Map<String, RichEventParser> parsers = new HashMap<>();
+    private final Set<String> includedTables = new HashSet<>();
+    private final Set<String> excludedTables = new HashSet<>();
+    private final Set<String> createdTables = new HashSet<>();
+
+    private RichCdcMultiplexRecord record;
     private String currentTable;
+    private boolean shouldSynchronizeCurrentTable;
     private RichEventParser currentParser;
 
-    private final Map<String, RichEventParser> parsers = new HashMap<>();
+    public RichCdcMultiplexRecordEventParser() {
+        this(record -> Optional.empty(), null, null);
+    }
+
+    public RichCdcMultiplexRecordEventParser(
+            NewTableSchemaBuilder<RichCdcMultiplexRecord> schemaBuilder,
+            @Nullable Pattern includingPattern,
+            @Nullable Pattern excludingPattern) {
+        this.schemaBuilder = schemaBuilder;
+        this.includingPattern = includingPattern;
+        this.excludingPattern = excludingPattern;
+    }
 
     @Override
     public void setRawEvent(RichCdcMultiplexRecord record) {
+        this.record = record;
         this.currentTable = record.tableName();
-        this.currentParser = parsers.computeIfAbsent(currentTable, t -> new RichEventParser());
-        currentParser.setRawEvent(record.toRichCdcRecord());
+        this.shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable(record.primaryKeys());
+        if (shouldSynchronizeCurrentTable) {
+            this.currentParser = parsers.computeIfAbsent(currentTable, t -> new RichEventParser());
+            this.currentParser.setRawEvent(record.toRichCdcRecord());
+        }
     }
 
     @Override
@@ -47,11 +85,65 @@ public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMul
 
     @Override
     public List<DataField> parseSchemaChange() {
-        return currentParser.parseSchemaChange();
+        return shouldSynchronizeCurrentTable
+                ? currentParser.parseSchemaChange()
+                : Collections.emptyList();
     }
 
     @Override
     public List<CdcRecord> parseRecords() {
-        return currentParser.parseRecords();
+        return shouldSynchronizeCurrentTable
+                ? currentParser.parseRecords()
+                : Collections.emptyList();
+    }
+
+    @Override
+    public Optional<Schema> parseNewTable() {
+        if (shouldCreateCurrentTable()) {
+            return schemaBuilder.build(record);
+        }
+
+        return Optional.empty();
+    }
+
+    private boolean shouldSynchronizeCurrentTable(List<String> primaryKeys) {
+        if (includedTables.contains(currentTable)) {
+            return true;
+        }
+        if (excludedTables.contains(currentTable)) {
+            return false;
+        }
+
+        boolean shouldSynchronize = true;
+        if (includingPattern != null) {
+            shouldSynchronize = includingPattern.matcher(currentTable).matches();
+        }
+        if (excludingPattern != null) {
+            shouldSynchronize =
+                    shouldSynchronize && !excludingPattern.matcher(currentTable).matches();
+        }
+        if (!shouldSynchronize) {
+            LOG.debug(
+                    "Source table {} won't be synchronized because it was excluded.",
+                    currentTable);
+            excludedTables.add(currentTable);
+            return false;
+        }
+
+        if (primaryKeys.isEmpty()) {
+            LOG.debug(
+                    "Didn't find primary keys from kafka topic's table schemas for table '{}'. "
+                            + "This table won't be synchronized.",
+                    currentTable);
+            excludedTables.add(currentTable);
+            return false;
+        }
+
+        includedTables.add(currentTable);
+        return true;
+    }
+
+    private boolean shouldCreateCurrentTable() {
+        return shouldSynchronizeCurrentTable && createdTables.add(currentTable);
     }
 }
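
The table filtering introduced above gives exclusion priority over inclusion. A self-contained sketch of that interplay with illustrative patterns and table names:

    import java.util.regex.Pattern;

    public class TableFilterSketch {
        public static void main(String[] args) {
            Pattern including = Pattern.compile("order.*");
            Pattern excluding = Pattern.compile(".*_tmp");

            String table = "order_2023_tmp";
            boolean shouldSynchronize =
                    including.matcher(table).matches() && !excluding.matcher(table).matches();
            // false: the table matches the excluding pattern, so it is skipped even though
            // it also matches the including pattern
            System.out.println(shouldSynchronize);
        }
    }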
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
similarity index 52%
copy from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
index e39a74d5c..9b83b6d7e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
@@ -19,40 +19,32 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.utils.ObjectUtils;
+import org.apache.paimon.types.DataType;
 
-import java.util.Collections;
-import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
-/** Testing {@link EventParser} for {@link TestCdcEvent}. */
-public class TestCdcEventParser implements EventParser<TestCdcEvent> {
+/** Schema builder for {@link RichCdcMultiplexRecord}. */
+public class RichCdcMultiplexRecordSchemaBuilder
+        implements NewTableSchemaBuilder<RichCdcMultiplexRecord> {
 
-    private TestCdcEvent raw;
+    private final Map<String, String> tableConfig;
 
-    @Override
-    public void setRawEvent(TestCdcEvent raw) {
-        this.raw = raw;
+    public RichCdcMultiplexRecordSchemaBuilder(Map<String, String> tableConfig) {
+        this.tableConfig = tableConfig;
     }
 
     @Override
-    public String parseTableName() {
-        return raw.tableName();
-    }
+    public Optional<Schema> build(RichCdcMultiplexRecord record) {
+        Schema.Builder builder = Schema.newBuilder();
+        builder.options(tableConfig);
 
-    @Override
-    public List<DataField> parseSchemaChange() {
-        return ObjectUtils.coalesce(raw.updatedDataFields(), Collections.emptyList());
-    }
+        for (Map.Entry<String, DataType> entry : record.fieldTypes().entrySet()) {
+            builder.column(entry.getKey(), entry.getValue(), null);
+        }
 
-    @Override
-    public List<CdcRecord> parseRecords() {
-        return ObjectUtils.coalesce(raw.records(), Collections.emptyList());
-    }
+        Schema schema = builder.primaryKey(record.primaryKeys()).build();
 
-    @Override
-    public Optional<Schema> parseNewTable(String databaseName) {
-        return Optional.empty();
+        return Optional.of(schema);
     }
 }
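
A minimal sketch of the new RichCdcMultiplexRecordSchemaBuilder in isolation; the record contents are invented, and null is passed for the cdcRecord since only field types and primary keys feed into the schema here:

    import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
    import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordSchemaBuilder;
    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.types.DataType;
    import org.apache.paimon.types.DataTypes;

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Optional;

    public class RichCdcSchemaBuilderSketch {
        public static void main(String[] args) {
            LinkedHashMap<String, DataType> fieldTypes = new LinkedHashMap<>();
            fieldTypes.put("id", DataTypes.INT());
            fieldTypes.put("name", DataTypes.STRING());

            RichCdcMultiplexRecord record =
                    new RichCdcMultiplexRecord(
                            "my_db",
                            "orders",
                            fieldTypes,
                            Collections.singletonList("id"),
                            null); // no row payload needed to derive a schema in this sketch

            RichCdcMultiplexRecordSchemaBuilder builder =
                    new RichCdcMultiplexRecordSchemaBuilder(Collections.emptyMap());
            Optional<Schema> schema = builder.build(record);
            System.out.println(schema.isPresent()); // true: columns id INT, name STRING, PK (id)
        }
    }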
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index 580240773..2582cd5c0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -18,7 +18,12 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.TableScan;
@@ -363,4 +368,13 @@ public abstract class KafkaActionITCaseBase extends ActionITCaseBase {
             Thread.sleep(1000);
         }
     }
+
+    protected Catalog catalog() {
+        return CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+    }
+
+    protected FileStoreTable getFileStoreTable(String tableName) throws Exception {
+        Identifier identifier = Identifier.create(database, tableName);
+        return (FileStoreTable) catalog().getTable(identifier);
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 5add09619..8a06b2df8 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -28,6 +28,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CommonTestUtils;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.core.execution.JobClient;
@@ -37,6 +38,7 @@ import org.junit.jupiter.api.Timeout;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -96,12 +98,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
         tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) + 1));
         KafkaSyncDatabaseAction action =
                 new KafkaSyncDatabaseAction(
-                        kafkaConfig,
-                        warehouse,
-                        database,
-                        false,
-                        Collections.emptyMap(),
-                        tableConfig);
+                        kafkaConfig, warehouse, database, Collections.emptyMap(), tableConfig);
         action.build(env);
         JobClient client = env.executeAsync();
         waitJobRunning(client);
@@ -152,12 +149,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
         tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) + 1));
         KafkaSyncDatabaseAction action =
                 new KafkaSyncDatabaseAction(
-                        kafkaConfig,
-                        warehouse,
-                        database,
-                        false,
-                        Collections.emptyMap(),
-                        tableConfig);
+                        kafkaConfig, warehouse, database, Collections.emptyMap(), tableConfig);
         action.build(env);
         JobClient client = env.executeAsync();
         waitJobRunning(client);
@@ -167,6 +159,8 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
 
     private void testSchemaEvolutionImpl(List<String> topics, boolean writeOne, int fileCount)
             throws Exception {
+        waitTablesCreated("t1", "t2");
+
         FileStoreTable table1 = getFileStoreTable("t1");
         FileStoreTable table2 = getFileStoreTable("t2");
 
@@ -296,7 +290,6 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
                         kafkaConfig,
                         warehouse,
                         database,
-                        false,
                         Collections.emptyMap(),
                         Collections.emptyMap());
 
@@ -367,8 +360,6 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
                         kafkaConfig,
                         warehouse,
                         database,
-                        0,
-                        false,
                         "test_prefix_",
                         "_test_suffix",
                         null,
@@ -440,8 +431,6 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
                         kafkaConfig,
                         warehouse,
                         database,
-                        0,
-                        false,
                         "test_prefix_",
                         "_test_suffix",
                         null,
@@ -457,6 +446,8 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
 
     private void testTableAffixImpl(List<String> topics, boolean writeOne, int fileCount)
             throws Exception {
+        waitTablesCreated("test_prefix_t1_test_suffix", "test_prefix_t2_test_suffix");
+
         FileStoreTable table1 = getFileStoreTable("test_prefix_t1_test_suffix");
         FileStoreTable table2 = getFileStoreTable("test_prefix_t2_test_suffix");
 
@@ -631,8 +622,6 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
                         kafkaConfig,
                         warehouse,
                         database,
-                        0,
-                        false,
                         null,
                         null,
                         includingTables,
@@ -644,29 +633,30 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
         waitJobRunning(client);
 
         // check paimon tables
-        assertTableExists(existedTables);
+        waitTablesCreated(existedTables.toArray(new String[0]));
         assertTableNotExists(notExistedTables);
     }
 
-    private FileStoreTable getFileStoreTable(String tableName) throws Exception {
-        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
-        Identifier identifier = Identifier.create(database, tableName);
-        return (FileStoreTable) catalog.getTable(identifier);
-    }
-
-    private void assertTableExists(List<String> tableNames) {
-        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
-        for (String tableName : tableNames) {
-            Identifier identifier = Identifier.create(database, tableName);
-            assertThat(catalog.tableExists(identifier)).isTrue();
-        }
-    }
-
     private void assertTableNotExists(List<String> tableNames) {
-        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+        Catalog catalog = catalog();
         for (String tableName : tableNames) {
             Identifier identifier = Identifier.create(database, tableName);
             assertThat(catalog.tableExists(identifier)).isFalse();
         }
     }
+
+    private void waitTablesCreated(String... tables) throws Exception {
+        CommonTestUtils.waitUtil(
+                () -> {
+                    try {
+                        List<String> existed = catalog().listTables(database);
+                        return existed.containsAll(Arrays.asList(tables));
+                    } catch (Catalog.DatabaseNotExistException e) {
+                        throw new RuntimeException(e);
+                    }
+                },
+                Duration.ofSeconds(5),
+                Duration.ofMillis(100),
+                "Failed to wait tables to be created in 5 seconds.");
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 1472021b0..bfbd89fe6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -98,7 +98,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
                         tableName,
                         Collections.singletonList("pt"),
                         Arrays.asList("pt", "_id"),
-                        Collections.EMPTY_LIST,
+                        Collections.emptyList(),
                         Collections.emptyMap(),
                         tableConfig);
         action.build(env);
@@ -110,7 +110,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
     }
 
     private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exception {
-        FileStoreTable table = getFileStoreTable();
+        FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
                 RowType.of(
@@ -271,7 +271,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
                         tableName,
                         Collections.emptyList(),
                         Collections.singletonList("_id"),
-                        Collections.EMPTY_LIST,
+                        Collections.emptyList(),
                         Collections.emptyMap(),
                         Collections.emptyMap());
         action.build(env);
@@ -283,7 +283,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
     }
 
     private void testSchemaEvolutionMultipleImpl(String topic) throws Exception {
-        FileStoreTable table = getFileStoreTable();
+        FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
                 RowType.of(
@@ -533,7 +533,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
                             "_geometrycollection",
                             "_set",
                         });
-        FileStoreTable table = getFileStoreTable();
+        FileStoreTable table = getFileStoreTable(tableName);
         List<String> expected =
                 Arrays.asList(
                         "+I["
@@ -637,8 +637,6 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
             waitForResult(expected, table, rowType, Arrays.asList("pt", "_id"));
             SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
             schemaManager.commitChanges(SchemaChange.dropColumn("v"));
-            List<DataField> fields = table.schema().fields();
-            int size = fields.size();
         }
     }
 
@@ -675,7 +673,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
                         tableName,
                         Collections.singletonList("pt"),
                         Arrays.asList("pt", "_id"),
-                        Collections.EMPTY_LIST,
+                        Collections.emptyList(),
                         Collections.emptyMap(),
                         tableConfig);
         UnsupportedOperationException e =
@@ -719,7 +717,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
                         tableName,
                         Collections.singletonList("pt"),
                         Arrays.asList("pt", "_id"),
-                        Collections.EMPTY_LIST,
+                        Collections.emptyList(),
                         Collections.emptyMap(),
                         tableConfig);
         Exception e = assertThrows(Exception.class, () -> action.build(env), "Expecting Exception");
@@ -771,7 +769,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
                         tableName,
                         Collections.singletonList("pt"),
                         Arrays.asList("pt", "_id"),
-                        Collections.EMPTY_LIST,
+                        Collections.emptyList(),
                         Collections.emptyMap(),
                         tableConfig);
         IllegalArgumentException e =
@@ -821,7 +819,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
                         tableName,
                         Collections.singletonList("pt"),
                         Arrays.asList("pt", "_id"),
-                        Collections.EMPTY_LIST,
+                        Collections.emptyList(),
                         Collections.emptyMap(),
                         tableConfig);
         action.build(env);
@@ -829,7 +827,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
 
         waitJobRunning(client);
 
-        FileStoreTable table = getFileStoreTable();
+        FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
                 RowType.of(
@@ -879,7 +877,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
                         tableName,
                         Collections.singletonList("pt"),
                         Arrays.asList("pt", "_id"),
-                        Collections.EMPTY_LIST,
+                        Collections.emptyList(),
                         Collections.emptyMap(),
                         tableConfig);
         action.build(env);
@@ -892,7 +890,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
         } catch (Exception e) {
             throw new Exception("Failed to write canal data to Kafka.", e);
         }
-        FileStoreTable table = getFileStoreTable();
+        FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
                 RowType.of(
@@ -944,7 +942,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
                         tableName,
                         Collections.singletonList("pt"),
                         Arrays.asList("pt", "_id"),
-                        Collections.EMPTY_LIST,
+                        Collections.emptyList(),
                         Collections.emptyMap(),
                         tableConfig);
         action.build(env);
@@ -957,7 +955,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
         } catch (Exception e) {
             throw new Exception("Failed to write canal data to Kafka.", e);
         }
-        FileStoreTable table = getFileStoreTable();
+        FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
                 RowType.of(
@@ -1007,7 +1005,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
                         tableName,
                         Collections.singletonList("pt"),
                         Arrays.asList("pt", "_id"),
-                        Collections.EMPTY_LIST,
+                        Collections.emptyList(),
                         Collections.emptyMap(),
                         tableConfig);
         action.build(env);
@@ -1020,7 +1018,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
         } catch (Exception e) {
             throw new Exception("Failed to write canal data to Kafka.", e);
         }
-        FileStoreTable table = getFileStoreTable();
+        FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
                 RowType.of(
@@ -1072,7 +1070,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
                         tableName,
                         Collections.singletonList("pt"),
                         Arrays.asList("pt", "_id"),
-                        Collections.EMPTY_LIST,
+                        Collections.emptyList(),
                         Collections.emptyMap(),
                         tableConfig);
         action.build(env);
@@ -1085,7 +1083,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
         } catch (Exception e) {
             throw new Exception("Failed to write canal data to Kafka.", e);
         }
-        FileStoreTable table = getFileStoreTable();
+        FileStoreTable table = getFileStoreTable(tableName);
 
         RowType rowType =
                 RowType.of(
@@ -1102,10 +1100,4 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
                         "+I[1, 1, one]", "+I[1, 2, two]", "+I[1, 3, three]", "+I[1, 4, four]");
         waitForResult(expected, table, rowType, primaryKeys);
     }
-
-    private FileStoreTable getFileStoreTable() throws Exception {
-        Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
-        Identifier identifier = Identifier.create(database, tableName);
-        return (FileStoreTable) catalog.getTable(identifier);
-    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 0e3143a0a..07aaebfcc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -59,8 +59,8 @@ import java.util.Objects;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.SEPARATE;
-import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.UNIFIED;
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.COMBINED;
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.DIVIDED;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -374,7 +374,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
                         null,
                         Collections.emptyMap(),
                         tableConfig,
-                        SEPARATE);
+                        DIVIDED);
         action.build(env);
         JobClient client = env.executeAsync();
         waitJobRunning(client);
@@ -549,7 +549,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
                         excludingTables,
                         Collections.emptyMap(),
                         tableConfig,
-                        SEPARATE);
+                        DIVIDED);
         action.build(env);
         JobClient client = env.executeAsync();
         waitJobRunning(client);
@@ -924,7 +924,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
                         null,
                         catalogConfig,
                         tableConfig,
-                        UNIFIED);
+                        COMBINED);
         action.build(env);
 
         if (Objects.nonNull(savepointPath)) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
index e39a74d5c..204f8536d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
@@ -18,13 +18,11 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.utils.ObjectUtils;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 
 /** Testing {@link EventParser} for {@link TestCdcEvent}. */
 public class TestCdcEventParser implements EventParser<TestCdcEvent> {
@@ -50,9 +48,4 @@ public class TestCdcEventParser implements EventParser<TestCdcEvent> {
     public List<CdcRecord> parseRecords() {
         return ObjectUtils.coalesce(raw.records(), Collections.emptyList());
     }
-
-    @Override
-    public Optional<Schema> parseNewTable(String databaseName) {
-        return Optional.empty();
-    }
 }